关于FLINK PYTHON UDF

2020-04-15 文章
你好

   我在使用kafka produce数据后,在python中使用UDF做一个add function,但
是最后的sink文件里面没有任何数据,

如果不用UDF的话直接获取一个数据在最后的sink文件里面是有数据的如下所示,DEBUG
很久也不清楚是什么原因是否能帮忙分下

 

Kafka用的测试数据 {"a": "bbb", "b": 3, "c": 1}

 



 



 

 

测试结果

Kafka用的测试数据 {"a": "bbb", "b": 3, "c": 1}



 

 

 

st_env.from_path("source")\
.select("b.cast(LONG) as b1, c.cast(LONG) as c1") \
.select("add(b1,c1)") \ 无任何输出
.insert_into("result_tab")

无任何输出



 

 

st_env.from_path("source")\
.select("b.cast(LONG) as b1, c.cast(LONG) as c1") \
.select("c1")\   #正常输出

.insert_into("result_tab")

正确输出





回复: 关于kafka connector通过python链接

2020-04-14 文章
这个问题定义了 

我用了两个kafka包,其中红色的包是不需要的,非常感谢你们的帮助

flink-connector-kafka_2.11-1.10.0.jar

flink-sql-connector-kafka_2.11-1.10.0.jar

 

发件人: 秦寒  
发送时间: 2020年4月10日 10:15
收件人: 'Hequn Cheng' ; 'user-zh' 
主题: 回复: 关于kafka connector通过python链接

 

这个搞定了,pip3重装了一下apache flink,引入了jar包搞定

 

发件人: 秦寒 < <mailto:han...@chinaums.com> han...@chinaums.com> 
发送时间: 2020年4月9日 16:41
收件人: 'Hequn Cheng' < <mailto:he...@apache.org> he...@apache.org>; 'user-zh' < 
<mailto:user-zh@flink.apache.org> user-zh@flink.apache.org>
主题: 回复: 关于kafka connector通过python链接

 

您好

   根据你们的说明我做了如下配置,我用的是flink 1.10版本

1在pyflink/lib下面添加了kafka-clients-2.2.0.jar



 

2 在build-target/lib下面添加了flink-sql-connector-kafka_2.11-1.10.0.jar  
flink-connector-kafka_2.11-1.10.0.jar 以及 flink-json-1.10.0-sql-jar.jar



 

3 构建PyFlink发布包并安装

cd flink-python; python setup.py sdist 
pip install dist/*.tar.gz

 

 

4 执行测试程序tumble_window.py报错如下,不知道你们有没有遇见过这个错误,望能解答

[yy1s@rbtnode1 project]$ python3 tumble_window.py





 

 

 

发件人: Hequn Cheng < <mailto:he...@apache.org> he...@apache.org> 
发送时间: 2020年4月9日 10:08
收件人: user-zh < <mailto:user-zh@flink.apache.org> user-zh@flink.apache.org>
抄送:  <mailto:han...@chinaums.com> han...@chinaums.com
主题: Re: 关于kafka connector通过python链接

 

Hi 秦寒,

 

Dian 说得很完善了。除此之外,金竹的博客[1]有介绍“Python API 中如何使用 Kafka”,可能对你有帮助,可以看下。

 

Best, Hequn

 

[1] 
https://enjoyment.cool/2019/08/28/Apache%20Flink%20%E8%AF%B4%E9%81%93%E7%B3%BB%E5%88%97-%20Python%20API%20%E4%B8%AD%E5%A6%82%E4%BD%95%E4%BD%BF%E7%94%A8%20Kafka/

 

On Thu, Apr 9, 2020 at 9:34 AM Dian Fu mailto:dian0511...@gmail.com> > wrote:

你指的是Python Table API中如何使用kafka connector的例子吗?这个是有例子的[1]。

关于如何把kafka client的jar包配置到Python环境,分两种情况,当前有对应的两种解决方案:
1)如果是local运行,需要把kafka client的jar拷贝到python环境中pyflink的lib目录下
2)如果是remote运行,可以通过CLI的-j选项添加。

这两种方式对于Python用户来说可能都不太便捷,所以已有一个JIRA[3]在考虑添加另外一种对Python用户来说更友好的方式,欢迎到JIRA里参与讨论。

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector>
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/cli.html 
<https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/cli.html>
[3] https://issues.apache.org/jira/browse/FLINK-16943 
<https://issues.apache.org/jira/browse/FLINK-16943>
> 在 2020年4月9日,上午8:45,zhisheng  <mailto:zhisheng2...@gmail.com> > 写道:
> 
> hi, 秦寒
> 
> 暂时还没有 Python 这块的 API,可以去社区 JIRA 提建议
> 
> Best
> 
> zhisheng
> 
> 秦寒 mailto:han...@chinaums.com> > 于2020年4月8日周三 下午4:10写道:
> 
>> 您好
>> 
>>   Flink的 kafka connector 文档中只有java 和scala的列子,能否添加python
>> 调用kafka的列子,包括如何添加kafka connector,kafka client的jar包配置到pyhon
>> 环境等,谢谢。
>> 
>> 
>> 
>> 



回复: 关于kafka connector通过python链接

2020-04-14 文章
这个搞定了,pip3重装了一下apache flink,引入了jar包搞定

 

发件人: 秦寒  
发送时间: 2020年4月9日 16:41
收件人: 'Hequn Cheng' ; 'user-zh' 
主题: 回复: 关于kafka connector通过python链接

 

您好

   根据你们的说明我做了如下配置,我用的是flink 1.10版本

1在pyflink/lib下面添加了kafka-clients-2.2.0.jar



 

2 在build-target/lib下面添加了flink-sql-connector-kafka_2.11-1.10.0.jar  
flink-connector-kafka_2.11-1.10.0.jar 以及 flink-json-1.10.0-sql-jar.jar



 

3 构建PyFlink发布包并安装

cd flink-python; python setup.py sdist 
pip install dist/*.tar.gz

 

 

4 执行测试程序tumble_window.py报错如下,不知道你们有没有遇见过这个错误,望能解答

[yy1s@rbtnode1 project]$ python3 tumble_window.py





 

 

 

发件人: Hequn Cheng mailto:he...@apache.org> > 
发送时间: 2020年4月9日 10:08
收件人: user-zh mailto:user-zh@flink.apache.org> >
抄送: han...@chinaums.com <mailto:han...@chinaums.com> 
主题: Re: 关于kafka connector通过python链接

 

Hi 秦寒,

 

Dian 说得很完善了。除此之外,金竹的博客[1]有介绍“Python API 中如何使用 Kafka”,可能对你有帮助,可以看下。

 

Best, Hequn

 

[1] 
https://enjoyment.cool/2019/08/28/Apache%20Flink%20%E8%AF%B4%E9%81%93%E7%B3%BB%E5%88%97-%20Python%20API%20%E4%B8%AD%E5%A6%82%E4%BD%95%E4%BD%BF%E7%94%A8%20Kafka/

 

On Thu, Apr 9, 2020 at 9:34 AM Dian Fu mailto:dian0511...@gmail.com> > wrote:

你指的是Python Table API中如何使用kafka connector的例子吗?这个是有例子的[1]。

关于如何把kafka client的jar包配置到Python环境,分两种情况,当前有对应的两种解决方案:
1)如果是local运行,需要把kafka client的jar拷贝到python环境中pyflink的lib目录下
2)如果是remote运行,可以通过CLI的-j选项添加。

这两种方式对于Python用户来说可能都不太便捷,所以已有一个JIRA[3]在考虑添加另外一种对Python用户来说更友好的方式,欢迎到JIRA里参与讨论。

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector>
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/cli.html 
<https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/cli.html>
[3] https://issues.apache.org/jira/browse/FLINK-16943 
<https://issues.apache.org/jira/browse/FLINK-16943>
> 在 2020年4月9日,上午8:45,zhisheng  <mailto:zhisheng2...@gmail.com> > 写道:
> 
> hi, 秦寒
> 
> 暂时还没有 Python 这块的 API,可以去社区 JIRA 提建议
> 
> Best
> 
> zhisheng
> 
> 秦寒 mailto:han...@chinaums.com> > 于2020年4月8日周三 下午4:10写道:
> 
>> 您好
>> 
>>   Flink的 kafka connector 文档中只有java 和scala的列子,能否添加python
>> 调用kafka的列子,包括如何添加kafka connector,kafka client的jar包配置到pyhon
>> 环境等,谢谢。
>> 
>> 
>> 
>> 



关于kafka connector通过python链接

2020-04-08 文章
您好

   Flink的 kafka connector 文档中只有java 和scala的列子,能否添加python
调用kafka的列子,包括如何添加kafka connector,kafka client的jar包配置到pyhon
环境等,谢谢。