取消
顯示的結果
而不是尋找
你的意思是:

Protobuf反序列化的磚

sourander
新的貢獻者三世

你好,

假設我有這些東西:

  • 二進製列包含protobuf-serialized數據
  • .proto文件包括消息定義

不同的方法有磚用戶選擇反序列化數據?Python是我主要熟悉的編程語言,因此,任何可以通過使用pyspark就好了。

在本地,我的方法是使用grpc_tool。protoc生成pb2文件(消息。原型- > message_pb2.python)。我後來在進口這些類並使用approariate消息反序列化的二進製數據。示例代碼如下:

進口操作係統從grpc_tools進口進口pkg_resources protoc #詳細打印(f”(建築){protopath}”) #出於某種原因,grpc_tools。protoc不包括_proto模塊導致錯誤:#“google.protobuf。時間戳”沒有定義。path_to_module = pkg_resources。resource_filename (“grpc_tools”、“_proto”) #旗幟args = (grpc_tools。protoc”, #參數不需要0 f”——proto_path = {proto_path}”, f”——python_out = {OUTPUT_PATH}”, f - i {path_to_module}, # f”——grpc_python_out = {OUTPUT_PATH}”, protopath.split (“/”) [1]) protoc.main (args)

我目前的想法是:

  • 執行上麵的代碼使用一個外部的機器。“my_message_derializer創建一個包。輪”,以此作為一個依賴庫的工作/任務/集群。這將需要更新每次原型文件更改使用如git人。
  • 或者,磚,安裝grpcio grpcio-tools,司機上運行和上麵類似的代碼。然後導入創建pb2類並使用消息像往常一樣。

還有其他的方式使用反序列化器和火花?少一點手冊嗎?

14日回複14

jose_gonzalez
主持人
主持人

嗨@Jani Sourander,

我發現這個庫sparksql-protobuf這可能會奏效。它沒有被更新。

謝謝你!

sourander
新的貢獻者三世

嗨@Jose岡薩雷斯和謝謝你的回複!圖書館看起來過時,缺乏文檔。我想知道ScalaPB將是一個更好的選擇。scala我沒有任何經驗,但是因為scala UDF與火花的表現良好。我想學習Scala,這種方法將是一個選擇。也會有一些其他福利(如寫其他磚UDF的Scala中的性能增加)

似乎開始MVP實際上是基於python的最佳選擇選項,在磚作為輪庫。

Dan_Z
尊敬的貢獻者

我肯定會去使用Python選項,這樣您可以使用PandasUDFs速度比scala udf。編碼快樂!

sourander
新的貢獻者三世

@Dan征服者你有什麼想法我怎麼優化查詢,需要訪問一個類方法?下麵的細節。

我創建了一個類ProtoFetcher各種proto_pb2主機。使用protoc py文件創建。工具(同樣在早前發布的文章)。它可以給一個名字被實例化的數據;內部導入正確some_proto_pb2類,將它賦給類變量並使用get_attr (“some_proto_pb2”、“name_of_this_message”)來獲取正確的GeneratedProtocolMessageType (google.protobuf.pyext.cpp_message中定義)。

遺憾的是,我似乎無法找到一個方法來使用UDF這類初始化不會內部這個函數。如果我不是完全錯誤的,這意味著初始化的類會為每一行作為一個循環。試圖訪問該方法從外部udf定義將提高“PicklingError:不能序列化對象:TypeError:不能google.protobuf.pyext._message泡菜”。MessageDescriptor對象”

@udf(“字符串”)def my_test_func (blob): d = ProtoFetcher (name_of_this_data)返回d.blob_to_json (blob) new_df = df。withColumn (“blob_as_json my_test_func (“original_blob_col”))

運行顯示()new_df大約需要40秒,這個測試文件隻有600行。我也試過pandas_udf試驗,得到了類似的結果。我將pandas_udf定義為:

@pandas_udf(“字符串”)def my_test_function (s: pd.Series) - > pd。係列:d = ProtoFetcher (“name_of_this_data”) s_json = s.apply (d.blob_to_json)返回s_json

注意:blob列有相當數量的數據,雖然。運行相同的代碼使用本地熊貓安裝需要32秒一個相當強大的筆記本電腦。這是運行代碼:

df [' new_col '] = df (“original_col”)蘋果(d.blob_to_json)

Dan_Z
尊敬的貢獻者

隻使用一個迭代器的係列UDF。在這裏看到的:https://docs.m.eheci.com/spark/latest/spark-sql/udf-python-pandas.html iterator-of-series-to-iter……

這允許您設置一些預定義的狀態(如加載一個文件)在做計算。

歡迎來到磚社區:讓學習、網絡和一起慶祝

加入我們的快速增長的數據專業人員和專家的80 k +社區成員,準備發現,幫助和合作而做出有意義的聯係。

點擊在這裏注冊今天,加入!

參與令人興奮的技術討論,加入一個組與你的同事和滿足我們的成員。

Baidu
map