跳轉到主要內容
工程的博客

創建一個IP查找表的活動在一個SIEM架構

分享這篇文章

在處理網絡安全數據時,有一件事是肯定的:沒有可用數據來源的短缺。如果有的話,有太多的數據源與重疊的數據。傳統SIEM(安全信息和事件管理)工具並不適合處理複雜的轉換和縫合多個數據源的一個有效方式。除此之外,是不劃算的過程tb和pb級的事件數據每天SIEM係統。

一個常見的網絡安全用例要求娶多個數據源的歸因是一個IP地址到一個特定的用戶。當使用傳統SIEM工具,以確定網絡上的惡意或可疑活動,SOC(安全操作中心)分析師推出多個查詢不同的數據源(VPN, DHCP等等)和手工縫合在一起找出一個IP地址的時間表和行動。這可能需要15分鍾/ IP地址。寶貴的時間在發生安全事故,可以拯救你的自動聚合和管理數據在著陸之前SIEM解決方案。

在這個博客中,我們將介紹一個簡單的數據收集方法,結合多個數據源創建IP查找表和自動化。這個表是一個威脅情報的基本構建塊創建一個整體的照片在你的網絡活動。它將使您查詢IP地址在一個給定的時間窗口,屬性,用戶/ MAC地址和跟蹤發生的事件的順序。

我們將使用思科伊勢姿態和分析器事件作為off-VPN VPN日誌和Infoblox DHCP日誌活動。我們使用的是磚實驗室數據生成器來模擬這些事件日誌和把他們變成一個S3 bucket。在真實的場景中,您可以使用一個實時流媒體服務等運動消防帶推動這種從本地服務器日誌到S3。這些日誌然後攝取使用磚Lakehouse平台Beplay体育安卓版本。我們已經建立了整個端到端管道使用三角洲生活表,它允許我們在每個階段確保數據質量的增量數據管理,以及端到端管道血統。一旦數據策劃成主IP查找表,如Splunk (SIEM工具使用磚Splunk附加)或BI工具或儀表板表等PowerBI或檢查員可以用作表示層。

網絡安全lakehouse架構

圖1展示了一個典型的網絡安全生態係統。這是一個糾纏的不同數據源和係統網絡SIEM工具組合。SOC分析師必須查詢不同來源和結果縫合到一起得到有意義的見解,變得更加複雜,增加大量的事件數據。平均梳理企業將pb級的數據,沒有合適的工具來處理這些龐大的數據集,這個任務可能很乏味,如果不是不可能的。

一個典型的網絡安全生態係統,源授權Splunk和其他與磚Lakehouse siem網絡安全)
)

我們提出一個替代網絡安全lakehouse架構,如圖2所示。實現這個架構的關鍵步驟是:

  • 土地數據從所有來源到廉價的對象存儲
  • 轉變成為一個高度優化大數據平台:過程和牧師和針數據使用三角洲湖Beplay体育安卓版本
  • 隻移動策劃數據一旦準備好被消耗到SIEM解決方案

其他優勢,Lakehouse架構提供了以下好處:

  • 打破豎井:數據對每個人都可用在同一平台上,每個數據角色可以使用自己喜歡的工具(筆記本、DBSQL等)在三角洲湖訪問單個源的Beplay体育安卓版本真理。此外,他們可以很容易地檢查代碼,對程序和協作在同一平台無需導出代碼或數據在其他地方。Beplay体育安卓版本
  • 結合批處理和流媒體:批處理和使用三角洲湖流媒體查詢都是幾乎相同的。你可以建立你的管道,讓他們不會過時,以防將來處理方式的變化。
  • 質量保證:你可以設置數據質量約束在不同階段的處理管道使用預期。這將允許你選擇如何應對意料之外的數據。
磚網絡安全Lakehouse架構
圖2:網絡安全Lakehouse架構

構建網絡安全數據管道與達美住表

從三角洲住表文檔:

三角洲生活表是一個框架為構建可靠、可維護、可測試的數據處理管道。你定義轉換執行數據,和δ生活表管理任務編製、集群管理、監控、數據質量和錯誤處理。

下圖顯示了端到端管道創建IP VPN和DHCP日誌查找表。我們選擇使用三角洲生活表(DLT)建立管道,因為它的簡單,它所提供的數據質量保證措施和跟蹤整個管道的血統。與DLT,您可以輕鬆地構建ETL管道使用Python或SQL。下麵顯示了ETL的DAG管道我們建立了創建一個IP查找表。

與達美住表,您可以很容易地建立你的ETL管道使用Python或SQL。
圖1:三角洲生活表DAG

代碼走查

你可以找到的筆記本電腦在這裏。正如圖1中所示,我們的土地每個數據源生(青銅)層。然後我們原始日誌使用適當的解析器解析成一個銀層。最後,我們得到感興趣的列(IpAddress MacAddress、用戶名等)從解析表,組合成一個策劃,準備使用IP查找表(黃金)。

思科伊勢分析器解析事件,他們在半鍵值的風格,我們可以使用PySpark的from_json方法。此外,我們已經從元數據提取事件時間戳(頭)相關的日誌。我們使用類似的方法來解析思科伊勢姿態事件。

@dltdef ise_profiler_silver ():返回(dlt.read_stream (“ise_profiler_bronze”).withColumn(“價值”,F.concat (F.lit(”,元數據= "),F.col(“價值”))).withColumn(“解析”,parse_string_as_json (F.col(“價值”))).withColumn(“數據”,F.from_json (F.col(“解析”),profiler_schema))選擇(F.col(數據。*)).withColumn (“MetadataTimestamp”,F.concat_ws (”“F.array ([F.split (F.col(“元數據”)," ")(我)(8,9]])),))

我們使用正則表達式映射(PySpark regexp_extract方法)來解析DHCP日誌。每個消息類型(DHCPACK, DHCPREQUEST等)解析使用其預期的正則表達式模式。我們選擇解析DHCP消息類型的ACK, NACK,報價,要求,釋放,下降,為簡單起見到期。

def parse_dhcp_message_with_regex (source_table_name=“dhcp_bronze”):返回(dlt.read_stream (source_table_name)選擇(“價值”,F。REGEX_DHCPOFFER regexp_extract(“價值”,0).alias (“EventType”))在哪裏(F.col (“EventType”) .isin ((“DHCPREQUEST”,“DHCPACK”,“DHCPNAK”,“DHCPOFFER”,“DHCPRELEASE”,“DHCPDECLINE”,“DHCPEXPIRE”,]))#解析不同類型消息不同的正則表達式模式IP地址.withColumn (“IpAddress”,F.when (F.col (EventType) .isin ((“DHCPREQUEST”)),F。regexp_extract(“價值”,“(?
              
              為了確保解析表的數據質量,我們對傳入的數據設定預期。@dlt表達的。expect_all_or_drop,我們預計EventTimestamp、IpAddress MacAddress沒有任何缺失的值。
              
              < pre > valid_dhcp_parsed_record = {:“valid_timestamp EventTimestamp”,:“valid_IpAddress IpAddress”,:“valid_MacAddress MacAddress”,}
              
              @dlt.table@dlt.expect_all_or_drop (valid_dhcp_parsed_record)def dhcp_silver ():返回parse_dhcp_message_with_regex ()

這是解析前DHCP記錄是什麼樣子:

7月13 09:08:00 12.99.46.134了dhcpd [3761]: DHCPACK, 12.88.0.15 (f0:5e: 0 c: b2:35: bf)通過eth2

這是什麼模式的解析三角洲表看起來像:

EventTimestamp:字符串IpAddress:字符串MacAddress:字符串EventType:字符串

最後,解析並結合三個數據源之後,我們獲得知識產權活動和他們的時間表,如圖3所示:

一個IP查找表由DHCP和VPN日誌
圖3:黃金IP查找表

下一個步驟

在這篇文章中,我們介紹的步驟構建一個簡單的知識產權歸屬表。這個表是第一個構建塊構建一個整體活動的照片在你的網絡威脅檢測和事件反應。一個明智的下一步是在數據源和表添加更多的列。

當你擁有越來越多的IP地址信息在你的表,你可以提取“正常”的行為模式對於IP網絡中使用機器學習算法。然後你可以檢測異常並標記IP如果外麵表現正常的界限。

您可以運行附帶的筆記本通過遵循下麵的鏈接直接發布:

更多的網絡安全內容,查看建立一個網絡安全Lakehouse CrowdStrike獵鷹事件通過DNS分析檢測罪犯和國家博客。

免費試著磚
看到所有工程的博客的帖子
Baidu
map