我跑步磚10.4版本的豐富。我運行一個結構化流試圖處理曆史文件δgcp雲存儲表。這個源三角洲表是大但維護與優化。
流重新分區,似乎是大問題。
我用.trigger (availableNow = True)。洗牌寫好巨大但查詢似乎繼續直到寫的最後階段,即199/200完成。
改變spark.sql.shuffle.partitions或maxFilesPerTrigger似乎沒有影響處理。
這是階段:
org.apache.spark.sql.streaming.DataStreamWriter.toTable sun.reflect.NativeMethodAccessorImpl (DataStreamWriter.scala: 361)。在voke0(Native Method) sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) java.lang.reflect.Method.invoke(Method.java:498) py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380) py4j.Gateway.invoke(Gateway.java:295) py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) py4j.commands.CallCommand.execute(CallCommand.java:79) py4j.GatewayConnection.run(GatewayConnection.java:251) java.lang.Thread.run(Thread.java:748)
和流配置:
.repartition(“日期”、“id”) \ .writeStream \ .trigger (availableNow = True) \ .option (checkpointLocation, f”{checkpoint_basepath} {dest_database} -{模型}{full_source_table.replace (“。”,“-”)}”) \ .format(δ)\ .queryName(模型)\ .outputMode(附加)\ .option (“mergeSchema”,“真正的”)\ .partitionBy(“日期”、“id”) \ .toTable (table_name)
謝謝你的回應。你見過maxBytesPerTrigger使用最新版本嗎?
你可能會存在數據傾斜,因為你正在做重新分配(“日期”、“id”)。你有更多的數據了幾天嗎?或id嗎?你試著通過移除這個重新分配()步驟?