在开发PySpark
程序时通常会需要用到Java的对象,而PySpark
本身也是建立在Java API之上,通过Py4j来创建JavaSparkContext
。
这里有几点是需要注意的
1. Py4j
只运行在driver
也就是说worker
目前来说引入不了第三方的jar包。因为worker
结点的PySpark是没有启动Py4j的通信进程的,相应的jar包自然也加载不了。之前没有详细看这部分文档,系统设计时企图在worker
结点利用client模式直连Hbase来获取部分数据,从而避免对整个表的JOIN操作,当然对于python来说这样的操作只有通过引入jar包来实现(不考虑thrift方式)。但是测试的jar写好之后,一直不成功,最后只有修改方案,后来才去查了官方文档。
2. PythonRDD
的原型是 JavaRDD[String]
所有的经过PythonRDD传递的数据都通过BASE64编码
3. PySpark
中的方法和匿名函数是通过cloudpickle
序列化
为何函数需要被序列化,因为做map
或者flatMap
时,此时的函数或者lambda表达式是需要传递到各个worder
的,如果函数里有用到闭包,cloudpickle
也能巧妙的序列化。但是,需要传递的函数里请不要是用self
关键字,因为传递过去后,self
的指代关系已经不明确了。
文档还提到
PythonRDD
的序列化是可定制的了,但是目前没这个需求,所有没测试
代码示例
java 测试代码, 编译生成 pyspark-test.jar
package org.valux.py4j;public class Calculate { public int sqAdd(int x){ return x * x + 1; }}
Python 测试代码,放在文件 driver.py
from pyspark import SparkContextfrom py4j.java_gateway import java_importsc = SparkContext(appName="Py4jTesting")java_import(sc._jvm, "org.valux.py4j.Calculate")func = sc._jvm.Calculate()print func.sqAdd(5)"""[OUTPUT] > 26"""
""" !!![错误用法] 这里是想在每个work上调用自定义的方法, 前面已经提到过PySpark目前是不支持的"""rdd = sc.parallelize([1, 2, 3])result = rdd.map(func.sqAdd).collect()""" !!![错误用法] 之前还有个错误的思路是想在work单独 import 相应的 jar """def foo(x): java_import(sc._jvm, "org.valux.py4j.Calculate") func = sc._jvm.Calculate() func.sqAdd(x)rdd = sc.parallelize([1, 2, 3])rdd.map(foo).collect()
测试时,提交程序需要记得带上jar包
> bin/spar-submit --driver-class-path pyspark-test.jar driver.py
这里又有一个坑,之前提交为了方便,一直都用的是 --jars 参数
--driver-class-path 附加的 jar 只会在
driver
引入 --jars 附加的jar会在所有worker
引入
帮助文档里面还提到
--jars Comma-separated list of local jars to include on the driver and executor classpaths.
所有就偷个懒用了 --jars ,结果一直报如下错误:
py4j.protocol.Py4JError: Trying to call a package.
测试了好久终于解决了