本文解釋了如何運行火花代碼編譯Cython代碼。的步驟如下:
- 創建一個示例Cython DBFS模塊(AWS|Azure)。
- 將文件添加到火花會話。
- 創建一個包裝器方法來加載模塊的執行人。
- 樣本數據集的映射器運行。
- 產生更大的數據集,比較性能與本機Python示例。
% python #寫一個例子cython / cython fib模塊/例子。pyx DBFS。dbutils.fs.put (“/ / cython /無傷大雅的例子。pyx”、“”“def fib_mapper_cython (n):“返回第一個fibonnaci數量> n。“cdef int = 0 cdef int b = 1 cdef int j = int (n),而b < j: a、b = b, a + b返回b, 1”“”,真的)#寫一個示例輸入文件/ / cython /輸入示例。在DBFS txt。#此文件的每一行是一個整數。dbutils.fs.put (“/ / cython_input /輸入示例。txt”、“100”“10”,真的)#看看示例輸入。dbutils.fs.head(“/例子/ cython_input / input.txt”)
Cython源文件添加到火花
使Cython跨集群可用的源文件,我們將使用sc.addPyFile將這些文件添加到火花。例如,
% python sc.addPyFile (“dbfs: / / cython / fib.pyx例子”)
測試Cython編譯驅動節點上
這段代碼首先將測試編譯驅動節點。
% python導入pyximport導入操作係統pyximport.install進口fib ()
定義了wap功能編譯和導入模塊
打印語句將執行器節點上執行。您可以查看stdout日誌消息的進步跟蹤模塊。
% python導入係統,操作係統、shutil cython def spark_cython(模塊、方法):def (* args, * * kwargs):包裝印刷的輸入函數:% s % args全球cython_function_嚐試:返回cython_function_ (* args, * * kwargs)除了:進口pyximport pyximport.install()打印“cython編譯完成”cython_function_ = getattr (__import__(模塊),方法)印刷的定義功能:% s % cython_function_返回cython_function_ (* args, * * kwargs)返回包裹
運行Cython例子
下麵的代碼片段在幾個數據點運行fibonacci示例。
% python #使用CSV讀者產生火花DataFrame。回滾到抽樣從GenericRowObject DataFrames和抓住單一元素行= spark.read.csv .rdd(“/例子/ cython_input /”)。地圖(λy: y.__getitem__ (0)) mapper = spark_cython (fib, fib_mapper_cython) fib_frequency = lines.map(映射)。reduceByKey(λa、b: a + b) .collect()打印fib_frequency
性能比較
下麵我們將測試2之間的速度差實現。我們將使用spark.range ()api生成數據點從10000年到100000000年,50個火花分區。我們將寫這個輸出DBFS CSV。
對於這個測試,禁用自動定量(AWS|Azure)為了確保集群有固定數量的火花執行人。
python dbutils.fs %。rm (“/ tmp / cython_input /”,真的)火花。範圍(10000、100000000、1、50).write.csv (“/ tmp / cython_input /”)
正常PySpark代碼
% python def fib_mapper_python (n): b = 0 = 1打印“嚐試:% s % n,而b < int (n): a、b = b, a + b返回(b, 1)打印fib_mapper_python(2000)行= spark.read.csv .rdd (“/ tmp / cython_input /”)。地圖(λy: y.__getitem__ (0)) fib_frequency =線。地圖(λx: fib_mapper_python (x))。reduceByKey(λa、b: a + b) .collect()打印fib_frequency
測試Cython代碼
現在測試Cython編譯代碼。
% = spark.read.csv python行(“/ tmp / cython_input /”) .rdd。地圖(λy: y.__getitem__ (0)) mapper = spark_cython (fib, fib_mapper_cython) fib_frequency = lines.map(映射)。reduceByKey(λa、b: a + b) .collect()打印fib_frequency
我們生成的測試數據集有50個火花分區,創建50 csv文件所示。您可以查看的數據集dbutils.fs.ls (“/ tmp / cython_input /”)。