使用結構化流實現大規模湖屋

2021年5月27日下午05:00(太平洋時間)

下載幻燈片

業務主管、高管、分析師和數據科學家依靠最新的信息來做出業務決策、適應市場、滿足客戶的需求或運行有效的供應鏈操作。beplay体育app下载地址

來聽聽Asurion如何使用Delta、結構化流媒體、自動加載器和SQL分析來提高生產數據延遲,從日- 1到接近實時Asurion的技術團隊將分享戰鬥測試的技巧和技巧,你隻有在一定的規模。Asurion數據湖執行4000多個流作業,並在AWS上的生產數據湖中托管超過4000個表。

在此會話中注意:
Tomasz Magdanski, Asurion工程總監

成績單

托馬斯Magdansk…:你好。我要告訴你的,你不會在任何書籍或博客上找到。我將與您分享構建大規模數據湖時會遇到的實際障礙。我將簡要介紹一下阿蘇裏翁。我們是怎麼走到這一步的?如何經營可規模性和高性價比的湖屋?最重要的是,我們一路走來學到的教訓。
我們是一家大型保險和支持公司。每天,我們有一萬多名專家參與支持會議,幫助全球超過三億人,我們的服務範圍從技術支持到當天設備維修和更換。為了讓您了解這個項目的規模,我們的湖屋從100多個數據庫中吸收了4000多個表。我們在datalake中創建了7500個表,其中隻有兩個表,我稍後會講到。我們從Kafka、Kinesis、SNS和SQS中獲取流數據,以及從api中獲取文件、平麵文件和數據。
我們甚至從其他雲提供商那裏獲取數據。我們的數據庫來源從SQL server, Oracle, PostGreSQL, MySQL動態開始Redshift。在我們的數據倉庫中,我們將數千個這樣的表組合在一起,生成300多個數據模型和600多個數據集市。最後,在我們的消費層,我們有超過10,000個數據視圖和超過2000個報告。我們以前的體係結構基於Lambda體係結構。您可能知道,Lambda體係結構包含速度層和批處理層,這存在幾個問題。首先,您必須對所有內容進行兩次處理。您還必須對數據進行兩次驗證,並且常常需要使用不同的技術來實現此目的。或者必須以不同的方式處理後期數據,並且必須擔心在大部分不可變空間中重新處理數據。你必須擔心調度,重寫和查詢。 Data updates are also really difficult, which makes the data compliance more difficult.
我們沒有真正的計算機存儲分離,這使得可擴展性變得困難和昂貴。我們的數據延遲主要是D-1,我們也擁有非常廣泛的技術堆棧。從Redshift到Lambda, Kinesis, Kinesis Firehose, EMR, RDSS, Hive, Spectrum。這很難管理。然後我們著眼於隻有一條管道的湖屋建築。我們有接近實時的數據延遲能力,Apache Spark的可伸縮性,高度集成的生態係統,而且技術堆棧真的非常狹窄。這是非常非常有希望的。
另一個有助於開發的重要進步是直接利用生產數據。我們的目標是盡可能減少數據移動,並利用計算機存儲分離。開發數據平台,訪問真實的生產數據Beplay体育安卓版本來識別和解決許多微妙的問題,以及處理實際規模,這些都是我們在開發環境中無法再現的。所以這個結果對我們來說是非常理想的。我們使用IAM角色和掛載點以隻讀模式透明地連接預產品計算集群和生產數據,我們也將數據寫回生產數據包。因此,數據從來沒有真正駐留在長期環境中,在真正的計算和存儲分離部分。這張圖很重要,我們會在課程中回到這一點。
在我們之前的體係結構中,每個表有一個ETL映射作業,這使得平台非常嚴格。Beplay体育安卓版本我們實際上有4000個映射,如果你需要在多個映射中進行更改,這需要大量的工作,而且平台因為必須更改的事情的複雜性而拒絕更改。Beplay体育安卓版本在Lake House中,我們想要創建一個單一的火花攝入作業,它能夠進行流處理、批處理和從所有源讀取,並且是完全可配置的,我們已經在Scala中使用設計模式和大量依賴注入編寫了這個作業,允許非常豐富的可配置性。我們選擇了結構化流來利用檢查點、確切的語義或至少一種語義,並且我們將接口和著陸區域統一到S3和Kafka。因此,數據庫、CDC管道、api、平麵文件都上傳到S3, SNS、SQS和Kinesis都上傳到Kafka。然後,我們使用Databricks作為股作業和臨時集群來調度攝入作業。因此,這使得代碼具有高度可測試性、易於維護和非常靈活的特性。
數據湖中的所有表都是增量表。作為數據湖的一部分,我們需要保持一個緩慢變化的維度類型附加表來跟蹤所有行、所有列和分數的變化,我們稱之為L1表,但我們也需要存儲每一行的相關版本,像搜索引擎[聽不清]類型通過合並到目標表的變化,我們稱之為L2表。所以我們有一個選擇。你是否將數據從降落區流到L1,然後從L1流到L2?但這實際上會使我們的工作崗位翻倍,從4000個增至8000個。因此,經過大量測試後,我們決定利用每個批處理API的強大功能,同時編寫兩個表。如果你……智力箱大大簡化了,但我們實際上並不使用裸Spark。我們的讀者和作者都圍繞著Spark展開。我們還通過在鍛造批處理中添加強製元列來裝飾我們的數據。
我們必須做出的下一個選擇是圍繞我們的觸發選擇而解決這個問題的一種原生方法是取出攝取作業,這是一個流作業並將其作為一個短暫的作業發送。現在的挑戰是,我們有4000個表要運行,而Databricks隻允許在碎片中有1000個作業。最重要的是,每個短暫的集群都必須有一個驅動程序和至少兩個節點,這將使我們處於12000個節點的環境中,成本有點高,我們覺得這不是對資源的最佳利用。所以下一步我們著眼於將這些流作業組合在筆記本中,並將它們作為一個短暫的作業運行,我們找到了一個驅動程序處理大約40個流的最佳點。
當然,集群必須更大,才能處理所有的流,但我們也注意到計算機浪費的程度相當高,因為在這40個流中,可能有些流並不經常有數據,但我們讓它們一直運行。這種方法的另一個問題是,如果我們想要禁用單個流,我們必須有效地停止作業,這將停止所有40個流。所以我們想要找到一種更好的方法,能夠切換一個作業,或將它從一個筆記本電腦移動到另一個筆記本電腦,或從一個集群移動到另一個集群,而不需要停止所有其他作業。所以最後,我們確定了一個觸發器選項,對於那些可能不知道觸發器選項的人,它是結構化流,將收集所有數據,直到你的最大流或文件或字節數,處理數據,然後檢查進度,然後終止作業。
因此,當您希望微批運行時,您需要真正負責調度它們。這意味著沒有持續的執行,這允許我們將筆記本中的數百個作業放在一個臨時集群中。現在,因為這些作業是按計劃運行的,我們實際上可以在兩輪之間將它們遷移到筆記本電腦之間。我們還可以在每一輪中刷新每一輪的衝突,所以我們所做的唯一配置更改都是立即生效的,無需重啟。同時,我們也在使用機器學習在不同的集群中重新激活工作,以確保我們滿足我們的數據sli。我們有五種類型的筆記本作為臨時作業運行,這些筆記本有一個分配給它們的組。它們進入數據庫,配置數據庫,收集所有屬於該組的作業,然後它們以[聽不清]的順序運行這些作業,或者通過使用功率集合,我們可以麻痹作業的執行,直到驅動程序上的數量,通常是16到32個兩個作業同時運行。
那種筆記本我們有五種口味。我們有一個用於經常更新的東西,我們實際上可以在每一個筆記本上放置大約60個表格。我們的目標是在60分鍾內完成所有的更新和合並。我們有一些更新頻率較低的表,我們可以放置300到500個表。然後是不經常更新的表,我們可以放1000個表。這種方法的好處是,我們可以使用一個頻率不斷增長的表,並在這些組之間無縫地移動它,無縫地重新平衡這些作業,而不會對該組中運行的任何其他作業造成資源或影響。
我們還可以通過一個筆記本來實現所謂的偽流,它以y循環的方式運行作業,我們已經測試過了,它的性能非常接近於運行一個處理觸發器,讓Spark處理微批的執行。除了我們可以在每個y循環的頂部檢查配置之外,這意味著如果我們有五個作業正在運行,我們想要禁用其中一個,我們隻禁用其中一個,下一個迭代將不會運行,其他四個不會重新啟動。
好的。所以,讓我們繼續談談我們在建造如此大規模的湖泊環境時學到的經驗教訓。首先,讓我們談談雲文件。對於那些不知道的人,雲文件是自動加載器的一部分,由Databricks提供,當你使用AWS並試圖從一個文件夾中讀取時,Databricks會自動為該文件夾創建一個S3通知,這將使其成為一個SNS,然後它會有一個SNS訂閱了訂閱了SNS的SQS。首先,AWS隻允許每個桶有100個通知。因此,您將不得不編寫某種自動化程序,以知道在後端通知已經飽和,並且可能需要在下一個bucket中部署一種新的作業類型。
其次,SQS和SNS在默認情況下沒有標記,至少目前是這樣。因此,如果你像我們一樣,或者要求所有的資源都被標記,你就必須使用某種Lambda或某種函數來注意和檢測Databricks在你的帳戶上創建的資源,並適當地標記它們。最後,AWS SNS對api有嚴格的限製,ListSubscriptions和ListSubscriptionsByTopic, Databricks使用這些api來檢查SNS是否已經有SQS訂閱。如果你運行足夠多的工作,就像我們同時運行成千上萬的工作,我們就會看到我們達到這些極限的時候,我們的工作就會失敗。所以今天唯一的選擇就是看看是否有非常慢的表沒有太多的變化,我們可以禁用通知,或者將通知及時分散以避免這種情況。但總有一天,我們會達到一定的規模我們會遇到這些問題。
好的。這是我們從雲文件中學到的另一個教訓。我要回到我之前講過的數據計算隔離幻燈片。因此,當你從另一個帳戶預先進行計算,從生產帳戶請求數據時,Databricks將設置一個通知,它將設置SNS,然後它將設置SQS。但是如果您注意到,SNS和SQS位於計算的帳戶中,而不是存儲的帳戶中。
所以這是可行的,您的測試也是可行的,很好,您已經準備好部署到生產環境了。將作業部署到生產環境中。您對數據發出相同的請求,這次Databricks隻創建了一個SQS隊列,因為SNS已經存在,對嗎?所以現在,是的,你的生產數據也可以工作,但通知和整個雲文件運行在一個預推送環境中,這當然是有問題的。因此,在部署過程中我們必須做的是清除通知,首先運行生產作業,讓生產作業設置SNS和SQS,然後啟動預生產跳轉以訂閱該SNS主題。這是最後一步,但這是我們在部署過程中必須完成的額外步驟。
好的,下一個。我們通過AWS DMS從成千上萬的數據庫表中獲取數據,並使用CDC更改數據捕獲流。當你第一次嚐試在表上啟用CDC時,你必須做兩件事。你必須啟用加載和CDC。load的意思是,它是給定時間表的快照,CDC是在每個角色更改之後跳過的數據庫記錄的過程。現在,我們發現這個設置的挑戰是加載文件可能需要幾個小時,CDC文件在你開始工作時就開始被跟蹤。所以有可能在管道的CDC部分有一個對象會有一個時間戳那是在加載文件完成之前,而加載文件分配了現在的時間戳。
所以你有一個加載文件版本的行它有一個時間戳在CDC更新版本的行之前。因此,我們將加載文件的DMS時間戳重置為零,以避免這種競態條件。我們從DMS和CDC中學到的另一個教訓是數據類型轉換,因為DMS幾乎可以連接到任何數據庫,所以我們發現有時它不能正確地轉換數據類型。舉個例子,我們是一個SQL服務器,一個很小的Int轉換成UINT我們已經看到了一些溢出我們必須應用你們在幻燈片底部看到的規則來強製它返回一個整數。例如,在Oracle中,數值被轉換為DECIMAL(38,10),您可以選擇將其一直設置為38,38,但例如,在Oracle數據庫中,數值列的精度為50。所以我們沒有辦法帶來數據。我們必須將名為numberDataTypeScale的設置設置為- 2,這將有效地將其轉換為字符串。
另一個教訓。加載文件可能很大,在讀取時可能會扭曲數據。所以你可能需要加一些鹽。DMS文件不是分區的,所以要麼考慮壓縮,要麼就意識到Spark讀取DMS桶會花費很長時間。如果幾個月後重新啟動,會有成千上萬的小文件。我們設置DMS在我們開始任務時刪除所有文件,隻是為了有一個幹淨的記錄,隻是試圖緩解這個和這個問題,並最小化數據重複。數據庫源上的一些源可能具有大型事務。
讓我們回顧一下。當我們在一個微批處理中對單行進行多個更新時,我們必須確定哪一個被延遲了。這個說我們想要合並到目標表中,最初我們認為我們可以使用時間戳因為它們通常以毫秒或微秒為單位,但我們發現如果你手動在數據庫中打開一個大型事務,你把很多對同一行的更新放到事務中,然後關閉事務,所有這些更新都有完全相同的時間戳。所以你不可能確定哪個是最新的。所以我們必須通過DMS為所有數據庫引入LSN以確保我們有某種確定的方法來合並並得到最新的行。
現在,我們學到的另一個教訓是如果數據庫沒有主鍵,但有一個唯一的約束你可以用來合並,這個約束包含空值,空值是很多的源,Databrick的合並Delta在匹配時不會識別空值和空值,它每次都會插入新行。所以我們要做的就是把所有的null替換成字符串null或者空字符串來進行更確定的歸並。
卡夫卡的教訓。所以如果你用Kafka做CDC管道模式,這也是可能的,你可能有一些沒有太多流量的表,你不想配置大量的分區浪費大量的資源在Kafka上為小表,但初始數據加載實際上可能會帶來數百萬行到這個主題。現在,您有一個沒有大量分區的主題,但它有很多數據,您不必為了一次讀取而重新分區該主題。因此,我們建議將每個觸發器的最小分區和最大偏移量設置為較高的值。在我們的例子中,我認為我們分區4,000並偏移10,000,以迫使Spark麻痹這個主題的權重,從而在不需要重新分區的情況下加速它。
我們也使用第一個L1表。我們為每個批處理優化內部的表,然後使用它作為L2合並的源。那就是避免一個動作,第二個動作,回到源頭,因為源頭可能很慢。DMS可能會因為這麼多文件而停止,或者Kafka在這種情況下也可能會變慢。所以一般來說,我們采用的模式是在數據幀中添加一個批處理ID列,寫入L1,在需要時進行優化,然後使用相同的批處理ID從L1中篩選數據,以便合並。我們發現這種模式比回到源代碼快得多,如果你有大量的數據,緩存也會慢得多。
我們從卡夫卡身上學到的其他東西。好吧,我們想在所有東西上使用一次觸發器,這樣我們就有了選擇,而運動和其他來源不支持這個開箱即開。所以我們必須使用Kafka Connect將我們所有的SNS, SQS, Kinesis移動到Kafka中,然後我們可以使用支持,我們可以使用Kafka觸發器一次使這個工作完全相同的方式,例如,在DMS中。
從德爾塔吸取的教訓。當你第一次帶數據時,手動優化你的表因為你會得到更快的[聽不清]然後啟用Delta優化寫因為合並會重寫很多數據。優化寫是很好的合並和計算文件大小,所以你不需要經常做壓縮優化。將批處理ID和合並列移到數據框架的前麵。
因此Databrick收集數據幀的第一行數據。如果你有一個非常寬的數據框架,你可能不會得到最後一列的統計數據,特別是如果你隻添加了批處理ID,那將是最後一列。例如,您需要這個批處理ID進行篩選。把所有你要合並和搜索的東西移到數據框架的前麵。如果你使用增量合並列,比如自動更新數字,等等,你也可以使用排序來共同定位數據進一步減少需要讀取的文件數量它有數據跳過。我們還總是建議使用分區和使用帶IO緩存的i3實例類型。
所以我們吸取了其他教訓。如果你有其他需要讀取Delta的工具,比如我們使用的Presto,當你在Hive中注冊Delta表時,你必須寫入S3路徑。因為Presto不會理解DBFS,它不會理解[聽不清],所以我們必須手動創建表定義並將S3路徑放入其中。如果你想讓你的Delta文件被Athena、Hive或者Spectrum這樣的東西寫入和讀取,你需要生成清單文件,並對這些清單文件啟用自動更新。這樣,非delta或非delta事務,本地[聽不清]技術仍然可以顯示文件來讀取parquet文件。
Presto和Spark視圖目前不兼容。這是需要注意的,因為如果你在Spark中創建一個視圖,Presto將不能使用它,反之亦然。我們還發現,提取諸如行數、最後修改到熱緩存中的Delta統計信息實際上是非常可取的,因為我們的工作負載需要更新許多以前的表,需要首先更新許多依賴的表。因此,假設您有任何需要首先更新30或40個表的TL。擁有一個正在運行的集群並發出40個描述的命令來判斷是否可以運行作業是比較慢的。因此,通過在每個表的末尾提交這些統計數據到熱緩存,下一個作業隻需讀取現金,並確定所有表都已更新,然後就可以開始執行作業了。
好的。最後,Delta和Spark一起將Delta流出Delta表,但目前,到今天為止,它隻適用於追加。這是有意義的,因為當您向Delta表追加內容時,您將向表和事務日誌中插入新文件,偵聽該表的任何人都知道有新文件,並且將讀取這些新文件。這很有道理。至於歸並,這有點複雜因為歸並會重寫所有數據。是的,創建了一個新文件該文件可能有100萬行,但我們隻更新了該文件中的1000行。如果你把文件放到管道中,你必須從百萬行中過濾到實際發生變化的一千行。我們是怎麼做到的呢?
同樣,我們使用之前添加的批處理ID來知道新批處理是什麼,27,我隻需要獲取27的數據,因此將這個大文件過濾為一個小文件。它不是效率最高的,但運行得很好。最後,SQL分析,簡單介紹一下我們如何使用SQL分析。因此,我們的數據集市是從1000多個SQL查詢和語句的集合中構建的。我們需要一種方法將數據集市從以前的實驗室平台提升並轉移到這個平台,因此我們需要一個好的可伸縮的SQL執行引擎。Beplay体育安卓版本當然,Spark就是這樣,我們希望利用現有的框架,通過JDBC連接器將SQL語句提交到Spark集群。第一種選擇是使用交互式集群,但它們相當昂貴。
所以我們更傾向於使用開源Spark和Delta的EMR。當SQL Analytics產品進入我們的範圍時,它仍然支持JDBC連接,所以它非常適合我們發送那些SQL查詢並創建那些數據集市。到目前為止,由於這是一個早期的產品,我們學到的一些經驗是我們必須自己從api中收集所有的指標,並將它們放在Delta表中進行一些監視和性能。每個SQL工作區隻允許使用一個元存儲,這意味著如果您有多個不同的SQL端點,它們都將共享一個元存儲,這與集群不同,我們可以在每個集群中配置不同的元存儲。所以我們在分離計算和存儲以及跨賬戶和使用元存儲將兩者結合在一起方麵受到了一些限製。
它也不支持UDF。所以如果你需要一個[聽不清],你仍然需要依靠交互式的Spark SQL集群,在那裏你可以附加jar,這次它將不需要jar用於SQL分析。最後,您還需要學習如何排除Spark的故障。所以你仍然需要學習理解dag和Spark job以及Spark UI上的SQL視圖,因為這仍然隻是一個Spark job的底層。因此,為了有效地發現查詢中的瓶頸,您仍然需要這樣做。
好的,非常感謝。現在是問答時間。謝謝你,請提供你的反饋。我們想要得到回複。我們想要提高與你們分享的內容的質量,如果你們有任何問題,如果你們想在線下聯係我,請隨時在LinkedIn上聯係我。謝謝你!

托馬斯Magdanski

Tomasz是一位經驗豐富的技術領導者,專門從事大數據、實時應用和機器學習技術的現實實現。他部署和管理生產應用程序…
閱讀更多

Baidu
map