取消
顯示的結果
而不是尋找
你的意思是:

無法編寫csv文件使用熊貓to_csv Azure BLOB ()

halfwind22
新的貢獻者三世

我使用Py函數讀取一些數據從一個端點,寫成一個CSV文件到Azure BLOB位置。

我得到端點需要2查詢參數,param1 param2。所以一開始,我有一個dataframe paramDf兩列param1和param2。

param1 param2 12 25 45 95模式:paramDF: pyspark.sql.dataframe。DataFrame param1:字符串param2:字符串

我現在寫一個函數如下:

def executeRestApi (w): dlist =[]嚐試:響應= requests.get (DataUrl。形式at(token=TOKEN, oid=w.param1,wid=w.param2)) if(response.status_code==200): metrics=response.json()['data']['metrics'] dic={} dic['metric1'] = metrics['metric1'] dic['metric2'] = metrics['metric2'] dlist.append(dic) pandas.DataFrame(dlist).to_csv("../../dbfs/mnt/raw/Important/MetricData/listofmetrics_{}_{}.csv".format(param1,param2),header=True,index=False) return "Success" except Exception as e: return "Failure"

最後,調用方法:

paramDf.foreach (executeRestApi)

因此,從理論上講,函數executeRestApi必須敵人dataframe中的每一行執行,在函數內,我提取所需的數據並將它寫入ADLS位置為csv文件。

所有作品好,除了文件從來沒有當我寫一個多節點集群上執行foreach命令。

然而同樣的操作單個節點集群上運行良好。我無法找出這兩種方法之間的區別。

可能我做錯了什麼呢?

1接受解決方案

接受的解決方案

werners1
尊敬的貢獻者三世

好讓我們看看。

你有火花dataframe,分布式的本性。

另一方麵你使用熊貓,不分配。

你是純python函數,不是pyspark。

這將是由驅動程序處理,所以沒有分布。

然而,dataframe本身將由工人進行處理。

所以工人們想做些什麼但是代碼運行在司機。它可能是。不確定,但事實上它是單節點讓我認為你的代碼並不是在分布式環境中執行。

這就是為什麼我提到了收集()。這將收集的火花dataframe司機的工人們繞過你的案子。

從python pyspark花一些時間來理解。

這個博客解釋一些有趣的主題:

https://medium.com/hashmapinc/5-steps-to-converting-python-jobs-to-pyspark-4b9988ad027a

使用考拉代替熊貓初版

在原帖子查看解決方案

11日回複11

Kaniz
社區經理
社區經理

你好@halfwind22!我的名字叫Kaniz,我這裏的技術主持人。很高興認識你,謝謝你的問題!看看你的同行在社區中有一個回答你的問題。否則我將盡快給你回電。謝謝。

halfwind22
新的貢獻者三世

@Kaniz Fatma謝謝,急切地等待你的回應。

嗨@Aravind NK,謝謝你的信任我們:grinning_face:

werners1
尊敬的貢獻者三世

刪除從foreach寫。而不是建立一個dataframe。返回dataframe和隻寫一次。

現在你寫在每個迭代中。

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map