在磚結構化流模式
這包含常見的筆記本和代碼示例模式處理結構化流磚。
開始使用結構化的流
如果你是全新的結構化流,明白了第一個結構化流負載運行。
寫卡珊德拉在Python作為結構化流水槽
Apache Cassandra是一個分布式的、低延遲、可伸縮的高度可用的OLTP數據庫。
結構化流與卡桑德拉通過火花卡桑德拉連接器。這個連接器支持抽樣和DataFrame api,它有原生支持編寫流數據。*重要*您必須使用相應的版本的spark-cassandra-connector-assembly。
下麵的示例連接到一個或多個主機在卡桑德拉數據庫集群。它還指定了連接配置如檢查點位置和具體用於和表名:
火花。相依。集(“spark.cassandra.connection.host”,“host1 host2”)df。writeStream\。格式(“org.apache.spark.sql.cassandra”)\。outputMode(“添加”)\。選項(“checkpointLocation”,“/道路/ /檢查站”)\。選項(“用於”,“keyspace_name”)\。選項(“表”,“table_name”)\。開始()
寫Azure突觸分析使用foreachBatch ()
在Python中
streamingDF.writeStream.foreachBatch ()
允許重用現有的一批作家寫的輸出數據流查詢Azure突觸分析。看到foreachBatch文檔獲取詳細信息。
要運行這個示例,您需要Azure突觸分析連接器。在Azure突觸分析連接器的詳細信息,請參見在Azure突觸分析查詢數據。
從pyspark.sql.functions進口*從pyspark.sql進口*defwriteToSQLWarehouse(df,epochId):df。寫\。格式(“com.databricks.spark.sqldw”)\。模式(“覆蓋”)\。選項(“url”," jdbc::狀態"置疑" / / < the-rest-of-the-connection-string >”)\。選項(“forward_spark_azure_storage_credentials”,“真正的”)\。選項(“數據表”,“my_table_in_dw_copy”)\。選項(“tempdir”,“wasbs: / / < your-container-name > @ < your-storage-account-name >.blob.core.windows.net/ < your-directory-name >”)\。保存()火花。相依。集(“spark.sql.shuffle.partitions”,“1”)查詢=(火花。readStream。格式(“速度”)。負載()。selectExpr(“值% 10鍵”)。groupBy(“關鍵”)。數()。toDF(“關鍵”,“數”)。writeStream。foreachBatch(writeToSQLWarehouse)。outputMode(“更新”)。開始())
寫信給亞馬遜DynamoDB使用foreach ()
在Scala中,Python
streamingDF.writeStream.foreach ()
允許您編寫的輸出流查詢到任意位置。
使用Python
這個例子展示了如何使用streamingDataFrame.writeStream.foreach ()
在Python中給DynamoDB寫信。第一步得到DynamoDB寶途資源。這個例子是用寫的access_key
和secret_key
,但是磚建議你使用S3訪問配置實例配置文件。
定義一些輔助方法來創建DynamoDB表運行的例子。
table_name=“PythonForeachTest”defget_dynamodb():進口boto3access_key=“<訪問密鑰>”secret_key=“<秘密密鑰>”地區=“<地區名稱>”返回boto3。資源(“dynamodb”,aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=地區)defcreateTableIfNotExists():“‘創建一個DynamoDB表如果它不存在。這一定是火花驅動程序上運行,而不是在foreach。“‘dynamodb=get_dynamodb()existing_tables=dynamodb。元。客戶端。list_tables()(的表名]如果table_name不在existing_tables:打印(“創建表% s”%table_name)表=dynamodb。create_table(的表=table_name,KeySchema=({“AttributeName”:“關鍵”,“KeyType”:“希”}),AttributeDefinitions=({“AttributeName”:“關鍵”,“AttributeType”:“年代”}),ProvisionedThroughput={“ReadCapacityUnits”:5,“WriteCapacityUnits”:5})打印(“等待表做好準備”)表。元。客戶端。get_waiter(“table_exists”)。等待(的表=table_name)
定義的類和方法,寫入DynamoDB並調用它們
foreach
。有兩種方法來指定您的自定義邏輯foreach
。使用一個函數:這是最簡單的方法,可以用來寫一行。然而,客戶端/連接初始化寫一行將在每次調用完成。
defsendToDynamoDB_simple(行):“‘DynamoDB函數發送一行。當使用foreach,調用這個方法是遺囑執行人與生成的輸出行。“‘#創建客戶端對象的執行者,#不使用客戶端在驅動程序中創建的對象dynamodb=get_dynamodb()dynamodb。表(table_name)。put_item(項={“關鍵”:str(行(“關鍵”]),“數”:行(“數”]})
使用一個類
開放
,過程
,關閉
方法:這允許一個更高效的實現,客戶端/連接初始化,可以編寫多行。類SendToDynamoDB_ForeachWriter:“‘類發送一組行DynamoDB。當使用foreach,這個類的副本將被用來寫字多行執行人。看到“DataStreamWriter.foreach”的python文檔為更多的細節。“‘def開放(自我,partition_id,epoch_id):#這叫做第一次當準備發送多個行。#把所有的初始化代碼在open(),以便新鮮#複製這個類的初始化在開放的執行人()#將調用。自我。dynamodb=get_dynamodb()返回真正的def過程(自我,行):#這是要求每一行()被稱為後開放。#這個實現發送一次一行。#為進一步增強,接觸火花+ DynamoDB連接器#團隊:https://github.com/audienceproject/spark-dynamodb自我。dynamodb。表(table_name)。put_item(項={“關鍵”:str(行(“關鍵”]),“數”:行(“數”]})def關閉(自我,犯錯):#這叫做畢竟行處理。如果犯錯:提高犯錯
調用
foreach
在你流查詢使用上麵的函數或對象。從pyspark.sql.functions進口*火花。相依。集(“spark.sql.shuffle.partitions”,“1”)查詢=(火花。readStream。格式(“速度”)。負載()。selectExpr(“值% 10鍵”)。groupBy(“關鍵”)。數()。toDF(“關鍵”,“數”)。writeStream。foreach(SendToDynamoDB_ForeachWriter())# .foreach (sendToDynamoDB_simple) / /選擇,使用一個或另一個。outputMode(“更新”)。開始())
使用Scala
這個例子展示了如何使用streamingDataFrame.writeStream.foreach ()
在Scala中給DynamoDB寫信。
這個你需要創建一個運行DynamoDB表都有一個單獨的字符串鍵命名為“價值”。
定義的實現
ForeachWriter
接口執行寫。進口org。apache。火花。sql。{ForeachWriter,行}進口com。amazonaws。AmazonServiceException進口com。amazonaws。身份驗證。_進口com。amazonaws。服務。dynamodbv2。AmazonDynamoDB進口com。amazonaws。服務。dynamodbv2。AmazonDynamoDBClientBuilder進口com。amazonaws。服務。dynamodbv2。模型。AttributeValue進口com。amazonaws。服務。dynamodbv2。模型。ResourceNotFoundException進口java。跑龍套。ArrayList進口scala。集合。JavaConverters。_類DynamoDbWriter擴展ForeachWriter(行]{私人瓦爾的表=“<表名稱>”私人瓦爾accessKey=“< aws訪問密鑰>”私人瓦爾secretKey=“< aws密鑰>”私人瓦爾regionName=“<地區>”/ /這個懶洋洋地將隻有當初始化()開放懶惰的瓦爾ddb=AmazonDynamoDBClientBuilder。標準()。withCredentials(新AWSStaticCredentialsProvider(新BasicAWSCredentials(accessKey,secretKey)))。withRegion(regionName)。構建()/ // /這叫做第一次當準備發送多個行。/ /把所有的初始化代碼內部開放的(),這樣一個新鮮的/ /複製這個類的初始化在開放的執行人()/ /將被調用。/ /def開放(partitionId:長,epochId:長)={ddb/ /客戶端的初始化真正的}/ // /這是要求每一行()被稱為後開放。/ /這個實現發送一次一行。/ /一個更有效的實現可以發送一次批次的行。/ /def過程(行:行)={瓦爾rowAsMap=行。getValuesMap(行。模式。字段名)瓦爾dynamoItem=rowAsMap。mapValues{v:任何= >新AttributeValue(v。toString)}。asJavaddb。putItem(的表,dynamoItem)}/ // /這叫做畢竟行處理。/ /def關閉(errorOrNull:Throwable)={ddb。關閉()}}
使用
DynamoDbWriter
寫速度流進DynamoDB。火花。readStream。格式(“速度”)。負載()。選擇(“價值”)。writeStream。foreach(新DynamoDbWriter)。開始()
亞馬遜CloudTrail ETL
以下筆記本顯示您可以很容易地改變你的亞馬遜CloudTrail日誌從JSON拚花特別的高效查詢。看到實時流ETL和結構化流獲取詳細信息。