我使用Py函數讀取一些數據從一個端點,寫成一個CSV文件到Azure BLOB位置。
我得到端點需要2查詢參數,param1 param2。所以一開始,我有一個dataframe paramDf兩列param1和param2。
param1 param2 12 25 45 95模式:paramDF: pyspark.sql.dataframe。DataFrame param1:字符串param2:字符串
我現在寫一個函數如下:
def executeRestApi (w): dlist =[]嚐試:響應= requests.get (DataUrl。形式at(token=TOKEN, oid=w.param1,wid=w.param2)) if(response.status_code==200): metrics=response.json()['data']['metrics'] dic={} dic['metric1'] = metrics['metric1'] dic['metric2'] = metrics['metric2'] dlist.append(dic) pandas.DataFrame(dlist).to_csv("../../dbfs/mnt/raw/Important/MetricData/listofmetrics_{}_{}.csv".format(param1,param2),header=True,index=False) return "Success" except Exception as e: return "Failure"
最後,調用方法:
paramDf.foreach (executeRestApi)
因此,從理論上講,函數executeRestApi必須敵人dataframe中的每一行執行,在函數內,我提取所需的數據並將它寫入ADLS位置為csv文件。
所有作品好,除了文件從來沒有當我寫一個多節點集群上執行foreach命令。
然而同樣的操作單個節點集群上運行良好。我無法找出這兩種方法之間的區別。
可能我做錯了什麼呢?
好讓我們看看。
你有火花dataframe,分布式的本性。
另一方麵你使用熊貓,不分配。
你是純python函數,不是pyspark。
這將是由驅動程序處理,所以沒有分布。
然而,dataframe本身將由工人進行處理。
所以工人們想做些什麼但是代碼運行在司機。它可能是。不確定,但事實上它是單節點讓我認為你的代碼並不是在分布式環境中執行。
這就是為什麼我提到了收集()。這將收集的火花dataframe司機的工人們繞過你的案子。
從python pyspark花一些時間來理解。
這個博客解釋一些有趣的主題:
https://medium.com/hashmapinc/5-steps-to-converting-python-jobs-to-pyspark-4b9988ad027a
使用考拉代替熊貓初版