Re: flink avro schema 升级变动,job如何平滑过渡

2023-03-09 文章 Peihui He
:103) ... 9 more 如上, 比如 之前的schemal 是 { a, b } 后来调整为 { a, b, c } 当程序升级后,由于kafka中同时包含新旧数据,就会报错了 Shammon FY 于2023年2月24日周五 18:56写道: > Hi > > 你可以贴一下错误看下具体原因 > > Best, > Shammon > > On Fri, Feb 24, 2023 at 6:10 PM Peihui He wrote: > > > Hi, all > > > > 请

flink avro schema 升级变动,job如何平滑过渡

2023-02-24 文章 Peihui He
Hi, all 请教大家有没有遇到这样的情况,flink 使用avro 消费kafka中数据,后来在schema结构中加入新的字段。在灰度过程中会混杂着新老数据,这样的flink 消费程序就会挂掉。 大家一般是怎么处理的呢 Best Wishes.

flink cep A B C 事件一段时间不分先后顺序匹配

2023-02-20 文章 Peihui He
hi, all 如题,看了https://mp.weixin.qq.com/s/PT8ImeOOheXR295gQRsN8w 这篇文章后,发现第四个问题没有讲到解决方案。 请教大家有什么好的方案没呢? Best Wishes!

flink 1.16 kafka 流和自定义流collect后,watermark 消失

2022-11-13 文章 Peihui He
Hi, 如题,代码大概如下: stream1 = env.fromSource(kafkaSource, wartermarkStrategy) stream2 = env.addSource(ConfigSource()) stream1.collect(stream2).process(ProcessFunction()).print() 这种情况下在collect时没有watermark, 是什么原因呢?

flink key by 逻辑疑问

2022-05-28 文章 Peihui He
Hi, all 请教下大家,flink key by 后 使用process 来处理数据。现在有个问题: 当key不限量的情况下,比如uuid,这种情况下,下游都会创建一个process 对象来处理数据不? 如果这样的话,是不是没多久就会oom呢? 大家有熟悉这块相关flink 源码不?求指导,想自己观察下。 Best Regards!

flink jdbc source oom

2022-03-30 文章 Peihui He
Hi, all 请教下大家,使用flink jdbc 读取tidb中数据时如何在查询的时候能否根据条件在数据库层面做一些过滤呢? 当数据量很大比如几千万上亿的话,flink jdbc source 就很无力了。 Best Regards!

Re: RocksDB 读 cpu 100% 如何调优

2022-03-18 文章 Peihui He
> > 退订 > > > > > > > > 在 Peihui He ,2022年3月18日 上午11:18写道: > > > > Hi, all > > > > 如题,flink 任务使用rocksdb 做为状态后端,任务逻辑大概意思是: > > 来一条数据先判断该数据的key 是否再mapstat 中, 然后再将该key 写入mapstat中。 > > > > 产生问题是当数据跑一段时间后,判断是否存在线程cpu总是100%,堆栈如下:

Re: RocksDB 读 cpu 100% 如何调优

2022-03-18 文章 Peihui He
合预期的。 > > 其次 你可以在代码中加一些内存缓存的逻辑 类似于 mini-batch, 来减少和 state 交互的频率,也许这样能缓解一部分问题。 > > > > deng xuezhao 于2022年3月18日周五 11:19写道: > > > > > 退订 > > > > > > > > > > > > 在 Peihui He ,2022年3月18日 上午11:18写道: > > > > > > Hi, all

RocksDB 读 cpu 100% 如何调优

2022-03-17 文章 Peihui He
Hi, all 如题,flink 任务使用rocksdb 做为状态后端,任务逻辑大概意思是: 来一条数据先判断该数据的key 是否再mapstat 中, 然后再将该key 写入mapstat中。 产生问题是当数据跑一段时间后,判断是否存在线程cpu总是100%,堆栈如下: "process (6/18)#0" Id=80 RUNNABLE (in native) at org.rocksdb.RocksDB.get(Native Method) at org.rocksdb.RocksDB.get(RocksDB.java:2084) at

Re: flink sql job 提交流程问题

2021-08-14 文章 Peihui He
补充: 这个问题在ha的情况下非常突出,因为和hdfs的交互式线性的,当文件达到几百的时候,特别慢 Peihui He 于2021年8月15日周日 上午11:18写道: > Hi all: > > 在使用zeppelin提交sql的过程中总是发现超时现象,通过定位发现有如下问题: > 1、blob client 和blob server 通信时采用单客户端通行,当有比较多的文件时,比如100个,这个耗时挺大的 > > 2、blob server 虽然有blob.fetch.num-concurrent 进行并发控制

flink sql job 提交流程问题

2021-08-14 文章 Peihui He
Hi all: 在使用zeppelin提交sql的过程中总是发现超时现象,通过定位发现有如下问题: 1、blob client 和blob server 通信时采用单客户端通行,当有比较多的文件时,比如100个,这个耗时挺大的 2、blob server 虽然有blob.fetch.num-concurrent 进行并发控制,但是blob server在moveTempFileToStore方法中采用了写锁的方式,当时并发失效。 通过本地测试,简单的调整了代码,示例如下: BlobServer: [image: image.png] ClientUtils [image:

sql-client 提交job 疑问

2021-07-26 文章 Peihui He
Hi,all 使用zeppelin提交sqljob的时候发现在 flink jobmanager 中首先会打印以下日志, 2021-07-26 15:54:36,929 [Thread-6575] INFO org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient

flink 1.11.2 pyudf python worker 内存怎么限制呢?

2021-06-17 文章 Peihui He
Hi, all 使用python写的udf,里面封装了模型的预测,但是在提交sqljob到flink session的时候,总是被容器kill。 taskmanager 命令行参数: sun.java.command = org.apache.flink.runtime.taskexecutor.TaskManagerRunner -Djobmanager.rpc.address=10.50.56.253 --configDir /opt/flink-1.11.2/conf -D taskmanager.memory.framework.off-heap.size=134217728b

Re: 请教flink cep如何对无序数据处理

2021-05-14 文章 Peihui He
[image: image.png] 这样可以不? sherlock zw 于2021年5月14日周五 上午8:52写道: > 兄弟们,想问下Flink的CEP能够对无序数据流进行处理匹配嘛? > 我这里指的无序事件是:例如有两个事件,事件A和事件B,在一个时间窗口内,只要匹配到了A和B,不论A和B的到来顺序,我都认为是符合我的条件 >

Re: 流跑着跑着,报出了rocksdb层面的错误 Error while retrieving data from RocksDB

2021-04-28 文章 Peihui He
刚觉像是rocksdb的内存不够用了,调大试试呢? a593700624 <593700...@qq.com> 于2021年4月28日周三 下午3:47写道: > org.apache.flink.util.FlinkRuntimeException: Error while retrieving data > from > RocksDB > at > > org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:121) > at > >

Re: flink流、批场景下kafka拉取速率问题:每批次拉取多少条?是动态吗还是可以设置

2021-04-21 文章 Peihui He
fetch.min.bytes fetch.wait.max.ms 还可以用着两个参数控制下的 熊云昆 于2021年4月21日周三 下午7:10写道: > 有个这个参数max.poll.records,表示一次最多获取多少条record数据,默认是500条 > > > | | > 熊云昆 > | > | > 邮箱:xiongyun...@163.com > | > > 签名由 网易邮箱大师 定制 > > 在2021年04月20日 18:19,李一飞 写道: > flink流、批场景下kafka拉取速率问题:每批次拉取多少条?是动态吗还是可以设置 > 最好分流、批场景回答一下,谢谢!

Re: flink1.12.2 StreamingFileSink 问题

2021-04-16 文章 Peihui He
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/streamfile_sink.html 这个参考过没呢? 张锴 于2021年4月16日周五 下午1:24写道: > maven 仓库flink-connector-filesystem 最高1.11.3,也能用吗? > > guoyb <861277...@qq.com> 于2021年4月15日周四 下午10:01写道: > > > 1.12.0的也可以,大版本一样就行了 > > > > > > > > ---原始邮件--- >

Re: flinksql 1.12.1 row中字段访问报错

2021-03-12 文章 Peihui He
如果单独执行这个function 的话是没有问题的 select Test().a 是没有问题的 Peihui He 于2021年3月12日周五 下午6:30写道: > hi, all > > 定义一个 ScalarFunction > class Test extends ScalarFunction{ > @DataTypeHint("ROW") > def eval(): Row ={ > Row.of("a", "b", "c") &g

flinksql 1.12.1 row中字段访问报错

2021-03-12 文章 Peihui He
hi, all 定义一个 ScalarFunction class Test extends ScalarFunction{ @DataTypeHint("ROW") def eval(): Row ={ Row.of("a", "b", "c") } } 当执行下面语句的时候 select Test().a from taba1 会报下面的错误: java.io.IOException: Fail to run stream sql job at

Re: SQL作业的提交方式

2021-01-07 文章 Peihui He
可以尝试下zeppelin 0.9 http://zeppelin.apache.org/ jiangjiguang719 于2021年1月7日周四 下午8:34写道: > 目前我司的SQL作业的提交 还是使用的 Jark 老师的 flink-sql-submit 项目,想问下: > 1、有没有更好的SQL作业的提交方式? > 2、既然flink1.12 已经实现批流一体,要实现即席查询怎么提交SQL呢? > 3、SQL Client Gateway 社区大概啥时候发布?能够生产可用?

Re: flink 1.11.2 cep rocksdb 性能调优

2020-11-04 文章 Peihui He
= serializeValueNullSensitive(userValue, userValueSerializer); backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes); } 通过源码跟踪发现,RocksDBMapState每次get和put都需要序列化和反序列化。。。应该是这个原因导致比较耗时。 Peihui He 于2020年11月5日周四 上午11:05写道: > hi > > 我这边用flink1.11.2 cep做一些模式匹配,发现一旦开启rocksdb做为

flink 1.11.2 cep rocksdb 性能调优

2020-11-04 文章 Peihui He
hi 我这边用flink1.11.2 cep做一些模式匹配,发现一旦开启rocksdb做为状态后端,就会出现反压。cpu使用率是之前的10倍。 private void bufferEvent(IN event, long currentTime) throws Exception { long currentTs = System.currentTimeMillis(); List elementsForTimestamp = elementQueueState.get(currentTime); if (elementsForTimestamp ==

Re: flink 1.11.2 keyby 更换partition

2020-11-02 文章 Peihui He
nk-docs-release-1.11/dev/stream/state/state.html#keyed-datastream > > Best, > Congxian > > > Peihui He 于2020年11月2日周一 下午2:56写道: > > > Hi, > > > > 不好意思,我这边误导。 > > 现在的情况是这样的 > > > > 用这个方法测试 > > KeyGroupRangeAssignment.assignKeyToParall

Re: flink 1.11.2 keyby 更换partition

2020-11-01 文章 Peihui He
de 的逻辑就行了 > Best, > Congxian > > > Peihui He 于2020年11月2日周一 上午10:01写道: > > > hi, > > > > 已经尝试过了,通过partitionCustom 返回的是datastream,如果后面接cep的化,并发度就是1了,性能更不上。 > > > > Best Wishes. > > > > Zhang Yuxiao 于2020年10月31日周六 上午9:38写道: >

Re: flink 1.11.2 keyby 更换partition

2020-11-01 文章 Peihui He
hi, 已经尝试过了,通过partitionCustom 返回的是datastream,如果后面接cep的化,并发度就是1了,性能更不上。 Best Wishes. Zhang Yuxiao 于2020年10月31日周六 上午9:38写道: > 你好, > > 你看看 DataStream 类中的 partitionCustom 方法是否能够符合你的需求? > > 发件人: Peihui He > 发送时间: 2020年10月30日 下午 07:23 > 收件人: use

flink 1.11.2 keyby 更换partition

2020-10-30 文章 Peihui He
hi,all 请问可以更改keyby的hash partition 不? 现在发现这个hash partition不能均匀的分配key。 KeyGroupRangeAssignment.assignKeyToParallelOperator(stringToMd5(asset_id), 128, parallesism) 用这个方法测试,即使个asset_id 转换为MD5都不能均匀分配。 相反,用最最简单的 Math.abs(asset_id.hashcode() % parallesism ) 就可以平均的分配key。 Best Regards.

Re: flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出

2020-10-14 文章 Peihui He
ke(2).toArray, schema.getFieldNames.toList.take(2).toArray)).setParallelism(2)) Peihui He 于2020年10月14日周三 上午11:52写道: > hello, > > stenv.fromDataStream(stream, $"") > > 请教下,如果stream中数据是org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode > 类型,field

Re: flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出

2020-10-13 文章 Peihui He
hello, stenv.fromDataStream(stream, $"") 请教下,如果stream中数据是org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode 类型,field应该如何设置呢? 比如: { a: 1, b: { c: "test" } } Best Wishes. shizk233 于2020年9月28日周一 下午7:15写道: > flink sql似乎不能设置rebalance,在Data Stream API可以设。 > >

Re: flink sql 1.11.2 jdbc connector 按月分表读取

2020-09-25 文章 Peihui He
是我这边建issue不? 这里还发现一个问题 select count(*) from mysql_table 不能执行。 Best wishes. Jark Wu 于2020年9月25日周五 上午10:37写道: > 我觉得是个挺好的需求,有点类似于 Kafka 的 multi topic 功能,可以先建个 issue 收集下大家的需求。 > > > Best, > Jark > > On Thu, 24 Sep 2020 at 17:26, Peihui He wrote: > > > Hi, all >

flink sql 1.11.2 jdbc connector 按月分表读取

2020-09-24 文章 Peihui He
Hi, all 测试发现flink sql jdbc mysql 的table-name 不能通过正则来读取多个表,这些表按月份划分的。 后续会支持不? Best Wishes.

Re: flink-sql 1.11版本都还没完全支持checkpoint吗

2020-09-08 文章 Peihui He
[image: image.png] 重新部署如果需要从上次cancel点恢复的化,需要指定savepoint,savepoint 可以是上次cancel点最后一次checkpoint。 凌天荣 <466792...@qq.com> 于2020年9月8日周二 下午4:07写道: > 没有指定savapoint的,我们是cancel掉,然后重新部署的 > > > --原始邮件-- > 发件人: > "user-zh" >

Re: flink-sql 1.11版本都还没完全支持checkpoint吗

2020-09-08 文章 Peihui He
重启后有没有指定savepoint呢? 凌天荣 <466792...@qq.com> 于2020年9月8日周二 下午3:50写道: > 代码里设置了enableCheckpointing,任务停掉后,重启,还是没能消费停掉期间的数据,也就是checkpoint没生效

Re: flink sql 1.11.1 FileSystem SQL Connector path directory slow

2020-09-07 文章 Peihui He
Hi, 就是用hdfs的。 Jingsong Li 于2020年9月7日周一 上午11:16写道: > 另外,可能和使用本地文件系统有关?换成HDFS试试? > > On Mon, Sep 7, 2020 at 11:15 AM Jingsong Li > wrote: > > > Hi, > > > > 可以在JobMaster里面看一下jstack吗?看下具体卡在哪里? > > > > On Sat, Sep 5, 2020 at 11:11 PM Peihui He wrote: &

Re: flink sql 1.11.1 FileSystem SQL Connector path directory slow

2020-09-07 文章 Peihui He
于2020年9月7日周一 上午11:15写道: > Hi, > > 可以在JobMaster里面看一下jstack吗?看下具体卡在哪里? > > On Sat, Sep 5, 2020 at 11:11 PM Peihui He wrote: > > > Hi, all > > > > 经过这几天的测试发现,当hdfs目录下的文件比较多的时候就是出现上述情况,比如我这边文件个数接近2k个。 > > 简单的测试当文件个数为1到2个的时候会很快提交job,并且flink session web 页面也没有感觉到卡

Re: flink sql 1.11.1 FileSystem SQL Connector path directory slow

2020-09-05 文章 Peihui He
Hi, all 经过这几天的测试发现,当hdfs目录下的文件比较多的时候就是出现上述情况,比如我这边文件个数接近2k个。 简单的测试当文件个数为1到2个的时候会很快提交job,并且flink session web 页面也没有感觉到卡着。 请问有什么好的解决方式没呢? Best Wishes. Peihui He 于2020年9月4日周五 下午6:25写道: > Hi, all > > 当指定partition的时候这个问题通过path 也没法解决了 > > CREATE TABLE MyUserTable ( >

Re: flink sql 1.11.1 FileSystem SQL Connector path directory slow

2020-09-04 文章 Peihui He
to a directory 'format' = 'json', -- required: file system connector) select * from MyUserTable limit 10; job 会一直卡在一个地方 [image: image.png] 这种改怎么解决呢? Peihui He 于2020年9月4日周五 下午6:02写道: > hi, all > 我这边用flink sql client 创建表的时候 > > CREATE TABLE MyUserTable ( > c

flink sql 1.11.1 FileSystem SQL Connector path directory slow

2020-09-04 文章 Peihui He
hi, all 我这边用flink sql client 创建表的时候 CREATE TABLE MyUserTable ( column_name1 INT, column_name2 STRING,) PARTITIONED BY (part_name1, part_name2) WITH ( 'connector' = 'filesystem', -- required: specify the connector 'path' = 'file:///path/to/whatever', -- required: path to a

Re: flink 1.10.1 sql limit 不生效

2020-08-12 文章 Peihui He
[image: image.png] order by TUMBLE_START 结果如上图 Peihui He 于2020年8月12日周三 下午3:40写道: > Hi BenChao > > SQL是流模式下执行的,看着不生效的表现就是显示的数量超过limit的数量。 > order by TUMBLE_START desc 好像不是预期的结果 > > 这个是需要给 TUMBLE_START 得到的timestamp 转为 long么? > > Best Wishes. > > Benchao Li 于2020年8

Re: flink 1.10.1 sql limit 不生效

2020-08-12 文章 Peihui He
Hi BenChao 发现问题了,是因为select 的字段中包含了array,导致数据显示的比实际limit数据要多 Best Wishes. Benchao Li 于2020年8月12日周三 下午3:12写道: > 你这个SQL是在流式还是批式模式下执行的呢?limit不生效的表现是什么呢? > > Peihui He 于2020年8月12日周三 下午3:03写道: > > > Hi all, > > > > 用zeppelin执行sql的时候发现用了 LEFT JOIN LATERAL TABLE 时候 limi

Re: flink 1.10.1 sql limit 不生效

2020-08-12 文章 Peihui He
应该是我这边sql问题,我这边在看看,打扰大家了 Peihui He 于2020年8月12日周三 下午3:03写道: > Hi all, > > 用zeppelin执行sql的时候发现用了 LEFT JOIN LATERAL TABLE 时候 limit 不生效 > sql 类似下面: > select a, b, c, t from tb LEFT JOIN LATERAL TABLE (Tf(a)) as T(c, d, e) > ON TRUE order by t desc limit 10 > > 如果select 结果中

flink 1.10.1 sql limit 不生效

2020-08-12 文章 Peihui He
Hi all, 用zeppelin执行sql的时候发现用了 LEFT JOIN LATERAL TABLE 时候 limit 不生效 sql 类似下面: select a, b, c, t from tb LEFT JOIN LATERAL TABLE (Tf(a)) as T(c, d, e) ON TRUE order by t desc limit 10 如果select 结果中不包括c的化,就正常了 请问这个是什么问题呢?sql是写的不对么? Best Wishes.

Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-22 文章 Peihui He
Hi Congxian, 这个问题有结论没呢? Best wishes. Peihui He 于2020年7月17日周五 下午4:21写道: > Hi Congxian, > > [image: Snipaste_2020-07-17_16-20-06.png] > > 我这边通过chrome 浏览器看到是上传了的,并且可以下载的。 > > Best wishes. > > Congxian Qiu 于2020年7月17日周五 下午1:31写道: > >> Hi Peihui >>

flink sink kafka Error while confirming checkpoint

2020-07-22 文章 Peihui He
Hello, flink 1.10.1 kafka 2.12-1.1.0 运行一段时间后会出现一下错误,不知道有遇到过没? java.lang.RuntimeException: Error while confirming checkpoint at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:935) at

Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-17 文章 Peihui He
Hi Congxian, [image: Snipaste_2020-07-17_16-20-06.png] 我这边通过chrome 浏览器看到是上传了的,并且可以下载的。 Best wishes. Congxian Qiu 于2020年7月17日周五 下午1:31写道: > Hi Peihui > > 感谢你的回复,我这边没有看到附件,你那边能否确认下呢? > > Best, > Congxian > > > Peihui He 于2020年7月17日周五 上午10:13写道: > > > H

Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-16 文章 Peihui He
Hi Congxian 见附件。 Best wishes. Congxian Qiu 于2020年7月16日周四 下午8:24写道: > Hi Peihui > > 感谢你的回信。能否帮忙用 1.10.0 复现一次,然后把相关的日志(JM log 和 TM Log,方便的话,也开启一下 debug > 日志)分享一下呢?如果日志太大的话,可以尝试贴待 gist[1] 然后邮件列表回复一个地址即可, > 非常感谢~ > > [1] https://gist.github.com/ > > Best, > Congxian

Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-16 文章 Peihui He
Hi Yun, 我这边测试需要在集群上跑的,本地idea跑是没有问题的。 flink 1.10.1 的flink-conf.yaml 是cope flink 1.10.0 的,但是1.10.0 就是报错。 附件就是源码job。如果你要的跑需要改下socket host的。只要socket 中输入hepeihui 就会抛异常的。 Peihui He 于2020年7月16日周四 下午5:26写道: > Hi Yun, > > 作业没有开启local recovery, 我这边测试1.10.0是必现的。 > > Best wishes. > >

Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-16 文章 Peihui He
; https://github.com/apache/flink/commit/399329275e5e2baca9ed9494cce97ff732ac077a > 祝好 > 唐云 > ________ > From: Peihui He > Sent: Thursday, July 16, 2020 16:15 > To: user-zh@flink.apache.org > Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复 >

Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-16 文章 Peihui He
tream/state/schema_evolution.html > > 祝好 > 唐云 > > > > From: Robin Zhang > Sent: Wednesday, July 15, 2020 16:23 > To: user-zh@flink.apache.org > Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复 > > 据我所知,跨大版本的不能直接从checkoint恢复,只能放弃

Re: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-16 文章 Peihui He
tor-td2232.html#a2239 > 解决方式: > 1. 使用hdfs作为状态后端不会报错 > 2. 升级至1.10.1使用rocksdb也不会出现该问题 > > > > > > > > > > > > > > > 在 2020-07-14 14:41:53,"Peihui He" 写道: > >Hi Yun, > > > >我这边用一个word count 例子,socket -> flatmap ->

Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-16 文章 Peihui He
er log 呢? > > Best, > Congxian > > > Peihui He 于2020年7月14日周二 下午2:46写道: > > > Hi Congxian, > > > > 这个错误是从1.9 升级到1.10 遇到的问题。用简单的wordcount 测试,自己根据特定word > > 抛出runtimeException,就能够重现。flink on yarn 和 flink on k8s 都出现这个问题。1.10 > > 都不能从上次的checkpoint状态中恢复。不知道

Re: flink sql 1.10 insert into tb select 复杂schema 失败

2020-07-14 文章 Peihui He
Hi BenChao, 换成1.10.1 就可以了。刚才那封邮件不行,是因为依赖flink-kafka的依赖版本没有修改过来。 Thank you. Benchao Li 于2020年7月15日周三 上午10:25写道: > Hi Peihui, > > 这是一个已知bug[1],已经在1.10.1和1.11.0中修复了,可以尝试下这两个版本。 > > [1] https://issues.apache.org/jira/browse/FLINK-16220 > > Peihui He 于2020年7月15日周三

Re: flink sql 1.10 insert into tb select 复杂schema 失败

2020-07-14 文章 Peihui He
ug[1],已经在1.10.1和1.11.0中修复了,可以尝试下这两个版本。 > > [1] https://issues.apache.org/jira/browse/FLINK-16220 > > Peihui He 于2020年7月15日周三 上午9:54写道: > > > Hello, > > > > 在使用flink sql 1.10.0 时候,当source table 中含有复杂schema,比如 > > create table xxx ( > > a string, >

Re: flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?

2020-07-14 文章 Peihui He
chema中声明, > > > 因为SQL 是预编译后执行的,不能做到schema里是三个field,执行时又能解析四个field。 > > > > > > 一种做法是定义复杂的jsonObject对应的ROW > > > 将全部可能的字段包含进去,每条记录没有的字段解析出来的会是null,fail-on-missing-field 默认关闭的, > > > 另外一种推荐你把复杂的字段在上游就转义成一个String放到json的一个field中,这样Flink解析出来就是一个String, > > &g

flink sql 1.10 insert into tb select 复杂schema 失败

2020-07-14 文章 Peihui He
Hello, 在使用flink sql 1.10.0 时候,当source table 中含有复杂schema,比如 create table xxx ( a string, b row( c row(d string) ) ) 当c 中有值的时候,sql 如下 insert into select * from xxx会出现下面错误 Caused by: java.lang.ClassCastException:

Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-14 文章 Peihui He
ause。 > > > > [1] > > > https://github.com/apache/flink/blob/2a3b642b1efb957f3d4f20502c40398786ab1469/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java#L473 > > > > > > 祝

Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-14 文章 Peihui He
k/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java#L473 > > > 祝好 > 唐云 > ____ > From: Peihui He > Sent: Tuesday, July 14, 2020 10:42 > To: user-zh@flink.apache.org > Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复 > > hello, >

flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-13 文章 Peihui He
hello, 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示 Caused by: java.nio.file.NoSuchFileException:

Re: flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?

2020-07-10 文章 Peihui He
eld,执行时又能解析四个field。 > > 一种做法是定义复杂的jsonObject对应的ROW > 将全部可能的字段包含进去,每条记录没有的字段解析出来的会是null,fail-on-missing-field 默认关闭的, > 另外一种推荐你把复杂的字段在上游就转义成一个String放到json的一个field中,这样Flink解析出来就是一个String, > 然后query里用UDTF处理。 > > > 祝好 > Leonard Xu > > > > > > 在 2020年7月10日,10:16

flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?

2020-07-09 文章 Peihui He
Hello: 在用flink sql从kafka消费数据时,有些json比较复杂,想直接定义为object,在后续通过udf转为string。 有什么办法不?我在建表的时候有提示过json_object 类型,但是用了又提示其他类型。 Best wishes.

Re: flink 1.10 kafka collector topic 配置pattern

2020-07-03 文章 Peihui He
好的,感谢珞 Leonard Xu 于2020年7月3日周五 下午4:07写道: > Hello > > 我了解到社区有人在做了,1.12 应该会支持 > > 祝好 > Leonard Xu > > > 在 2020年7月3日,16:02,Peihui He 写道: > > > > hello > > > > 请教大家,flink 1.10里面kafka connector 不能配置topic pattern,后续会支持吗? > > > > best wishes > >

flink 1.10 kafka collector topic 配置pattern

2020-07-03 文章 Peihui He
hello 请教大家,flink 1.10里面kafka connector 不能配置topic pattern,后续会支持吗? best wishes

Re: flink 如何自定义connector

2020-05-28 文章 Peihui He
hello 正在尝试中,感谢解答珞 best wishes 111 于2020年5月28日周四 上午10:16写道: > Hi, > 想要在sqlgateway里面使用,那么可以看看下面几个条件: > 1 满足SPI的要求,能让flink自动发现实现类 > 2 配置FLINK_HOME环境变量,自定义的connector放在FLINK_HOME/lib下 > 3 如果与Hive集成,使用hivecatalog,那么先要注册表 > 这样就可以使用了。 > Best, > Xinghalo

Re: flink 如何自定义connector

2020-05-27 文章 Peihui He
nk-docs-master/dev/table/sourceSinks.html > < > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html > > > > > > 在 2020年5月28日,09:16,Peihui He 写道: > > > > hello > > > >请教大家,flink 1.10中如何自定义coonnecter,然后注册到flink sql > > gateway,使得可以执行sql的操作呢? > > > > > > best wish > >

flink 如何自定义connector

2020-05-27 文章 Peihui He
hello 请教大家,flink 1.10中如何自定义coonnecter,然后注册到flink sql gateway,使得可以执行sql的操作呢? best wish

Re: flink cep 匹配一段时间类A,B,C事件发生

2020-04-16 文章 Peihui He
是的,这个想法好,谢谢 Dian Fu 于2020年4月16日周四 上午9:59写道: > 类似于这样? > > AA follow by BB follow by CC > > AA定义成A or B or C > BB定义成(A or B or C)and BB.type != AA.type > CC定义成(A or B or C)and CC.type != AA.type and CC.type != BB.type > > > 在 2020年4月16日,上午8:40,Peihui He 写道: > &g

flink 1.9.2 容器ha部署是jobid 一直是00000000000000000000000000000000

2020-03-23 文章 Peihui He
大家好,我在用flink 1.9.2 部署到容器的时候如果不启动ha的情况下jobid是正常的,但是启动了就变成了 这样的话,checkpoint的地址和ha的文件地址都一样了,导致checkpoint总是失败。 不知道这是什么原因呢?

flink influxdb 有些表中不包含jobname

2020-03-22 文章 Peihui He
hello, 我这边用的是influxdb 作为flink 1.9.2的reporter, 但是在一些表里面没有jobname信息。这样会是的在每次重启的时候都得修改grafana的图标信息,很麻烦。 请问有什么好的儿方式解决没呢?