在磚結構化流模式
這包含常見的筆記本和代碼示例模式處理結構化流磚。
開始使用結構化的流
如果你是全新的結構化流,明白了第一個結構化流負載運行。
寫卡珊德拉在Python作為結構化流水槽
Apache Cassandra是一個分布式的、低延遲、可伸縮的高度可用的OLTP數據庫。
結構化流與卡桑德拉通過火花卡桑德拉連接器。這個連接器支持抽樣和DataFrame api,它有原生支持編寫流數據。*重要*您必須使用相應的版本的spark-cassandra-connector-assembly。
下麵的示例連接到一個或多個主機在卡桑德拉數據庫集群。它還指定了連接配置如檢查點位置和具體用於和表名:
火花。相依。集(“spark.cassandra.connection.host”,“host1 host2”)df。writeStream\。格式(“org.apache.spark.sql.cassandra”)\。outputMode(“添加”)\。選項(“checkpointLocation”,“/道路/ /檢查站”)\。選項(“用於”,“keyspace_name”)\。選項(“表”,“table_name”)\。開始()
寫Azure突觸分析使用foreachBatch ()
在Python中
streamingDF.writeStream.foreachBatch ()
允許重用現有的一批作家寫的輸出數據流查詢Azure突觸分析。看到foreachBatch文檔獲取詳細信息。
要運行這個示例,您需要Azure突觸分析連接器。在Azure突觸分析連接器的詳細信息,請參見在Azure突觸分析查詢數據。
從pyspark.sql.functions進口*從pyspark.sql進口*defwriteToSQLWarehouse(df,epochId):df。寫\。格式(“com.databricks.spark.sqldw”)\。模式(“覆蓋”)\。選項(“url”," jdbc::狀態"置疑" / / < the-rest-of-the-connection-string >”)\。選項(“forward_spark_azure_storage_credentials”,“真正的”)\。選項(“數據表”,“my_table_in_dw_copy”)\。選項(“tempdir”,“wasbs: / / < your-container-name > @ < your-storage-account-name >.blob.core.windows.net/ < your-directory-name >”)\。保存()火花。相依。集(“spark.sql.shuffle.partitions”,“1”)查詢=(火花。readStream。格式(“速度”)。負載()。selectExpr(“值% 10鍵”)。groupBy(“關鍵”)。數()。toDF(“關鍵”,“數”)。writeStream。foreachBatch(writeToSQLWarehouse)。outputMode(“更新”)。開始())