博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
PySpark调用自定义jar包
阅读量:5065 次
发布时间:2019-06-12

本文共 1802 字,大约阅读时间需要 6 分钟。

在开发PySpark程序时通常会需要用到Java的对象,而PySpark本身也是建立在Java API之上,通过Py4j来创建JavaSparkContext

Cached / shuffled

这里有几点是需要注意的

1. Py4j只运行在driver

也就是说worker目前来说引入不了第三方的jar包。因为worker结点的PySpark是没有启动Py4j的通信进程的,相应的jar包自然也加载不了。之前没有详细看这部分文档,系统设计时企图在worker结点利用client模式直连Hbase来获取部分数据,从而避免对整个表的JOIN操作,当然对于python来说这样的操作只有通过引入jar包来实现(不考虑thrift方式)。但是测试的jar写好之后,一直不成功,最后只有修改方案,后来才去查了官方文档。

2. PythonRDD 的原型是 JavaRDD[String]

所有的经过PythonRDD传递的数据都通过BASE64编码

3. PySpark 中的方法和匿名函数是通过cloudpickle序列化

为何函数需要被序列化,因为做map或者flatMap时,此时的函数或者lambda表达式是需要传递到各个worder的,如果函数里有用到闭包,cloudpickle也能巧妙的序列化。但是,需要传递的函数里请不要是用self关键字,因为传递过去后,self的指代关系已经不明确了。

文档还提到PythonRDD的序列化是可定制的了,但是目前没这个需求,所有没测试

代码示例

java 测试代码, 编译生成 pyspark-test.jar

package org.valux.py4j;public class Calculate {    public int sqAdd(int x){        return x * x + 1;    }}

Python 测试代码,放在文件 driver.py

from pyspark import SparkContextfrom py4j.java_gateway import java_importsc = SparkContext(appName="Py4jTesting")java_import(sc._jvm, "org.valux.py4j.Calculate")func = sc._jvm.Calculate()print func.sqAdd(5)"""[OUTPUT] > 26"""

 

""" !!![错误用法] 这里是想在每个work上调用自定义的方法, 前面已经提到过PySpark目前是不支持的"""rdd = sc.parallelize([1, 2, 3])result = rdd.map(func.sqAdd).collect()""" !!![错误用法] 之前还有个错误的思路是想在work单独 import 相应的 jar    """def foo(x):    java_import(sc._jvm, "org.valux.py4j.Calculate")    func = sc._jvm.Calculate()    func.sqAdd(x)rdd = sc.parallelize([1, 2, 3])rdd.map(foo).collect()

 

测试时,提交程序需要记得带上jar包
> bin/spar-submit --driver-class-path pyspark-test.jar driver.py

这里又有一个坑,之前提交为了方便,一直都用的是 --jars 参数

--driver-class-path 附加的 jar 只会在 driver引入 --jars 附加的jar会在所有worker引入

帮助文档里面还提到

--jars Comma-separated list of local jars to include on the driver and executor classpaths.

所有就偷个懒用了 --jars ,结果一直报如下错误:

py4j.protocol.Py4JError: Trying to call a package.

测试了好久终于解决了

参考文档

转载于:https://www.cnblogs.com/errdev/p/4511303.html

你可能感兴趣的文章
概率与数学期望
查看>>
ARP(Address Resolution Protocol)地址解析协议初识
查看>>
js array 的理解
查看>>
自然数幂求和方法1:扰动法(求两次)
查看>>
SQL SERVER 中如何用脚本管理作业
查看>>
爬虫之证书错误
查看>>
【Top】Plan (updating...)
查看>>
Android开发之IPC进程间通信-AIDL介绍及实例解析
查看>>
Python学习 Day 025 -模块相关
查看>>
android系统action大全
查看>>
学习进度
查看>>
maven命令大全
查看>>
sql常用语句
查看>>
全局变量和局部变量
查看>>
2.linux换源问题
查看>>
mvc根据绝对路径下载文件
查看>>
第六章作业1
查看>>
OpenGL超级宝典笔记——遮挡查询 [转]
查看>>
asp.net 获得域名,端口,虚拟目录[转]
查看>>
GoLand快捷键
查看>>