各位大佬,
请问pyflink现在有对Redis sink的支持吗,有没有demo可以参考,感谢!
各位大佬,
请问pyflink现在有对Redis sink的支持吗,有没有demo可以参考,感谢!
Hi,各位大佬,pyflink 1.11 将pyflink作业提交到yarn集群运行,作业在将处理后的main_table
insert到sink端的kafka时报错File "/home/cdh272705/poc/T24_parse.py", line 179, in
from_kafka_to_oracle_demo
main_table.execute_insert("sink")#.get_job_client().get_job_execution_result().result()
File
Hi,各位大佬,pyflink 1.11 运行pyflink作业时报错pyflink/pyfink_gateway_server.py 193行
lauch_gateway_server_process()217行 return Popen()FileNotFoundError: [Error 2]
No such file or directory: 'java' : 'java'感觉像找不到java路径还是什么意思,java:java没看懂 ,该怎么解决
/browse/FLINK-18361
>
>Best,
>Yangze Guo
>
>On Thu, Oct 22, 2020 at 3:47 PM whh_960101 wrote:
>>
>> Hi,各位大佬们:如果要sink数据到elasticsearch host有访问权限,elasticsearch
>> connector如何写入用户名密码我按照官网里的样例格式来写的,没有找到options写入用户名密码,程序报错Unauthorizedhttps://ci.apache.org/project
Hi,各位大佬, 想请教一下,我使用flink run -m yarn-cluster -p 4 -py
myjob.py,报错java.lang.RuntimeException: Found 0 flink-python jar.
at
org.apache.flink.client.program.PackagedProgram.getJobJarAndDependencies(PackagedProgram.java:263)
at
Hi,各位大佬,
想请教一下,我的flink的版本是1.10.0,pyflink版本是1.11.1,目前使用pyflink没有兼容性问题,想问一下,马上要更新的flink
1.12,如果在正式发布后,我只是pip install --upgrade apache-flink==1.12
升级pyflink到1.12.0,flink 1.10.0 版本保持不变,会存在很多兼容性问题吗
现在能更新到1.12吗,好像还没有发布,有什么其他的解决办法吗?
在 2020-10-22 16:34:56,"Yangze Guo" 写道:
>1.11版本中尚不支持username和password的设置,这两个配置在1.12中加入了新的es connector[1]
>
>[1] https://issues.apache.org/jira/browse/FLINK-18361
>
>Best,
>Yangze Guo
>
>On Thu, Oct 22, 2020 at 3:47 P
现在能更新到1.12吗,好像还没有发布,有什么其他的解决办法吗?
在 2020-10-22 16:34:56,"Yangze Guo" 写道:
>1.11版本中尚不支持username和password的设置,这两个配置在1.12中加入了新的es connector[1]
>
>[1] https://issues.apache.org/jira/browse/FLINK-18361
>
>Best,
>Yangze Guo
>
>On Thu, Oct 22, 2020 at 3:47 P
现在能更新到1.12吗,好像还没有发布,有什么其他的解决办法吗?
在 2020-10-22 16:34:56,"Yangze Guo" 写道:
>1.11版本中尚不支持username和password的设置,这两个配置在1.12中加入了新的es connector[1]
>
>[1] https://issues.apache.org/jira/browse/FLINK-18361
>
>Best,
>Yangze Guo
>
>On Thu, Oct 22, 2020 at 3:47
Hi,各位大佬们:如果要sink数据到elasticsearch host有访问权限,elasticsearch
connector如何写入用户名密码我按照官网里的样例格式来写的,没有找到options写入用户名密码,程序报错Unauthorizedhttps://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.htmlCREATE
TABLE myUserTable (
user_id STRING,
user_name STRING
uv
Row类型的对象在python中是怎么表示的,字典?
在 2020-10-20 20:35:22,"Dian Fu" 写道:
>你这两个UDF(error_get和headers_get)实际的返回值类型都是string,但是却标成Row类型。如果确实需要返回Row类型,udf的返回值需要返回一个Row类型的对象。
>
>> 在 2020年10月20日,下午7:56,Dian Fu 写道:
>>
>> 错误堆栈看着似乎不太完整,有更完整的堆栈吗?
>>
>>> 在 2020
Hi,各位大佬,我处理kafka source Table,print 一个嵌套的json结果,报错JobExecutionException: Job
execution failed. 这个问题该怎么解决,代码和报错信息如下:@udf(input_types=DataTypes.STRING(),
result_type=DataTypes.BOOLEAN())
def error_exist(message):
if message is None:
return False
mes_dic =
11/dev/python/table-api-users-guide/intro_to_table_api.html#explain-tables
>
>> 在 2020年10月15日,下午7:02,whh_960101 写道:
>>
>> hi,
>> 我刚才改了一下你的例子[1],通过from_elements构建一个source表
>> 然后使用我的udf
>> source.where(udf1(msg)=True).select("udf2(msg)").execute_ins
, clusterName])
ship_strategy : FORWARD
在 2020-10-15 20:59:12,"Dian Fu" 写道:
>可以把plan打印出来看一下?打印plan可以参考这个:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html#explain-tables
>
>> 在 2020年10
ecution_result().result()
打印结果就是经过筛选后的
想请问一下这种问题出在哪里?
在 2020-10-15 16:57:39,"Xingbo Huang" 写道:
>Hi,
>我们有例子覆盖你这种情况的[1],你需要看下你udf1的逻辑是什么样的
>
>
>[1]
>https://github.com/apache/flink/blob/release-1.11/flink-python/pyflink/table/tests/test_udf.py#L67
>
>B
>
>[1]
>https://github.com/apache/flink/blob/release-1.11/flink-python/pyflink/table/tests/test_udf.py#L67
>
>Best,
>Xingbo
>
>whh_960101 于2020年10月15日周四 下午2:30写道:
>
>> 您好,我使用pyflink时的代码如下,有如下问题:
>>
>>
>> source = st_env.from_path('source')
>>
?
在 2020-10-15 16:57:39,"Xingbo Huang" 写道:
>Hi,
>我们有例子覆盖你这种情况的[1],你需要看下你udf1的逻辑是什么样的
>
>
>[1]
>https://github.com/apache/flink/blob/release-1.11/flink-python/pyflink/table/tests/test_udf.py#L67
>
>Best,
>Xingbo
>
>whh_960101 于2020年10月15日
您好,我使用pyflink时的代码如下,有如下问题:
source = st_env.from_path('source')
#st_env是StreamTableEnvironment,source是kafka源端
#udf1 (input_types =DataTypes.STRING(), result_type = DataTypes.BOOLEAN())
table = source.select("msg").where(udf1(msg)=True)
这样单步调试print(table)出来的结果是
pyflink有没有将Table转化成可打印格式的方法
您好,我使用pyflink时的代码如下,有如下问题:
source = st_env.from_path('source')
#st_env是StreamTableEnvironment,source是kafka源端
#只在where语句中加udf1 (input_types =DataTypes.STRING(), result_type =
DataTypes.BOOLEAN())
table =
您好,我使用pyflink时的代码如下,有如下问题:
source = st_env.from_path('source')
#st_env是StreamTableEnvironment,source是kafka源端
#只在where语句中加udf1 (input_types =DataTypes.STRING(), result_type =
DataTypes.BOOLEAN())
table =
您好,我使用pyflink时的代码如下,有如下问题:
source = st_env.from_path('source')
#st_env是StreamTableEnvironment,source是kafka源端
table =
source.select("@timestamp").execute_insert('sink').get_job_client().get_job_execution_result().result()
您好,请问pyflink如何提交作业到CDP集群中运行,有没有示例演示,感谢!
您好,请问pyflink如何提交作业到集群运行,有没有示例演示,感谢!
您好,请问pyflink现在支持的连接器有IBM MQ吗,因为需要使用到,感谢解答!
问题1:
我已经生成了一个Table对象main_table,我如何才能取其中的字段'data'中的值,加入条件判断语句中呢,没找到有合适的api
例如:
if main_table.to_pandas()['data'].iloc[0] == '': #我现在是通过转成pandas来操作,还有其他好用的方法吗
..
问题2:
full_outer_join(right, join_predicate)[source]¶
Joins two Table. Similar to a SQL full outer join. The fields of the two
您好,我使用pyflink时的代码如下,有如下两个问题:
1.
source = st_env.from_path('source')
#st_env是StreamTableEnvironment,source是kafka源端
main_table = source.select("...")
sub_table = source.select("...")
main_table.execute_insert('sink').get_job_client().get_job_execution_result().result()
se-1.11/dev/python/table-api-users-guide/conversion_of_pandas.html
>
>Best,
>Xingbo
>
>whh_960101 于2020年9月7日周一 上午11:22写道:
>
>> 您好,请问在定义UDF时,输入参数是一个我定义好的python字典,比如
>> dic = {1:'a',2:'b'}
>> 此时定义udf如下:
>>
>> @udf(input_types=[DataTypes.MAP(DataTypes.INT(),Da
>你想直接读python的数据结构的话,你可以考虑使用st_env.from_elements(测试用的,可以参考文档[1])或者st_env.from_pandas
>[2] 来读取一个dataframe。
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
>[2]
>https://ci.apache.org/projects/flink/flink-
false',
> 'json.ignore-parse-errors' = 'true'
>)
>""")
>
>table.select("func(b)").execute_insert("output").get_job_client().get_job_execution_result().result()
>```
>
>Best,
>Xingbo
>
>
>
>whh_960101 于2020年9月4日周五 下午3:4
t;).to_pandas()
>```
>然后,你可以看看官方文档[1],让你快速上手PyFlink
>
>Best,
>Xingbo
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
>
>whh_960101 于2020年9月4日周五 下午2:50写道:
>
>> 您好,numpy的array和DataTypes.ARR
>然后,你可以看看官方文档[1],让你快速上手PyFlink
>
>Best,
>Xingbo
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
>
>whh_960101 于2020年9月4日周五 下午2:50写道:
>
>> 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗
>>
?
在 2020-09-04 10:35:03,"Xingbo Huang" 写道:
>Hi,
>
>你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事
>
>Best,
>Xingbo
>
>whh_960101 于2020年9月4日周五 上午9:26写道:
>
>>
>> 您好,我的问题是:首先我有一个source的环境,with_fo
fs.html#scalar-functions
>
>Best,
>Xingbo
>
> 于2020年9月3日周四 下午9:45写道:
>
>> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
>> input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()]
>>
>> 在def的时候应该是写def函数名(W):,然后函数内部取第二
我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()]
在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗
或者正确写法是什么样的,感谢解答!
| |
whh_960101
|
|
邮箱:whh_960...@163.com
|
签名由 网易邮箱大师 定制
在2020年09月03日 21:14,Xingbo Huang
您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid
input_type:input_type should be DataType but contain RowField(RECID, VARCHAR)
我的pyflink版本:1.11.1
37 matches
Mail list logo