Re: pyflink-udf 问题反馈

2020-09-03 文章 Xingbo Huang
Hi, 我觉得你从头详细描述一下你的表结构。 比如,我的表包含三个字段a,b,c,他们的类型都是varchar的, 然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。 使用就是table.select("udf(b)")。详细的你可以去查阅文档[1] [1]

回复:pyflink-udf 问题反馈

2020-09-03 文章 whh_960101
我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法 input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()] 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗 或者正确写法是什么样的,感谢解答! | | whh_960101 | | 邮箱:whh_960...@163.com | 签名由 网易邮箱大师 定制 在2020年09月03日 21:14,Xingbo Huang

pyflink-udf 问题反馈

2020-09-03 文章 whh_960101
您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid input_type:input_type should be DataType but contain RowField(RECID, VARCHAR) 我的pyflink版本:1.11.1

Re: pyflink-udf 问题反馈

2020-09-03 文章 Xingbo Huang
Hi, input_types定义的是每一个列的具体类型。 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你 正确的写法是 input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()] 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的) input_types=DataTypes.Row([DataTypes.FIELD("a", DataTypes.STRING()), DataTypes.FIELD("b",

Re:Re: pyflink-udf 问题反馈

2020-09-03 文章 whh_960101
您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值 udf定义如下:

Re: Re: pyflink-udf 问题反馈

2020-09-03 文章 Xingbo Huang
Hi, 你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事 Best, Xingbo whh_960101 于2020年9月4日周五 上午9:26写道: > >

Re: FlinkKafkaConsumer问题

2020-09-03 文章 Shuiqiang Chen
Hi, 为了保证 Flink 程序的 exactly-once,必须由各个 Kafka source 算子维护当前算子所消费的 partition 消费 offset 信息,并在每次checkpoint 时将这些信息写入到 state 中, 在从 checkpoint 恢复中从上次 commit 的位点开始消费,保证 exactly-once. 如果用 Kafka 消费组管理,那么 FlinkKafkaConsumer 内各个并发实例所分配的 partition 将由 Kafka 的消费组管理,且 offset 也由 Kafka 消费组管理者记录,Flink 无法维护这些信息。

flink on k8s 如果jobmanager 所在pod重启后job失败如何处理

2020-09-03 文章 dty...@163.com
hi: 请教一个问题。在使用k8s 部署的flink 集群,如果jobmanger 重启后,1)job所在的jar包会清除,jobmanager 找不到这个job的jar 包,2)正在运行的job也会取消,重启后的jobmanager 如何找到之前运行的job dty...@163.com

flink读取kafka遇到kafka.consumer:type=app-info,id=1

2020-09-03 文章 宁吉浩
把正则读取kafka-topic的代码进行了修改,然后问题消失了,贴一下读取的代码给大家看看 code: FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer<>( Pattern.compile(readTopic), // 正则读取 new SimpleStringSchema(), // 序列化 properties); kafkaConsumer.setStartFromLatest(); 实在想不清楚这里会有什么问题?

Re: flink on k8s 如果jobmanager 所在pod重启后job失败如何处理

2020-09-03 文章 Yun Tang
Hi Please use English to ask questions in user mailing list. I have added this thread to user-zh mailing list, if you would like to reply this thread again, please remove user mailing list in senders. When talking about the question how to handle job manager failure in k8s, you could consider

Re: FlinkKafkaConsumer问题

2020-09-03 文章 taochanglian
为了保证exactly-once,flink自己通过barrier来实现checkpoint,包括barrier的传递等等,所以flink在kafkaconsumer的基础之上,封装了一层语义保障。 在 2020/9/4 10:34, Shuiqiang Chen 写道: Hi, 为了保证 Flink 程序的 exactly-once,必须由各个 Kafka source 算子维护当前算子所消费的 partition 消费 offset 信息,并在每次checkpoint 时将这些信息写入到 state 中, 在从 checkpoint 恢复中从上次 commit

flink读取kafka遇到kafka.consumer:type=app-info,id=1

2020-09-03 文章 宁吉浩
使用flink1.9.3 , kafka 1.0.0 flink使用正则读取kafka的多个topic , 每个topic均为3个分区,flink这边并行度是3(1也尝试过),均出现了如下问题 有没有人遇到过这个问题呢?如何解决的 报错如下所示: javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=1 at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) at

Re: flink sql多层view嵌套,字段not found

2020-09-03 文章 Danny Chan
这是一个已知问题,社区版本已经修复了 [1],不过还有一个后续 PR https://github.com/apache/flink/pull/13293,待 merge [1] https://issues.apache.org/jira/browse/FLINK-18750 Best, Danny Chan 在 2020年9月3日 +0800 PM6:41,Lin Hou ,写道: > Hi, > > 请教一个通过sql-client提交flink sql遇到的关于嵌套view,当嵌套第二层时,查询时会报找不到字段的问题。 > 元数据已经建好,简述如下: > >

Re: Flink SQL 任务乱码问题

2020-09-03 文章 Danny Chan
SQL 文本是什么编码 ?有尝试过 UTF8 编码 ? Best, Danny Chan 在 2020年9月3日 +0800 PM5:04,LakeShen ,写道: > Hi 社区, > > 我的一个 Flink SQL 任务,版本为 1.10,代码中使用了中文值,具体如下: > > select xxx, case when a = 'a' then '你好' when a = 'b' then '你好呀' end as va > from xxx ; > > 然后会把这个结果输出,最后发现 va 的值乱码了,也就是中文乱码。 > > 目前有什么比较好的解决方法吗。 > >

Re: Flink如何实现至多一次(At Most Once)

2020-09-03 文章 Yun Tang
Hi 如果是完全依赖source的offset管理,可以达到类似 at most once 的语义。 社区其实也有更完备的checkpoint at most once 的实现讨论,已经抄送了相关的开发人员 @yuanmei.w...@gmail.com 祝好 唐云 From: Paul Lam Sent: Thursday, September 3, 2020 17:28 To: user-zh Subject: Re:

Re: FlinkKafkaConsumer问题

2020-09-03 文章 lec ssmi
是flink对kafka的消费,是自己管理offset,用low-level api去寻址,而不是用group.id来管理offset这种high-level api。是这个意思吧。 op <520075...@qq.com> 于2020年9月4日周五 上午10:25写道: > > 谢谢,但我还是不太明白,FlinkKafkaConsumer底层不是用的KafkaConsumer吗,为什么flink不用kafka的消费组管理呢? > > > --原始邮件-- > 发件人: >

?????? FlinkKafkaConsumer????

2020-09-03 文章 op
FlinkKafkaConsumerKafkaConsumer??flinkkafka ---- ??: "user-zh"

Re: 回复:请指教一个关于时间窗的问题,非常感谢!

2020-09-03 文章 Benchao Li
如果你有多个partition,并且有一些partition没有数据,这个时候的确是会存在这个问题。 要处理这种情况,可以了解下idle source[1] [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html#dealing-with-idle-sources samuel@ubtrobot.com 于2020年9月3日周四 下午3:41写道: > 补充一下环境信息: > > 有点类似以下问题: > 在1.11版本测试flink

Re: pyflink1.11.1连接hive问题

2020-09-03 文章 Dian Fu
你看看log文件里,有没有其他log,log文件位置:/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyflink/log/ 从你贴的异常来看,感觉像是连HiveMetastore出问题了,没有连上,可以看看log文件里,能不能看到具体原因。 > 在 2020年9月3日,下午4:37,程龙 <13162790...@163.com> 写道: > > 完整日志乳如下: > > > > Traceback (most recent call last): > >

FlinkKafkaConsumer????

2020-09-03 文章 op
hi, FlinkKafkaConsumer //--- val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment Env.setRestartStrategy(RestartStrategies.noRestart()) val consumerProps = new Properties()

Re: flink json ddl解析

2020-09-03 文章 Dream-底限
hi 我现在使用的方式就是对于类型不一致的数组元素全部使用ARRAY来解析,感觉这是一个可以优化的点,想找一个更好的方式 Benchao Li 于2020年9月3日周四 下午12:57写道: > Hi, > 如果声明为 ARRAY 是否可以满足你的需求呢?如果可以的话,你可以在 > 1.12之后使用这个feature[1]. > > [1] https://issues.apache.org/jira/browse/FLINK-18002 > > zilong xiao 于2020年9月1日周二 下午5:49写道: > > > 问题大概懂了,坐等Flink大佬回复 > > > >

Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

2020-09-03 文章 Liu Rising
Hi 找到原因了。 问题在于在定义ListState时使用了transient关键字,如下。 private transient ListState state; 去掉了transient之后,问题解决。 虽然不太清粗为何transient会造成这种情况。 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink 涉及到 state 恢复能从 RocketMQ 直接转到 Kafka 吗?

2020-09-03 文章 Congxian Qiu
从之前的 checkpoint/savepoint 恢复的话,加上 -n 或者 --allowNonRestoredState 是可以恢复的,不过需要注意如何保证从 *特定* 的 offset 进行恢复 Best, Congxian Paul Lam 于2020年9月3日周四 上午11:59写道: > 可以,保证 RokcetMQ source 算子的 uid 和原本的 Kafka source 算子的 uid 不同就行。 > 另外启动要设置参数 -n 或 —allowNonRestoredState 。 > > Best, > Paul Lam > > > 2020年9月2日

??????checkpoint??????state

2020-09-03 文章 sun
??2?? 1checkpointchk- ??chk-1??chk-2 , 2checkpoint??state?? ?? final StreamExecutionEnvironment streamExecutionEnvironment =

??????checkpoint??????state

2020-09-03 文章 sun
??2?? 1checkpointchk- ??chk-1??chk-2 , 2checkpoint??state?? ?? final StreamExecutionEnvironment streamExecutionEnvironment =

????????????checkpoint??????state

2020-09-03 文章 sun
---- ??: "user-zh" <13162790...@163.com;

pyflink1.11.1连接hive问题

2020-09-03 文章 程龙
使用hivecatalog连接 抱一下错误 flink py4j.protocol.Py4JJavaError: An error occurred while calling o10.registerCatalog. : java.lang.NullPointerException

Re:Re: pyflink1.11.1连接hive问题

2020-09-03 文章 程龙
完整日志乳如下: Traceback (most recent call last): File "/Users/bjhl/PycharmProjects/flink-example/com.baijiahulian/connecthive/HiveTest.py", line 26, in t_env.register_catalog("default", hive_catalog) File

Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

2020-09-03 文章 Yun Tang
Hi 我觉得这个不是root cause,实际上 transient ListState 是一种正确的用法,因为state应该是在函数open方法里面进行初始化,所以transient 修饰即可。 麻烦把这个list state的初始化以及使用方法的代码都贴出来吧。 祝好 唐云 From: Liu Rising Sent: Thursday, September 3, 2020 12:26 To: user-zh@flink.apache.org Subject: Re:

Re: Flink如何实现至多一次(At Most Once)

2020-09-03 文章 Paul Lam
如果每次都从最新的数据开始读的话,关掉 checkpoint 是可以达到 At Most Once。 另外建议还要看看 sink 有没有自动重试机制,可能造成数据重复。 Best, Paul Lam > 2020年9月2日 19:16,Tianwang Li 写道: > > 我们有一些场景,对实时性要求高,同时对数据重复会有比较大大影响。 > 我想关闭checkpoint,这样是不是能不能保证“至多一次” (At Most Once) ? > 这里会不会有什么坑? > 另外:我们允许丢失数据。 > > > -- >

??????checkpoint??????state

2020-09-03 文章 sun
??2?? 1checkpointchk- ??chk-1??chk-2 , 2checkpoint??state?? ?? final StreamExecutionEnvironment streamExecutionEnvironment =

??????checkpoint??????state

2020-09-03 文章 sun
??2?? 1checkpointchk- ??chk-1??chk-2 , 2checkpoint??state?? ?? final StreamExecutionEnvironment streamExecutionEnvironment =

Re: pyflink1.11.1连接hive问题

2020-09-03 文章 Dian Fu
有更完整的log吗? > 在 2020年9月3日,下午4:12,程龙 <13162790...@163.com> 写道: > > 使用hivecatalog连接 抱一下错误 > > > > > flink py4j.protocol.Py4JJavaError: An error occurred while calling > o10.registerCatalog. : java.lang.NullPointerException

Re: Flink on k8s

2020-09-03 文章 Yang Wang
需要你发一下TaskManager的log,这样才能方便排查问题 你可以使用kubectl logs来直接查看log 另外你是用的官方文档[1]里面的yaml来启动的吗,E2E测试(minikube版本是v1.8.2)都是正常运行的,我在真实的K8s集群测试也没问题 [1]. https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/kubernetes.html#deploy-flink-cluster-on-kubernetes Best, Yang superainbower

Flink SQL 任务乱码问题

2020-09-03 文章 LakeShen
Hi 社区, 我的一个 Flink SQL 任务,版本为 1.10,代码中使用了中文值,具体如下: select xxx, case when a = 'a' then '你好' when a = 'b' then '你好呀' end as va from xxx ; 然后会把这个结果输出,最后发现 va 的值乱码了,也就是中文乱码。 目前有什么比较好的解决方法吗。 Best, LakeShen

????: ????????????????????????????????????????????

2020-09-03 文章 samuel....@ubtrobot.com
?? ?? ??1.11flink sql??,??streaming api kafka,eventtime,stream??table,sql,??kafka topic,flink webui watermarks No Watermark,,kafka

Re:无法从checkpoint中恢复state

2020-09-03 文章 程龙
再启动服务的时候 需要指定checkpoint回复地址,你这里只是指定了做checkpint地址 在 2020-09-03 16:03:41,"sun" <1392427...@qq.com> 写道: >你好,我有2个问题 > >1:每次重启服务,checkpoint的目录中chk- 总是从chk-1开始,chk-2 ,没有从上次的编号开始 > >2:重启服务后,没有从checkpoint中恢复state的数据 > >下面是我的配置,我是在本地调试的,单机 > > > >final StreamExecutionEnvironment

Re: Flink 1.10.1 cdh-hdfs启用HA时,flink job提交报错

2020-09-03 文章 Storm☀️
问题找到了; hdfs-site.xml配置文件冲突导致的。 原因:通过-yt上传了 外部集群的hdfs-site.xml文件。 flink10初始化taskmanager读取 hdfs-site.xml配置的时候被外部的hdfs-site.xml文件干扰。 此问题终结。 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: FlinkKafkaConsumer问题

2020-09-03 文章 Shuiqiang Chen
Hi op, 在 Flink 消费 Kafka 的过程中, 由 FlinkKafkaConsumer 会从 Kafka 中拿到当前 topic 的所有 partition 信息并分配给个并发消费,这里的 group id 只是用于将当前 partition 的消费 offset commit 到 Kafka,并用这个消费组标识。而使用 KafkaConsumer 消费数据则应用到了 Kafka 的消费组管理, 这是 Kafka 服务端的一个角色。 另外,当启动两个作业用同一个 topic 和 group id 消费 kafka, 如果两个作业会分别以同一个 group id

Flink动态CEP该怎么做?

2020-09-03 文章 Jim Chen
Hi, 我们打算用flink来做规则匹配,现在打算用CEP来做。但是发现flink 不支持动态CEP,网上百度了下,类似于滴滴那种方式,改动太大。没有能力能做,所以,问下大家,有没有什么思路,简单点的

Re: 无法从checkpoint中恢复state

2020-09-03 文章 Congxian Qiu
Hi 从 retain checkpoint 恢复可以参考文档[1] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/checkpoints.html#%E4%BB%8E%E4%BF%9D%E7%95%99%E7%9A%84-checkpoint-%E4%B8%AD%E6%81%A2%E5%A4%8D%E7%8A%B6%E6%80%81 Best, Congxian sun <1392427...@qq.com> 于2020年9月3日周四 下午4:14写道: >

flink sql多层view嵌套,字段not found

2020-09-03 文章 Lin Hou
Hi, 请教一个通过sql-client提交flink sql遇到的关于嵌套view,当嵌套第二层时,查询时会报找不到字段的问题。 元数据已经建好,简述如下: 1.建嵌套的view: create temporary view temp_app_impression_5min as select argService as arg_service, timeLocal as time_local, mid as mid, vipruid as vipruid, activity as activity, LOWER(activityProperty)