安裝和編譯Cython

學習如何安裝和編譯Cython磚。

寫的亞當Pavlacka

去年發表在:2022年5月19日

本文解釋了如何運行火花代碼編譯Cython代碼。的步驟如下:

  1. 創建一個示例Cython DBFS模塊(AWS|Azure)。
  2. 將文件添加到火花會話。
  3. 創建一個包裝器方法來加載模塊的執行人。
  4. 樣本數據集的映射器運行。
  5. 產生更大的數據集,比較性能與本機Python示例。
刪除

信息

默認情況下,使用路徑dbfs: /如果沒有協議引用。

% python #寫一個例子cython / cython fib模塊/例子。pyx DBFS。dbutils.fs.put (“/ / cython /無傷大雅的例子。pyx”、“”“def fib_mapper_cython (n):“返回第一個fibonnaci數量> n。“cdef int = 0 cdef int b = 1 cdef int j = int (n),而b < j: a、b = b, a + b返回b, 1”“”,真的)#寫一個示例輸入文件/ / cython /輸入示例。在DBFS txt。#此文件的每一行是一個整數。dbutils.fs.put (“/ / cython_input /輸入示例。txt”、“100”“10”,真的)#看看示例輸入。dbutils.fs.head(“/例子/ cython_input / input.txt”)

Cython源文件添加到火花

使Cython跨集群可用的源文件,我們將使用sc.addPyFile將這些文件添加到火花。例如,

% python sc.addPyFile (“dbfs: / / cython / fib.pyx例子”)

測試Cython編譯驅動節點上

這段代碼首先將測試編譯驅動節點。

% python導入pyximport導入操作係統pyximport.install進口fib ()

定義了wap功能編譯和導入模塊

打印語句將執行器節點上執行。您可以查看stdout日誌消息的進步跟蹤模塊。

% python導入係統,操作係統、shutil cython def spark_cython(模塊、方法):def (* args, * * kwargs):包裝印刷的輸入函數:% s % args全球cython_function_嚐試:返回cython_function_ (* args, * * kwargs)除了:進口pyximport pyximport.install()打印“cython編譯完成”cython_function_ = getattr (__import__(模塊),方法)印刷的定義功能:% s % cython_function_返回cython_function_ (* args, * * kwargs)返回包裹

運行Cython例子

下麵的代碼片段在幾個數據點運行fibonacci示例。

% python #使用CSV讀者產生火花DataFrame。回滾到抽樣從GenericRowObject DataFrames和抓住單一元素行= spark.read.csv .rdd(“/例子/ cython_input /”)。地圖(λy: y.__getitem__ (0)) mapper = spark_cython (fib, fib_mapper_cython) fib_frequency = lines.map(映射)。reduceByKey(λa、b: a + b) .collect()打印fib_frequency

性能比較

下麵我們將測試2之間的速度差實現。我們將使用spark.range ()api生成數據點從10000年到100000000年,50個火花分區。我們將寫這個輸出DBFS CSV。

對於這個測試,禁用自動定量(AWS|Azure)為了確保集群有固定數量的火花執行人。

python dbutils.fs %。rm (“/ tmp / cython_input /”,真的)火花。範圍(10000、100000000、1、50).write.csv (“/ tmp / cython_input /”)

正常PySpark代碼

% python def fib_mapper_python (n): b = 0 = 1打印“嚐試:% s % n,而b < int (n): a、b = b, a + b返回(b, 1)打印fib_mapper_python(2000)行= spark.read.csv .rdd (“/ tmp / cython_input /”)。地圖(λy: y.__getitem__ (0)) fib_frequency =線。地圖(λx: fib_mapper_python (x))。reduceByKey(λa、b: a + b) .collect()打印fib_frequency

測試Cython代碼

現在測試Cython編譯代碼。

% = spark.read.csv python行(“/ tmp / cython_input /”) .rdd。地圖(λy: y.__getitem__ (0)) mapper = spark_cython (fib, fib_mapper_cython) fib_frequency = lines.map(映射)。reduceByKey(λa、b: a + b) .collect()打印fib_frequency

我們生成的測試數據集有50個火花分區,創建50 csv文件所示。您可以查看的數據集dbutils.fs.ls (“/ tmp / cython_input /”)

這篇文章有用嗎?