pyflink对Redis sink的支持

2021-02-21 Thread whh_960101
各位大佬, 请问pyflink现在有对Redis sink的支持吗,有没有demo可以参考,感谢!

pyflink对Redis sink的支持

2021-02-21 Thread whh_960101
各位大佬, 请问pyflink现在有对Redis sink的支持吗,有没有demo可以参考,感谢!

pyflink 1.11 运行pyflink作业时报错

2020-11-17 Thread whh_960101
Hi,各位大佬,pyflink 1.11 将pyflink作业提交到yarn集群运行,作业在将处理后的main_table insert到sink端的kafka时报错File "/home/cdh272705/poc/T24_parse.py", line 179, in from_kafka_to_oracle_demo main_table.execute_insert("sink")#.get_job_client().get_job_execution_result().result() File

pyflink 1.11 运行pyflink作业时报错

2020-11-13 Thread whh_960101
Hi,各位大佬,pyflink 1.11 运行pyflink作业时报错pyflink/pyfink_gateway_server.py 193行 lauch_gateway_server_process()217行 return Popen()FileNotFoundError: [Error 2] No such file or directory: 'java' : 'java'感觉像找不到java路径还是什么意思,java:java没看懂 ,该怎么解决

Re:Re: pyflink1.11.0 如果elasticsearch host有访问权限,connector如何写入用户名密码

2020-10-27 Thread whh_960101
/browse/FLINK-18361 > >Best, >Yangze Guo > >On Thu, Oct 22, 2020 at 3:47 PM whh_960101 wrote: >> >> Hi,各位大佬们:如果要sink数据到elasticsearch host有访问权限,elasticsearch >> connector如何写入用户名密码我按照官网里的样例格式来写的,没有找到options写入用户名密码,程序报错Unauthorizedhttps://ci.apache.org/project

提交pyflink作业到YARN集群报错

2020-10-26 Thread whh_960101
Hi,各位大佬, 想请教一下,我使用flink run -m yarn-cluster -p 4 -py myjob.py,报错java.lang.RuntimeException: Found 0 flink-python jar. at org.apache.flink.client.program.PackagedProgram.getJobJarAndDependencies(PackagedProgram.java:263) at

pyflink和flink版本的兼容性问题

2020-10-22 Thread whh_960101
Hi,各位大佬, 想请教一下,我的flink的版本是1.10.0,pyflink版本是1.11.1,目前使用pyflink没有兼容性问题,想问一下,马上要更新的flink 1.12,如果在正式发布后,我只是pip install --upgrade apache-flink==1.12 升级pyflink到1.12.0,flink 1.10.0 版本保持不变,会存在很多兼容性问题吗

Re:Re: pyflink1.11.0 如果elasticsearch host有访问权限,connector如何写入用户名密码

2020-10-22 Thread whh_960101
现在能更新到1.12吗,好像还没有发布,有什么其他的解决办法吗? 在 2020-10-22 16:34:56,"Yangze Guo" 写道: >1.11版本中尚不支持username和password的设置,这两个配置在1.12中加入了新的es connector[1] > >[1] https://issues.apache.org/jira/browse/FLINK-18361 > >Best, >Yangze Guo > >On Thu, Oct 22, 2020 at 3:47 P

Re:Re: pyflink1.11.0 如果elasticsearch host有访问权限,connector如何写入用户名密码

2020-10-22 Thread whh_960101
现在能更新到1.12吗,好像还没有发布,有什么其他的解决办法吗? 在 2020-10-22 16:34:56,"Yangze Guo" 写道: >1.11版本中尚不支持username和password的设置,这两个配置在1.12中加入了新的es connector[1] > >[1] https://issues.apache.org/jira/browse/FLINK-18361 > >Best, >Yangze Guo > >On Thu, Oct 22, 2020 at 3:47 P

Re:Re: pyflink1.11.0 如果elasticsearch host有访问权限,connector如何写入用户名密码

2020-10-22 Thread whh_960101
现在能更新到1.12吗,好像还没有发布,有什么其他的解决办法吗? 在 2020-10-22 16:34:56,"Yangze Guo" 写道: >1.11版本中尚不支持username和password的设置,这两个配置在1.12中加入了新的es connector[1] > >[1] https://issues.apache.org/jira/browse/FLINK-18361 > >Best, >Yangze Guo > >On Thu, Oct 22, 2020 at 3:47

pyflink1.11.0 如果elasticsearch host有访问权限,connector如何写入用户名密码

2020-10-22 Thread whh_960101
Hi,各位大佬们:如果要sink数据到elasticsearch host有访问权限,elasticsearch connector如何写入用户名密码我按照官网里的样例格式来写的,没有找到options写入用户名密码,程序报错Unauthorizedhttps://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.htmlCREATE TABLE myUserTable ( user_id STRING, user_name STRING uv

Re:Re: pyflink1.11.0 报错JobExecutionException: Job execution failed.

2020-10-20 Thread whh_960101
Row类型的对象在python中是怎么表示的,字典? 在 2020-10-20 20:35:22,"Dian Fu" 写道: >你这两个UDF(error_get和headers_get)实际的返回值类型都是string,但是却标成Row类型。如果确实需要返回Row类型,udf的返回值需要返回一个Row类型的对象。 > >> 在 2020年10月20日,下午7:56,Dian Fu 写道: >> >> 错误堆栈看着似乎不太完整,有更完整的堆栈吗? >> >>> 在 2020

pyflink1.11.0 报错JobExecutionException: Job execution failed.

2020-10-20 Thread whh_960101
Hi,各位大佬,我处理kafka source Table,print 一个嵌套的json结果,报错JobExecutionException: Job execution failed. 这个问题该怎么解决,代码和报错信息如下:@udf(input_types=DataTypes.STRING(), result_type=DataTypes.BOOLEAN()) def error_exist(message): if message is None: return False mes_dic =

pyflink1.11.0 kafka connector如果有访问权限

2020-10-19 Thread whh_960101

Re:Re: pyflink sql中select,where都带udf,其中一个udf失效

2020-10-15 Thread whh_960101
11/dev/python/table-api-users-guide/intro_to_table_api.html#explain-tables > >> 在 2020年10月15日,下午7:02,whh_960101 写道: >> >> hi, >> 我刚才改了一下你的例子[1],通过from_elements构建一个source表 >> 然后使用我的udf >> source.where(udf1(msg)=True).select("udf2(msg)").execute_ins

Re:Re: pyflink sql中select,where都带udf,其中一个udf失效

2020-10-15 Thread whh_960101
, clusterName]) ship_strategy : FORWARD 在 2020-10-15 20:59:12,"Dian Fu" 写道: >可以把plan打印出来看一下?打印plan可以参考这个:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html#explain-tables > >> 在 2020年10

Re:Re: pyflink sql中select,where都带udf,其中一个udf失效

2020-10-15 Thread whh_960101
ecution_result().result() 打印结果就是经过筛选后的 想请问一下这种问题出在哪里? 在 2020-10-15 16:57:39,"Xingbo Huang" 写道: >Hi, >我们有例子覆盖你这种情况的[1],你需要看下你udf1的逻辑是什么样的 > > >[1] >https://github.com/apache/flink/blob/release-1.11/flink-python/pyflink/table/tests/test_udf.py#L67 > >B

Re:Re: pyflink sql中select,where都带udf,其中一个udf失效

2020-10-15 Thread whh_960101
> >[1] >https://github.com/apache/flink/blob/release-1.11/flink-python/pyflink/table/tests/test_udf.py#L67 > >Best, >Xingbo > >whh_960101 于2020年10月15日周四 下午2:30写道: > >> 您好,我使用pyflink时的代码如下,有如下问题: >> >> >> source = st_env.from_path('source') >>

Re:Re: pyflink sql中select,where都带udf,其中一个udf失效

2020-10-15 Thread whh_960101
? 在 2020-10-15 16:57:39,"Xingbo Huang" 写道: >Hi, >我们有例子覆盖你这种情况的[1],你需要看下你udf1的逻辑是什么样的 > > >[1] >https://github.com/apache/flink/blob/release-1.11/flink-python/pyflink/table/tests/test_udf.py#L67 > >Best, >Xingbo > >whh_960101 于2020年10月15日

pyflink Table object如何打印出其中内容方便调试

2020-10-15 Thread whh_960101
您好,我使用pyflink时的代码如下,有如下问题: source = st_env.from_path('source') #st_env是StreamTableEnvironment,source是kafka源端 #udf1 (input_types =DataTypes.STRING(), result_type = DataTypes.BOOLEAN()) table = source.select("msg").where(udf1(msg)=True) 这样单步调试print(table)出来的结果是 pyflink有没有将Table转化成可打印格式的方法

pyflink sql中select,where都带udf,其中一个udf失效

2020-10-15 Thread whh_960101
您好,我使用pyflink时的代码如下,有如下问题: source = st_env.from_path('source') #st_env是StreamTableEnvironment,source是kafka源端 #只在where语句中加udf1 (input_types =DataTypes.STRING(), result_type = DataTypes.BOOLEAN()) table =

pyflink sql中select,where都带udf,其中一个udf失效

2020-10-14 Thread whh_960101
您好,我使用pyflink时的代码如下,有如下问题: source = st_env.from_path('source') #st_env是StreamTableEnvironment,source是kafka源端 #只在where语句中加udf1 (input_types =DataTypes.STRING(), result_type = DataTypes.BOOLEAN()) table =

pyflink sql select带特殊符号的字段名

2020-10-14 Thread whh_960101
您好,我使用pyflink时的代码如下,有如下问题: source = st_env.from_path('source') #st_env是StreamTableEnvironment,source是kafka源端 table = source.select("@timestamp").execute_insert('sink').get_job_client().get_job_execution_result().result()

pyflink如何提交作业到CDP集群中运行

2020-09-24 Thread whh_960101
您好,请问pyflink如何提交作业到CDP集群中运行,有没有示例演示,感谢!

pyflink如何提交作业到集群运行

2020-09-24 Thread whh_960101
您好,请问pyflink如何提交作业到集群运行,有没有示例演示,感谢!

pyflink连接器支持问题

2020-09-17 Thread whh_960101
您好,请问pyflink现在支持的连接器有IBM MQ吗,因为需要使用到,感谢解答!

Re:Re: pyflink execute_insert问题求解答

2020-09-09 Thread whh_960101
问题1: 我已经生成了一个Table对象main_table,我如何才能取其中的字段'data'中的值,加入条件判断语句中呢,没找到有合适的api 例如: if main_table.to_pandas()['data'].iloc[0] == '': #我现在是通过转成pandas来操作,还有其他好用的方法吗 .. 问题2: full_outer_join(right, join_predicate)[source]¶ Joins two Table. Similar to a SQL full outer join. The fields of the two

pyflink execute_insert问题求解答

2020-09-08 Thread whh_960101
您好,我使用pyflink时的代码如下,有如下两个问题: 1. source = st_env.from_path('source') #st_env是StreamTableEnvironment,source是kafka源端 main_table = source.select("...") sub_table = source.select("...") main_table.execute_insert('sink').get_job_client().get_job_execution_result().result()

Re:Re: Re: Re: Re: Re: pyflink-udf 问题反馈

2020-09-07 Thread whh_960101
se-1.11/dev/python/table-api-users-guide/conversion_of_pandas.html > >Best, >Xingbo > >whh_960101 于2020年9月7日周一 上午11:22写道: > >> 您好,请问在定义UDF时,输入参数是一个我定义好的python字典,比如 >> dic = {1:'a',2:'b'} >> 此时定义udf如下: >> >> @udf(input_types=[DataTypes.MAP(DataTypes.INT(),Da

Re:Re: Re: Re: Re: Re: pyflink-udf 问题反馈

2020-09-07 Thread whh_960101
>你想直接读python的数据结构的话,你可以考虑使用st_env.from_elements(测试用的,可以参考文档[1])或者st_env.from_pandas >[2] 来读取一个dataframe。 > >[1] >https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html >[2] >https://ci.apache.org/projects/flink/flink-

Re:Re: Re: Re: Re: pyflink-udf 问题反馈

2020-09-06 Thread whh_960101
false', > 'json.ignore-parse-errors' = 'true' >) >""") > >table.select("func(b)").execute_insert("output").get_job_client().get_job_execution_result().result() >``` > >Best, >Xingbo > > > >whh_960101 于2020年9月4日周五 下午3:4

Re:Re: Re: Re: pyflink-udf 问题反馈

2020-09-04 Thread whh_960101
t;).to_pandas() >``` >然后,你可以看看官方文档[1],让你快速上手PyFlink > >Best, >Xingbo > >[1] >https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html > >whh_960101 于2020年9月4日周五 下午2:50写道: > >> 您好,numpy的array和DataTypes.ARR

Re:Re: Re: Re: pyflink-udf 问题反馈

2020-09-04 Thread whh_960101
>然后,你可以看看官方文档[1],让你快速上手PyFlink > >Best, >Xingbo > >[1] >https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html > >whh_960101 于2020年9月4日周五 下午2:50写道: > >> 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗 >>

Re:Re: Re: pyflink-udf 问题反馈

2020-09-04 Thread whh_960101
? 在 2020-09-04 10:35:03,"Xingbo Huang" 写道: >Hi, > >你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事 > >Best, >Xingbo > >whh_960101 于2020年9月4日周五 上午9:26写道: > >> >> 您好,我的问题是:首先我有一个source的环境,with_fo

Re:Re: pyflink-udf 问题反馈

2020-09-03 Thread whh_960101
fs.html#scalar-functions > >Best, >Xingbo > > 于2020年9月3日周四 下午9:45写道: > >> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法 >> input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()] >> >> 在def的时候应该是写def函数名(W):,然后函数内部取第二

回复:pyflink-udf 问题反馈

2020-09-03 Thread whh_960101
我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法 input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()] 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗 或者正确写法是什么样的,感谢解答! | | whh_960101 | | 邮箱:whh_960...@163.com | 签名由 网易邮箱大师 定制 在2020年09月03日 21:14,Xingbo Huang

pyflink-udf 问题反馈

2020-09-03 Thread whh_960101
您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid input_type:input_type should be DataType but contain RowField(RECID, VARCHAR) 我的pyflink版本:1.11.1