Re: flink avro schema 升级变动,job如何平滑过渡
java.io.IOException: Failed to deserialize Avro record. at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106) at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46) at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:130) at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179) at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269) Caused by: java.io.EOFException at org.apache.flink.avro.shaded.org.apache.avro.io.BinaryDecoder$InputStreamByteSource.readRaw(BinaryDecoder.java:851) at org.apache.flink.avro.shaded.org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:373) at org.apache.flink.avro.shaded.org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:290) at org.apache.flink.avro.shaded.org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:208) at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:469) at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:459) at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191) at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160) at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259) at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247) at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179) at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298) at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183) at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160) at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259) at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247) at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179) at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160) at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:142) at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103) ... 9 more 如上, 比如 之前的schemal 是 { a, b } 后来调整为 { a, b, c } 当程序升级后,由于kafka中同时包含新旧数据,就会报错了 Shammon FY 于2023年2月24日周五 18:56写道: > Hi > > 你可以贴一下错误看下具体原因 > > Best, > Shammon > > On Fri, Feb 24, 2023 at 6:10 PM Peihui He wrote: > > > Hi, all > > > > 请教大家有没有遇到这样的情况,flink 使用avro > > 消费kafka中数据,后来在schema结构中加入新的字段。在灰度过程中会混杂着新老数据,这样的flink 消费程序就会挂掉。 > > > > 大家一般是怎么处理的呢 > > > > Best Wishes. > > >
flink avro schema 升级变动,job如何平滑过渡
Hi, all 请教大家有没有遇到这样的情况,flink 使用avro 消费kafka中数据,后来在schema结构中加入新的字段。在灰度过程中会混杂着新老数据,这样的flink 消费程序就会挂掉。 大家一般是怎么处理的呢 Best Wishes.
flink cep A B C 事件一段时间不分先后顺序匹配
hi, all 如题,看了https://mp.weixin.qq.com/s/PT8ImeOOheXR295gQRsN8w 这篇文章后,发现第四个问题没有讲到解决方案。 请教大家有什么好的方案没呢? Best Wishes!
flink 1.16 kafka 流和自定义流collect后,watermark 消失
Hi, 如题,代码大概如下: stream1 = env.fromSource(kafkaSource, wartermarkStrategy) stream2 = env.addSource(ConfigSource()) stream1.collect(stream2).process(ProcessFunction()).print() 这种情况下在collect时没有watermark, 是什么原因呢?
flink key by 逻辑疑问
Hi, all 请教下大家,flink key by 后 使用process 来处理数据。现在有个问题: 当key不限量的情况下,比如uuid,这种情况下,下游都会创建一个process 对象来处理数据不? 如果这样的话,是不是没多久就会oom呢? 大家有熟悉这块相关flink 源码不?求指导,想自己观察下。 Best Regards!
flink jdbc source oom
Hi, all 请教下大家,使用flink jdbc 读取tidb中数据时如何在查询的时候能否根据条件在数据库层面做一些过滤呢? 当数据量很大比如几千万上亿的话,flink jdbc source 就很无力了。 Best Regards!
Re: RocksDB 读 cpu 100% 如何调优
OK,我这边加个metric,先观察下 yue ma 于2022年3月18日周五 12:23写道: > hi > 我觉得这里可以注意两地方 > 首先 你可以观察一下这个时候 task 的吞吐量是多少 ,如果 qps 特别高 ,比如作业重最旧的offset 消费,我觉得这个时候 cpu 100% > 是符合预期的。 > 其次 你可以在代码中加一些内存缓存的逻辑 类似于 mini-batch, 来减少和 state 交互的频率,也许这样能缓解一部分问题。 > > deng xuezhao 于2022年3月18日周五 11:19写道: > > > 退订 > > > > > > > > 在 Peihui He ,2022年3月18日 上午11:18写道: > > > > Hi, all > > > > 如题,flink 任务使用rocksdb 做为状态后端,任务逻辑大概意思是: > > 来一条数据先判断该数据的key 是否再mapstat 中, 然后再将该key 写入mapstat中。 > > > > 产生问题是当数据跑一段时间后,判断是否存在线程cpu总是100%,堆栈如下: > > > > "process (6/18)#0" Id=80 RUNNABLE (in native) > > at org.rocksdb.RocksDB.get(Native Method) > > at org.rocksdb.RocksDB.get(RocksDB.java:2084) > > at > > > > > org.apache.flink.contrib.streaming.state.RocksDBMapState.contains(RocksDBMapState.java:173) > > at > > > > > org.apache.flink.runtime.state.UserFacingMapState.contains(UserFacingMapState.java:72) > > at > > > > > com.huanju.security.soc.internal.hs.bigdata.FileScanToTiDB$$anon$12.processElement(FileScanToTiDB.scala:156) > > at > > > > > com.huanju.security.soc.internal.hs.bigdata.FileScanToTiDB$$anon$12.processElement(FileScanToTiDB.scala:145) > > at > > > > > org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83) > > at > > > > > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) > > at > > org.apache.flink.streaming.runtime.io > > > .AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) > > at > > org.apache.flink.streaming.runtime.io > > > .AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) > > at > > org.apache.flink.streaming.runtime.io > > .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > > at > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496) > > at > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$624/715942770.runDefaultAction(Unknown > > Source) > > at > > > > > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) > > at > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) > > at > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) > > at > > > org.apache.flink.runtime.taskmanager.Task$$Lambda$773/520411616.run(Unknown > > Source) > > at > > > > > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) > > at > > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) > > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) > > at java.lang.Thread.run(Thread.java:748) > > > > 但是看checkpoint数据,才100m左右 > > > > 请问大家 rocksdb 是出现什么性能瓶颈了呢? 改怎么调优呢? > > >
Re: RocksDB 读 cpu 100% 如何调优
状态还是比较大的,应该有几个g的 Jiangang Liu 于2022年3月18日周五 14:36写道: > 如果状态比较小,可以直接考虑使用filesystem,这种perRecord的操作还是比较耗时的。 > > yue ma 于2022年3月18日周五 12:23写道: > > > hi > > 我觉得这里可以注意两地方 > > 首先 你可以观察一下这个时候 task 的吞吐量是多少 ,如果 qps 特别高 ,比如作业重最旧的offset 消费,我觉得这个时候 cpu > 100% > > 是符合预期的。 > > 其次 你可以在代码中加一些内存缓存的逻辑 类似于 mini-batch, 来减少和 state 交互的频率,也许这样能缓解一部分问题。 > > > > deng xuezhao 于2022年3月18日周五 11:19写道: > > > > > 退订 > > > > > > > > > > > > 在 Peihui He ,2022年3月18日 上午11:18写道: > > > > > > Hi, all > > > > > > 如题,flink 任务使用rocksdb 做为状态后端,任务逻辑大概意思是: > > > 来一条数据先判断该数据的key 是否再mapstat 中, 然后再将该key 写入mapstat中。 > > > > > > 产生问题是当数据跑一段时间后,判断是否存在线程cpu总是100%,堆栈如下: > > > > > > "process (6/18)#0" Id=80 RUNNABLE (in native) > > > at org.rocksdb.RocksDB.get(Native Method) > > > at org.rocksdb.RocksDB.get(RocksDB.java:2084) > > > at > > > > > > > > > org.apache.flink.contrib.streaming.state.RocksDBMapState.contains(RocksDBMapState.java:173) > > > at > > > > > > > > > org.apache.flink.runtime.state.UserFacingMapState.contains(UserFacingMapState.java:72) > > > at > > > > > > > > > com.huanju.security.soc.internal.hs.bigdata.FileScanToTiDB$$anon$12.processElement(FileScanToTiDB.scala:156) > > > at > > > > > > > > > com.huanju.security.soc.internal.hs.bigdata.FileScanToTiDB$$anon$12.processElement(FileScanToTiDB.scala:145) > > > at > > > > > > > > > org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83) > > > at > > > > > > > > > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) > > > at > > > org.apache.flink.streaming.runtime.io > > > > > > .AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) > > > at > > > org.apache.flink.streaming.runtime.io > > > > > > .AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) > > > at > > > org.apache.flink.streaming.runtime.io > > > .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > > > at > > > > > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496) > > > at > > > > > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$624/715942770.runDefaultAction(Unknown > > > Source) > > > at > > > > > > > > > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) > > > at > > > > > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) > > > at > > > > > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) > > > at > > > > > > org.apache.flink.runtime.taskmanager.Task$$Lambda$773/520411616.run(Unknown > > > Source) > > > at > > > > > > > > > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) > > > at > > > > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) > > > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) > > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) > > > at java.lang.Thread.run(Thread.java:748) > > > > > > 但是看checkpoint数据,才100m左右 > > > > > > 请问大家 rocksdb 是出现什么性能瓶颈了呢? 改怎么调优呢? > > > > > >
RocksDB 读 cpu 100% 如何调优
Hi, all 如题,flink 任务使用rocksdb 做为状态后端,任务逻辑大概意思是: 来一条数据先判断该数据的key 是否再mapstat 中, 然后再将该key 写入mapstat中。 产生问题是当数据跑一段时间后,判断是否存在线程cpu总是100%,堆栈如下: "process (6/18)#0" Id=80 RUNNABLE (in native) at org.rocksdb.RocksDB.get(Native Method) at org.rocksdb.RocksDB.get(RocksDB.java:2084) at org.apache.flink.contrib.streaming.state.RocksDBMapState.contains(RocksDBMapState.java:173) at org.apache.flink.runtime.state.UserFacingMapState.contains(UserFacingMapState.java:72) at com.huanju.security.soc.internal.hs.bigdata.FileScanToTiDB$$anon$12.processElement(FileScanToTiDB.scala:156) at com.huanju.security.soc.internal.hs.bigdata.FileScanToTiDB$$anon$12.processElement(FileScanToTiDB.scala:145) at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496) at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$624/715942770.runDefaultAction(Unknown Source) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) at org.apache.flink.runtime.taskmanager.Task$$Lambda$773/520411616.run(Unknown Source) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) at java.lang.Thread.run(Thread.java:748) 但是看checkpoint数据,才100m左右 请问大家 rocksdb 是出现什么性能瓶颈了呢? 改怎么调优呢?
Re: flink sql job 提交流程问题
补充: 这个问题在ha的情况下非常突出,因为和hdfs的交互式线性的,当文件达到几百的时候,特别慢 Peihui He 于2021年8月15日周日 上午11:18写道: > Hi all: > > 在使用zeppelin提交sql的过程中总是发现超时现象,通过定位发现有如下问题: > 1、blob client 和blob server 通信时采用单客户端通行,当有比较多的文件时,比如100个,这个耗时挺大的 > > 2、blob server 虽然有blob.fetch.num-concurrent 进行并发控制,但是blob > server在moveTempFileToStore方法中采用了写锁的方式,当时并发失效。 > > 通过本地测试,简单的调整了代码,示例如下: > BlobServer: > [image: image.png] > > ClientUtils > [image: image.png] > 调整后通过zeppelin 提交job后,时间由之前的几分钟到现在几十秒,并且不会随着依赖的jar的增加而线性增长。 > > 现有如下疑问: > 1、blob server 中的锁的粒度是不是过大?当并行提交多个sql,实际上也只能一个一个的执行。 > 2、blob server 中moveTempFileToStore 的写锁是否真的必要呢? > > Best wishes. >
flink sql job 提交流程问题
Hi all: 在使用zeppelin提交sql的过程中总是发现超时现象,通过定位发现有如下问题: 1、blob client 和blob server 通信时采用单客户端通行,当有比较多的文件时,比如100个,这个耗时挺大的 2、blob server 虽然有blob.fetch.num-concurrent 进行并发控制,但是blob server在moveTempFileToStore方法中采用了写锁的方式,当时并发失效。 通过本地测试,简单的调整了代码,示例如下: BlobServer: [image: image.png] ClientUtils [image: image.png] 调整后通过zeppelin 提交job后,时间由之前的几分钟到现在几十秒,并且不会随着依赖的jar的增加而线性增长。 现有如下疑问: 1、blob server 中的锁的粒度是不是过大?当并行提交多个sql,实际上也只能一个一个的执行。 2、blob server 中moveTempFileToStore 的写锁是否真的必要呢? Best wishes.
sql-client 提交job 疑问
Hi,all 使用zeppelin提交sqljob的时候发现在 flink jobmanager 中首先会打印以下日志, 2021-07-26 15:54:36,929 [Thread-6575] INFO org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient.checkTrustAndSend(SaslDataTransferClient.java:239) [] - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false 想请问这个是干什么用的呢?有时候会打印一分钟左右,然后才会出现一下日志 2021-07-26 15:54:37,526 [flink-akka.actor.default-dispatcher-1803] INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:300) [] - Received JobGraph submission 0d329a174ee4d33600fe75b635631bee . 看源码这种也确实太耗时了 [image: image.png] 有什么办法避免没呢?
flink 1.11.2 pyudf python worker 内存怎么限制呢?
Hi, all 使用python写的udf,里面封装了模型的预测,但是在提交sqljob到flink session的时候,总是被容器kill。 taskmanager 命令行参数: sun.java.command = org.apache.flink.runtime.taskexecutor.TaskManagerRunner -Djobmanager.rpc.address=10.50.56.253 --configDir /opt/flink-1.11.2/conf -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=321011060b -D taskmanager.memory.network.min=321011060b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=321011060b -D taskmanager.cpu.cores=4.0 -D taskmanager.memory.task.heap.size=2299652985b -D taskmanager.memory.task.off-heap.size=0b [image: image.png] python.fn-execution.framework.memory.size 450mb 配置了如上参数,但是没有效果。 是什么原因呢?
Re: 请教flink cep如何对无序数据处理
[image: image.png] 这样可以不? sherlock zw 于2021年5月14日周五 上午8:52写道: > 兄弟们,想问下Flink的CEP能够对无序数据流进行处理匹配嘛? > 我这里指的无序事件是:例如有两个事件,事件A和事件B,在一个时间窗口内,只要匹配到了A和B,不论A和B的到来顺序,我都认为是符合我的条件 >
Re: 流跑着跑着,报出了rocksdb层面的错误 Error while retrieving data from RocksDB
刚觉像是rocksdb的内存不够用了,调大试试呢? a593700624 <593700...@qq.com> 于2021年4月28日周三 下午3:47写道: > org.apache.flink.util.FlinkRuntimeException: Error while retrieving data > from > RocksDB > at > > org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:121) > at > > org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:111) > at > > org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:60) > at > > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:455) > at > > org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:276) > at > > org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:154) > at > > org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:568) > at > > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:167) > at > > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:179) > at > > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:101) > at > org.apache.flink.streaming.runtime.io > .StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:180) > at > org.apache.flink.streaming.runtime.io > .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) > at > org.apache.flink.streaming.runtime.io > .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) > at > > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) > at > > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.rocksdb.RocksDBException: Requested array size exceeds VM > limit > at org.rocksdb.RocksDB.get(Native Method) > at org.rocksdb.RocksDB.get(RocksDB.java:810) > at > > org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:118) > ... 20 more > > > 能跑几个小时,总会因为这个问题,一直陷入重启 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >
Re: flink流、批场景下kafka拉取速率问题:每批次拉取多少条?是动态吗还是可以设置
fetch.min.bytes fetch.wait.max.ms 还可以用着两个参数控制下的 熊云昆 于2021年4月21日周三 下午7:10写道: > 有个这个参数max.poll.records,表示一次最多获取多少条record数据,默认是500条 > > > | | > 熊云昆 > | > | > 邮箱:xiongyun...@163.com > | > > 签名由 网易邮箱大师 定制 > > 在2021年04月20日 18:19,李一飞 写道: > flink流、批场景下kafka拉取速率问题:每批次拉取多少条?是动态吗还是可以设置 > 最好分流、批场景回答一下,谢谢!
Re: flink1.12.2 StreamingFileSink 问题
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/streamfile_sink.html 这个参考过没呢? 张锴 于2021年4月16日周五 下午1:24写道: > maven 仓库flink-connector-filesystem 最高1.11.3,也能用吗? > > guoyb <861277...@qq.com> 于2021年4月15日周四 下午10:01写道: > > > 1.12.0的也可以,大版本一样就行了 > > > > > > > > ---原始邮件--- > > 发件人: "张锴" > 发送时间: 2021年4月15日(周四) 下午5:16 > > 收件人: "user-zh" > 主题: flink1.12.2 StreamingFileSink 问题 > > > > > > > > > flink用的1.12.2,要sink到hdfs,选用了StreamingFileSink,导入依赖的时候maven仓库并没有1.12.2的flink-connector-filesystem的jar包,我应该选用哪个版本合适 >
Re: flinksql 1.12.1 row中字段访问报错
如果单独执行这个function 的话是没有问题的 select Test().a 是没有问题的 Peihui He 于2021年3月12日周五 下午6:30写道: > hi, all > > 定义一个 ScalarFunction > class Test extends ScalarFunction{ > @DataTypeHint("ROW") > def eval(): Row ={ > Row.of("a", "b", "c") > } > } > > 当执行下面语句的时候 > select Test().a from taba1 > 会报下面的错误: > > java.io.IOException: Fail to run stream sql job at > org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:172) > at > org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:105) > at > org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInnerSelect(FlinkStreamSqlInterpreter.java:89) > at > org.apache.zeppelin.flink.FlinkSqlInterrpeter.callSelect(FlinkSqlInterrpeter.java:494) > at > org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:257) > at > org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:151) > at > org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:111) > at > org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47) > at > org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110) > at > org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852) > at > org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744) > at org.apache.zeppelin.scheduler.Job.run(Job.java:172) at > org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132) > at > org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) Caused by: scala.MatchError: > Test() (of class org.apache.calcite.rex.RexCall) at > org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.internalVisit$1(NestedProjectionUtil.scala:273) > at > org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.visitFieldAccess(NestedProjectionUtil.scala:283) > at > org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.visitFieldAccess(NestedProjectionUtil.scala:269) > at org.apache.calcite.rex.RexFieldAccess.accept(RexFieldAccess.java:92) at > org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:112) > at > org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:111) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) at > scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at > scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at > scala.collection.AbstractIterable.foreach(Iterable.scala:54) at > org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$.build(NestedProjectionUtil.scala:111) > at > org.apache.flink.table.planner.plan.utils.NestedProjectionUtil.build(NestedProjectionUtil.scala) > at > org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.getUsedFieldsInTopLevelProjectAndWatermarkAssigner(ProjectWatermarkAssignerTransposeRule.java:155) > at > org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.matches(ProjectWatermarkAssignerTransposeRule.java:65) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:284) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:411) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:268) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:985) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1245) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:84) > at > org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:268) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1132) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604) > at > org.apache.calcite.plan.volcano.VolcanoPlann
flinksql 1.12.1 row中字段访问报错
hi, all 定义一个 ScalarFunction class Test extends ScalarFunction{ @DataTypeHint("ROW") def eval(): Row ={ Row.of("a", "b", "c") } } 当执行下面语句的时候 select Test().a from taba1 会报下面的错误: java.io.IOException: Fail to run stream sql job at org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:172) at org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:105) at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInnerSelect(FlinkStreamSqlInterpreter.java:89) at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callSelect(FlinkSqlInterrpeter.java:494) at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:257) at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:151) at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:111) at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47) at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110) at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852) at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744) at org.apache.zeppelin.scheduler.Job.run(Job.java:172) at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132) at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: scala.MatchError: Test() (of class org.apache.calcite.rex.RexCall) at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.internalVisit$1(NestedProjectionUtil.scala:273) at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.visitFieldAccess(NestedProjectionUtil.scala:283) at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.visitFieldAccess(NestedProjectionUtil.scala:269) at org.apache.calcite.rex.RexFieldAccess.accept(RexFieldAccess.java:92) at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:112) at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:111) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$.build(NestedProjectionUtil.scala:111) at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil.build(NestedProjectionUtil.scala) at org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.getUsedFieldsInTopLevelProjectAndWatermarkAssigner(ProjectWatermarkAssignerTransposeRule.java:155) at org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.matches(ProjectWatermarkAssignerTransposeRule.java:65) at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:284) at org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:411) at org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:268) at org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:985) at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1245) at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589) at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604) at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:84) at org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:268) at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1132) at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589) at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604) at org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:486) at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:309) at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at
Re: SQL作业的提交方式
可以尝试下zeppelin 0.9 http://zeppelin.apache.org/ jiangjiguang719 于2021年1月7日周四 下午8:34写道: > 目前我司的SQL作业的提交 还是使用的 Jark 老师的 flink-sql-submit 项目,想问下: > 1、有没有更好的SQL作业的提交方式? > 2、既然flink1.12 已经实现批流一体,要实现即席查询怎么提交SQL呢? > 3、SQL Client Gateway 社区大概啥时候发布?能够生产可用?
Re: flink 1.11.2 cep rocksdb 性能调优
hi, @Override public UV get(UK userKey) throws IOException, RocksDBException { byte[] rawKeyBytes = serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer); byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes); return (rawValueBytes == null ? null : deserializeUserValue(dataInputView, rawValueBytes, userValueSerializer)); } @Override public void put(UK userKey, UV userValue) throws IOException, RocksDBException { byte[] rawKeyBytes = serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer); byte[] rawValueBytes = serializeValueNullSensitive(userValue, userValueSerializer); backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes); } 通过源码跟踪发现,RocksDBMapState每次get和put都需要序列化和反序列化。。。应该是这个原因导致比较耗时。 Peihui He 于2020年11月5日周四 上午11:05写道: > hi > > 我这边用flink1.11.2 cep做一些模式匹配,发现一旦开启rocksdb做为状态后端,就会出现反压。cpu使用率是之前的10倍。 > > private void bufferEvent(IN event, long currentTime) throws Exception { > long currentTs = System.currentTimeMillis(); > List elementsForTimestamp = elementQueueState.get(currentTime); > if (elementsForTimestamp == null) { > this.bufferEventGetNullhistogram.update(System.currentTimeMillis() - > currentTs); > elementsForTimestamp = new ArrayList<>(); > }else { > > this.bufferEventGethistogram.update(System.currentTimeMillis()-currentTs); > } > elementsForTimestamp.add(event); > long secondCurrentTs = System.currentTimeMillis(); > elementQueueState.put(currentTime, elementsForTimestamp); > this.bufferEventPuthistogram.update(System.currentTimeMillis() - > secondCurrentTs); > this.bufferEventhistogram.update(System.currentTimeMillis() - currentTs); > } > > 通过复写CepOperator,加入了一些metics发现 > > this.bufferEventhistogram = metrics.histogram("buffer_event_delay", new > DescriptiveStatisticsHistogram(1000)); > this.bufferEventGethistogram = metrics.histogram("buffer_event_get_delay", new > DescriptiveStatisticsHistogram(1000)); > this.bufferEventGetNullhistogram = > metrics.histogram("buffer_event_get_null_delay", new > DescriptiveStatisticsHistogram(1000)); > this.bufferEventPuthistogram = metrics.histogram("buffer_event_put_delay", new > DescriptiveStatisticsHistogram(1000)); > > 在get和put比较耗时,整个bufferEvent 能达到200ms > 从rocksdb的metric来看没有进行太多flush和compaction。 > > [image: image.png] > [image: image.png] > > 也参考了https://www.jianshu.com/p/2e61c2c83c57这篇文章调优过,发现效果也不是很好,一样反压。 > 也看过类似的问题http://apache-flink.147419.n8.nabble.com/rocksDB-td1785.html > ,但是我这sst文件很小。 > 请教大家,为啥get和put这么耗时呢?有什么好的优化方案不?谢谢。 > > Best Wishes. > > > > Best Wishes. > >
flink 1.11.2 cep rocksdb 性能调优
hi 我这边用flink1.11.2 cep做一些模式匹配,发现一旦开启rocksdb做为状态后端,就会出现反压。cpu使用率是之前的10倍。 private void bufferEvent(IN event, long currentTime) throws Exception { long currentTs = System.currentTimeMillis(); List elementsForTimestamp = elementQueueState.get(currentTime); if (elementsForTimestamp == null) { this.bufferEventGetNullhistogram.update(System.currentTimeMillis() - currentTs); elementsForTimestamp = new ArrayList<>(); }else { this.bufferEventGethistogram.update(System.currentTimeMillis()-currentTs); } elementsForTimestamp.add(event); long secondCurrentTs = System.currentTimeMillis(); elementQueueState.put(currentTime, elementsForTimestamp); this.bufferEventPuthistogram.update(System.currentTimeMillis() - secondCurrentTs); this.bufferEventhistogram.update(System.currentTimeMillis() - currentTs); } 通过复写CepOperator,加入了一些metics发现 this.bufferEventhistogram = metrics.histogram("buffer_event_delay", new DescriptiveStatisticsHistogram(1000)); this.bufferEventGethistogram = metrics.histogram("buffer_event_get_delay", new DescriptiveStatisticsHistogram(1000)); this.bufferEventGetNullhistogram = metrics.histogram("buffer_event_get_null_delay", new DescriptiveStatisticsHistogram(1000)); this.bufferEventPuthistogram = metrics.histogram("buffer_event_put_delay", new DescriptiveStatisticsHistogram(1000)); 在get和put比较耗时,整个bufferEvent 能达到200ms 从rocksdb的metric来看没有进行太多flush和compaction。 [image: image.png] [image: image.png] 也参考了https://www.jianshu.com/p/2e61c2c83c57这篇文章调优过,发现效果也不是很好,一样反压。 也看过类似的问题http://apache-flink.147419.n8.nabble.com/rocksDB-td1785.html ,但是我这sst文件很小。 请教大家,为啥get和put这么耗时呢?有什么好的优化方案不?谢谢。 Best Wishes. Best Wishes.
Re: flink 1.11.2 keyby 更换partition
Hi, 我理解是不能的。 假设现在 asset_id 有如下数据 0 1 2 3 4 5 6 7 8 9 假设我通过自定义KeySelector,设定key 为Integer,调用KeyGroupRangeAssignment.assignKeyToParallelOperator(asset_id, 10, 10) 这个方法后,并不是我希望的0在0号partition,1在1号partition.结果如下(partition,count): (1,2) (3,1) (4,3) (5,3) (8,1) 因为 public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) { return MathUtils.murmurHash(keyHash) % maxParallelism; } MathUtils.murmurHash 会对integer 做二次计算的。 Best Wishes. Congxian Qiu 于2020年11月2日周一 下午3:19写道: > Hi > 自定义的 KeySelector[1] 能否满足呢? > [1] > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/state.html#keyed-datastream > > Best, > Congxian > > > Peihui He 于2020年11月2日周一 下午2:56写道: > > > Hi, > > > > 不好意思,我这边误导。 > > 现在的情况是这样的 > > > > 用这个方法测试 > > KeyGroupRangeAssignment.assignKeyToParallelOperator(asset_id, > > KeyGroupRangeAssignment .computeDefaultMaxParallelism( parallelism ), > > parallelism) > > 发现不是很均匀( job暂时并发度是103,这样的话默认maxParallelism 就256 了 > > ),下游key的数量能够相差100多。后来通过设置 maxParallelism和parallelism相等比按默认的 > > KeyGroupRangeAssignment .computeDefaultMaxParallelism( parallelism > > ) 这种方式好多了。 > > > > > > 请问下后续会扩展keyby接口不?keyby 可以根据自定义partition,然后返回keyedstream。 > > > > > > Best Wishes. > > > > > > > > Congxian Qiu 于2020年11月2日周一 下午1:52写道: > > > > > Hi > > > 不太明白你这里问题,如果是说按照 Md5 进行 keyby 不均匀,直接用 hashcode keyby > 会更均匀的话,是不是直接把计算 > > > md5 的逻辑改成计算 hashcode 的逻辑就行了 > > > Best, > > > Congxian > > > > > > > > > Peihui He 于2020年11月2日周一 上午10:01写道: > > > > > > > hi, > > > > > > > > 已经尝试过了,通过partitionCustom 返回的是datastream,如果后面接cep的化,并发度就是1了,性能更不上。 > > > > > > > > Best Wishes. > > > > > > > > Zhang Yuxiao 于2020年10月31日周六 上午9:38写道: > > > > > > > > > 你好, > > > > > > > > > > 你看看 DataStream 类中的 partitionCustom 方法是否能够符合你的需求? > > > > > > > > > > 发件人: Peihui He > > > > > 发送时间: 2020年10月30日 下午 07:23 > > > > > 收件人: user-zh@flink.apache.org > > > > > 主题: flink 1.11.2 keyby 更换partition > > > > > > > > > > hi,all > > > > > > > > > > 请问可以更改keyby的hash partition 不? 现在发现这个hash partition不能均匀的分配key。 > > > > > > > > > > > > > > > > > > > > KeyGroupRangeAssignment.assignKeyToParallelOperator(stringToMd5(asset_id), > > > > > 128, parallesism) > > > > > > > > > > 用这个方法测试,即使个asset_id 转换为MD5都不能均匀分配。 > > > > > > > > > > 相反,用最最简单的 Math.abs(asset_id.hashcode() % parallesism ) 就可以平均的分配key。 > > > > > > > > > > > > > > > Best Regards. > > > > > > > > > > > > > > >
Re: flink 1.11.2 keyby 更换partition
Hi, 不好意思,我这边误导。 现在的情况是这样的 用这个方法测试 KeyGroupRangeAssignment.assignKeyToParallelOperator(asset_id, KeyGroupRangeAssignment .computeDefaultMaxParallelism( parallelism ), parallelism) 发现不是很均匀( job暂时并发度是103,这样的话默认maxParallelism 就256 了 ),下游key的数量能够相差100多。后来通过设置 maxParallelism和parallelism相等比按默认的 KeyGroupRangeAssignment .computeDefaultMaxParallelism( parallelism ) 这种方式好多了。 请问下后续会扩展keyby接口不?keyby 可以根据自定义partition,然后返回keyedstream。 Best Wishes. Congxian Qiu 于2020年11月2日周一 下午1:52写道: > Hi > 不太明白你这里问题,如果是说按照 Md5 进行 keyby 不均匀,直接用 hashcode keyby 会更均匀的话,是不是直接把计算 > md5 的逻辑改成计算 hashcode 的逻辑就行了 > Best, > Congxian > > > Peihui He 于2020年11月2日周一 上午10:01写道: > > > hi, > > > > 已经尝试过了,通过partitionCustom 返回的是datastream,如果后面接cep的化,并发度就是1了,性能更不上。 > > > > Best Wishes. > > > > Zhang Yuxiao 于2020年10月31日周六 上午9:38写道: > > > > > 你好, > > > > > > 你看看 DataStream 类中的 partitionCustom 方法是否能够符合你的需求? > > > > > > 发件人: Peihui He > > > 发送时间: 2020年10月30日 下午 07:23 > > > 收件人: user-zh@flink.apache.org > > > 主题: flink 1.11.2 keyby 更换partition > > > > > > hi,all > > > > > > 请问可以更改keyby的hash partition 不? 现在发现这个hash partition不能均匀的分配key。 > > > > > > > > > KeyGroupRangeAssignment.assignKeyToParallelOperator(stringToMd5(asset_id), > > > 128, parallesism) > > > > > > 用这个方法测试,即使个asset_id 转换为MD5都不能均匀分配。 > > > > > > 相反,用最最简单的 Math.abs(asset_id.hashcode() % parallesism ) 就可以平均的分配key。 > > > > > > > > > Best Regards. > > > > > >
Re: flink 1.11.2 keyby 更换partition
hi, 已经尝试过了,通过partitionCustom 返回的是datastream,如果后面接cep的化,并发度就是1了,性能更不上。 Best Wishes. Zhang Yuxiao 于2020年10月31日周六 上午9:38写道: > 你好, > > 你看看 DataStream 类中的 partitionCustom 方法是否能够符合你的需求? > > 发件人: Peihui He > 发送时间: 2020年10月30日 下午 07:23 > 收件人: user-zh@flink.apache.org > 主题: flink 1.11.2 keyby 更换partition > > hi,all > > 请问可以更改keyby的hash partition 不? 现在发现这个hash partition不能均匀的分配key。 > > KeyGroupRangeAssignment.assignKeyToParallelOperator(stringToMd5(asset_id), > 128, parallesism) > > 用这个方法测试,即使个asset_id 转换为MD5都不能均匀分配。 > > 相反,用最最简单的 Math.abs(asset_id.hashcode() % parallesism ) 就可以平均的分配key。 > > > Best Regards. >
flink 1.11.2 keyby 更换partition
hi,all 请问可以更改keyby的hash partition 不? 现在发现这个hash partition不能均匀的分配key。 KeyGroupRangeAssignment.assignKeyToParallelOperator(stringToMd5(asset_id), 128, parallesism) 用这个方法测试,即使个asset_id 转换为MD5都不能均匀分配。 相反,用最最简单的 Math.abs(asset_id.hashcode() % parallesism ) 就可以平均的分配key。 Best Regards.
Re: flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出
试了下一种解决方案,如下,可以调整sql并行度。 val table1: Table = stenv.sqlQuery("select * from test") val schema = table1.getSchema val table2 = stenv.fromDataStream(table1.toAppendStream[Row].map(item => Row.of(item.getField(0), item.getField(1)))(new RowTypeInfo(schema.getFieldTypes.toList.take(2).toArray, schema.getFieldNames.toList.take(2).toArray)).setParallelism(2)) Peihui He 于2020年10月14日周三 上午11:52写道: > hello, > > stenv.fromDataStream(stream, $"") > > 请教下,如果stream中数据是org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode > 类型,field应该如何设置呢? > 比如: > { > a: 1, > b: { > c: "test" > } > } > > Best Wishes. > > shizk233 于2020年9月28日周一 下午7:15写道: > >> flink sql似乎不能设置rebalance,在Data Stream API可以设。 >> >> 一种思路是拆分sql逻辑,在Data Stream API上创建kafka source后再reblance成一张新表,再做后续dml sql。 >> >> 另一种思路就是kafka topic增加一下分区 >> >> Asahi Lee <978466...@qq.com> 于2020年9月28日周一 下午1:56写道: >> >> > 你好! 使用flink >> > SQL,如何设置rebalance呢?--原始邮件-- >> > 发件人:zilongnbsp;xiao> > 发送时间:2020年9月27日(星期天) 晚上6:27 >> > 收件人:user-zh> > 主题:Re: flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出 >> >
Re: flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出
hello, stenv.fromDataStream(stream, $"") 请教下,如果stream中数据是org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode 类型,field应该如何设置呢? 比如: { a: 1, b: { c: "test" } } Best Wishes. shizk233 于2020年9月28日周一 下午7:15写道: > flink sql似乎不能设置rebalance,在Data Stream API可以设。 > > 一种思路是拆分sql逻辑,在Data Stream API上创建kafka source后再reblance成一张新表,再做后续dml sql。 > > 另一种思路就是kafka topic增加一下分区 > > Asahi Lee <978466...@qq.com> 于2020年9月28日周一 下午1:56写道: > > > 你好! 使用flink > > SQL,如何设置rebalance呢?--原始邮件-- > > 发件人:zilongnbsp;xiao > 发送时间:2020年9月27日(星期天) 晚上6:27 > > 收件人:user-zh > 主题:Re: flink 1.11.2 Table sql聚合后设置并行度为2及以上,数据无法输出 >
Re: flink sql 1.11.2 jdbc connector 按月分表读取
是我这边建issue不? 这里还发现一个问题 select count(*) from mysql_table 不能执行。 Best wishes. Jark Wu 于2020年9月25日周五 上午10:37写道: > 我觉得是个挺好的需求,有点类似于 Kafka 的 multi topic 功能,可以先建个 issue 收集下大家的需求。 > > > Best, > Jark > > On Thu, 24 Sep 2020 at 17:26, Peihui He wrote: > > > Hi, all > > > > 测试发现flink sql jdbc mysql 的table-name 不能通过正则来读取多个表,这些表按月份划分的。 > > 后续会支持不? > > > > Best Wishes. > > >
flink sql 1.11.2 jdbc connector 按月分表读取
Hi, all 测试发现flink sql jdbc mysql 的table-name 不能通过正则来读取多个表,这些表按月份划分的。 后续会支持不? Best Wishes.
Re: flink-sql 1.11版本都还没完全支持checkpoint吗
[image: image.png] 重新部署如果需要从上次cancel点恢复的化,需要指定savepoint,savepoint 可以是上次cancel点最后一次checkpoint。 凌天荣 <466792...@qq.com> 于2020年9月8日周二 下午4:07写道: > 没有指定savapoint的,我们是cancel掉,然后重新部署的 > > > --原始邮件-- > 发件人: > "user-zh" > < > peihu...@gmail.com; > 发送时间:2020年9月8日(星期二) 下午3:52 > 收件人:"user-zh" > 主题:Re: flink-sql 1.11版本都还没完全支持checkpoint吗 > > > > 重启后有没有指定savepoint呢? > > 凌天荣 <466792...@qq.com 于2020年9月8日周二 下午3:50写道: > > 代码里设置了enableCheckpointing,任务停掉后,重启,还是没能消费停掉期间的数据,也就是checkpoint没生效
Re: flink-sql 1.11版本都还没完全支持checkpoint吗
重启后有没有指定savepoint呢? 凌天荣 <466792...@qq.com> 于2020年9月8日周二 下午3:50写道: > 代码里设置了enableCheckpointing,任务停掉后,重启,还是没能消费停掉期间的数据,也就是checkpoint没生效
Re: flink sql 1.11.1 FileSystem SQL Connector path directory slow
Hi, 就是用hdfs的。 Jingsong Li 于2020年9月7日周一 上午11:16写道: > 另外,可能和使用本地文件系统有关?换成HDFS试试? > > On Mon, Sep 7, 2020 at 11:15 AM Jingsong Li > wrote: > > > Hi, > > > > 可以在JobMaster里面看一下jstack吗?看下具体卡在哪里? > > > > On Sat, Sep 5, 2020 at 11:11 PM Peihui He wrote: > > > >> Hi, all > >> > >> 经过这几天的测试发现,当hdfs目录下的文件比较多的时候就是出现上述情况,比如我这边文件个数接近2k个。 > >> 简单的测试当文件个数为1到2个的时候会很快提交job,并且flink session web 页面也没有感觉到卡着。 > >> > >> 请问有什么好的解决方式没呢? > >> > >> Best Wishes. > >> > >> Peihui He 于2020年9月4日周五 下午6:25写道: > >> > >>> Hi, all > >>> > >>> 当指定partition的时候这个问题通过path 也没法解决了 > >>> > >>> CREATE TABLE MyUserTable ( > >>> column_name1 INT, > >>> column_name2 STRING, dt string,) PARTITIONED BY (dt) WITH ( > >>> 'connector' = 'filesystem', -- required: specify the > connector > >>> 'path' = 'file:///path/to/whatever', -- required: path to a > directory > >>> 'format' = 'json', -- required: file system > connector) > >>> > >>> > >>> select * from MyUserTable limit 10; > >>> > >>> job 会一直卡在一个地方 > >>> [image: image.png] > >>> > >>> 这种改怎么解决呢? > >>> > >>> Peihui He 于2020年9月4日周五 下午6:02写道: > >>> > >>>> hi, all > >>>> 我这边用flink sql client 创建表的时候 > >>>> > >>>> CREATE TABLE MyUserTable ( > >>>> column_name1 INT, > >>>> column_name2 STRING,) PARTITIONED BY (part_name1, part_name2) WITH ( > >>>> 'connector' = 'filesystem', -- required: specify the > connector > >>>> 'path' = 'file:///path/to/whatever', -- required: path to a > directory > >>>> 'format' = 'json', -- required: file system > connector) > >>>> > >>>> 当path后面多一个"/"时, 比如: 'path' = 'file:///path/to/whatever/' > >>>> sql client 提交job会很慢,最后会报错 > >>>> > >>>> Caused by: org.apache.flink.runtime.rest.util.RestClientException: > >>>> [Internal server error., >>>> org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job > has > >>>> already been submitted. at > >>>> > org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:280) > >>>> at sun.reflect.GeneratedMethodAccessor127.invoke(Unknown Source) at > >>>> > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > >>>> at java.lang.reflect.Method.invoke(Method.java:498) at > >>>> > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) > >>>> at > >>>> > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) > >>>> at > >>>> > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) > >>>> at > >>>> > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) > >>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at > >>>> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at > >>>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at > >>>> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > at > >>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at > >>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at > >>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at > >>>> akka.actor.Actor$class.aroundReceive(Actor.scala:517) at > >>>> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at > >>>> akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at > >>>> akka.actor.ActorCell.invoke(ActorCell.scala:561) at > >>>> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at > >>>> akka.dispatch.Mailbox.run(Mailbox.scala:225) at > >>>> akka.dispatch.Mailbox.exec(Mailbox.scala:235) at > >>>> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at > >>>> > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > >>>> at > akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at > >>>> > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > >>>> End of exception on server side>] at > >>>> > org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390) > >>>> at > >>>> > org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374) > >>>> at > >>>> > java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) > >>>> at > >>>> > java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) > >>>> > >>>> > >>>> flink session cluster job 页面基本上都打不开,要过好久才可以。最后看到job 确实提交成功了。 > >>>> > >>>> 这种情况不知道有没有遇到过? > >>>> > >>>> Best Wishes. > >>>> > >>>> > >>>> > >>> > > > > -- > > Best, Jingsong Lee > > > > > -- > Best, Jingsong Lee >
Re: flink sql 1.11.1 FileSystem SQL Connector path directory slow
ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Jingsong Li 于2020年9月7日周一 上午11:15写道: > Hi, > > 可以在JobMaster里面看一下jstack吗?看下具体卡在哪里? > > On Sat, Sep 5, 2020 at 11:11 PM Peihui He wrote: > > > Hi, all > > > > 经过这几天的测试发现,当hdfs目录下的文件比较多的时候就是出现上述情况,比如我这边文件个数接近2k个。 > > 简单的测试当文件个数为1到2个的时候会很快提交job,并且flink session web 页面也没有感觉到卡着。 > > > > 请问有什么好的解决方式没呢? > > > > Best Wishes. > > > > Peihui He 于2020年9月4日周五 下午6:25写道: > > > >> Hi, all > >> > >> 当指定partition的时候这个问题通过path 也没法解决了 > >> > >> CREATE TABLE MyUserTable ( > >> column_name1 INT, > >> column_name2 STRING, dt string,) PARTITIONED BY (dt) WITH ( > >> 'connector' = 'filesystem', -- required: specify the > connector > >> 'path' = 'file:///path/to/whatever', -- required: path to a directory > >> 'format' = 'json', -- required: file system > connector) > >> > >> > >> select * from MyUserTable limit 10; > >> > >> job 会一直卡在一个地方 > >> [image: image.png] > >> > >> 这种改怎么解决呢? > >> > >> Peihui He 于2020年9月4日周五 下午6:02写道: > >> > >>> hi, all > >>> 我这边用flink sql client 创建表的时候 > >>> > >>> CREATE TABLE MyUserTable ( > >>> column_name1 INT, > >>> column_name2 STRING,) PARTITIONED BY (part_name1, part_name2) WITH ( > >>> 'connector' = 'filesystem', -- required: specify the > connector > >>> 'path' = 'file:///path/to/whatever', -- required: path to a > directory > >>> 'format' = 'json', -- required: file system > connector) > >>> > >>> 当path后面多一个"/"时, 比如: 'path' = 'file:///path/to/whatever/' > >>> sql client 提交job会很慢,最后会报错 > >>> > >>> Caused by: org.apache.flink.runtime.rest.util.RestClientException: > >>> [Internal server error., >>> org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job > has > >>> already been submitted. at > >>> > org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:280) > >>> at sun.reflect.GeneratedMethodAccessor127.invoke(Unknown Source) at > >>> > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > >>> at java.lang.reflect.Method.invoke(Method.java:498) at > >>> > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) > >>> at > >>> > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) > >>> at > >>> > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) > >>> at > >>> > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) > >>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at > >>> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at > >>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at > >>> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at > >>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at > >>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at > >>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at > >>> akka.actor.Actor$class.aroundReceive(Actor.scala:517) at > >>> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at > >>> akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at > >>> akka.actor.ActorCell.invoke(ActorCell.scala:561) at > >>> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at > >>> akka.dispatch.Mailbox.run(Mailbox.scala:225) at > >>> akka.dispatch.Mailbox.exec(Mailbox.scala:235) at > >>> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at > >>> > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > >>> at > akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at > >>> > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > >>> End of exception on server side>] at > >>> > org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390) > >>> at > >>> > org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374) > >>> at > >>> > java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) > >>> at > >>> > java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) > >>> > >>> > >>> flink session cluster job 页面基本上都打不开,要过好久才可以。最后看到job 确实提交成功了。 > >>> > >>> 这种情况不知道有没有遇到过? > >>> > >>> Best Wishes. > >>> > >>> > >>> > >> > > -- > Best, Jingsong Lee >
Re: flink sql 1.11.1 FileSystem SQL Connector path directory slow
Hi, all 经过这几天的测试发现,当hdfs目录下的文件比较多的时候就是出现上述情况,比如我这边文件个数接近2k个。 简单的测试当文件个数为1到2个的时候会很快提交job,并且flink session web 页面也没有感觉到卡着。 请问有什么好的解决方式没呢? Best Wishes. Peihui He 于2020年9月4日周五 下午6:25写道: > Hi, all > > 当指定partition的时候这个问题通过path 也没法解决了 > > CREATE TABLE MyUserTable ( > column_name1 INT, > column_name2 STRING, dt string,) PARTITIONED BY (dt) WITH ( > 'connector' = 'filesystem', -- required: specify the connector > 'path' = 'file:///path/to/whatever', -- required: path to a directory > 'format' = 'json', -- required: file system connector) > > > select * from MyUserTable limit 10; > > job 会一直卡在一个地方 > [image: image.png] > > 这种改怎么解决呢? > > Peihui He 于2020年9月4日周五 下午6:02写道: > >> hi, all >> 我这边用flink sql client 创建表的时候 >> >> CREATE TABLE MyUserTable ( >> column_name1 INT, >> column_name2 STRING,) PARTITIONED BY (part_name1, part_name2) WITH ( >> 'connector' = 'filesystem', -- required: specify the connector >> 'path' = 'file:///path/to/whatever', -- required: path to a directory >> 'format' = 'json', -- required: file system connector) >> >> 当path后面多一个"/"时, 比如: 'path' = 'file:///path/to/whatever/' >> sql client 提交job会很慢,最后会报错 >> >> Caused by: org.apache.flink.runtime.rest.util.RestClientException: >> [Internal server error., > org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has >> already been submitted. at >> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:280) >> at sun.reflect.GeneratedMethodAccessor127.invoke(Unknown Source) at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:498) at >> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) >> at >> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) >> at >> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) >> at >> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) >> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at >> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at >> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at >> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at >> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at >> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at >> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at >> akka.actor.Actor$class.aroundReceive(Actor.scala:517) at >> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at >> akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at >> akka.actor.ActorCell.invoke(ActorCell.scala:561) at >> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at >> akka.dispatch.Mailbox.run(Mailbox.scala:225) at >> akka.dispatch.Mailbox.exec(Mailbox.scala:235) at >> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at >> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at >> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >> End of exception on server side>] at >> org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390) >> at >> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374) >> at >> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) >> at >> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) >> >> >> flink session cluster job 页面基本上都打不开,要过好久才可以。最后看到job 确实提交成功了。 >> >> 这种情况不知道有没有遇到过? >> >> Best Wishes. >> >> >> >
Re: flink sql 1.11.1 FileSystem SQL Connector path directory slow
Hi, all 当指定partition的时候这个问题通过path 也没法解决了 CREATE TABLE MyUserTable ( column_name1 INT, column_name2 STRING, dt string,) PARTITIONED BY (dt) WITH ( 'connector' = 'filesystem', -- required: specify the connector 'path' = 'file:///path/to/whatever', -- required: path to a directory 'format' = 'json', -- required: file system connector) select * from MyUserTable limit 10; job 会一直卡在一个地方 [image: image.png] 这种改怎么解决呢? Peihui He 于2020年9月4日周五 下午6:02写道: > hi, all > 我这边用flink sql client 创建表的时候 > > CREATE TABLE MyUserTable ( > column_name1 INT, > column_name2 STRING,) PARTITIONED BY (part_name1, part_name2) WITH ( > 'connector' = 'filesystem', -- required: specify the connector > 'path' = 'file:///path/to/whatever', -- required: path to a directory > 'format' = 'json', -- required: file system connector) > > 当path后面多一个"/"时, 比如: 'path' = 'file:///path/to/whatever/' > sql client 提交job会很慢,最后会报错 > > Caused by: org.apache.flink.runtime.rest.util.RestClientException: > [Internal server error., org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has > already been submitted. at > org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:280) > at sun.reflect.GeneratedMethodAccessor127.invoke(Unknown Source) at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at > akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at > scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at > akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at > akka.actor.Actor$class.aroundReceive(Actor.scala:517) at > akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at > akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at > akka.actor.ActorCell.invoke(ActorCell.scala:561) at > akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at > akka.dispatch.Mailbox.run(Mailbox.scala:225) at > akka.dispatch.Mailbox.exec(Mailbox.scala:235) at > akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > End of exception on server side>] at > org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390) > at > org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374) > at > java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) > at > java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) > > > flink session cluster job 页面基本上都打不开,要过好久才可以。最后看到job 确实提交成功了。 > > 这种情况不知道有没有遇到过? > > Best Wishes. > > >
flink sql 1.11.1 FileSystem SQL Connector path directory slow
hi, all 我这边用flink sql client 创建表的时候 CREATE TABLE MyUserTable ( column_name1 INT, column_name2 STRING,) PARTITIONED BY (part_name1, part_name2) WITH ( 'connector' = 'filesystem', -- required: specify the connector 'path' = 'file:///path/to/whatever', -- required: path to a directory 'format' = 'json', -- required: file system connector) 当path后面多一个"/"时, 比如: 'path' = 'file:///path/to/whatever/' sql client 提交job会很慢,最后会报错 Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., ] at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390) at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374) at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) flink session cluster job 页面基本上都打不开,要过好久才可以。最后看到job 确实提交成功了。 这种情况不知道有没有遇到过? Best Wishes.
Re: flink 1.10.1 sql limit 不生效
[image: image.png] order by TUMBLE_START 结果如上图 Peihui He 于2020年8月12日周三 下午3:40写道: > Hi BenChao > > SQL是流模式下执行的,看着不生效的表现就是显示的数量超过limit的数量。 > order by TUMBLE_START desc 好像不是预期的结果 > > 这个是需要给 TUMBLE_START 得到的timestamp 转为 long么? > > Best Wishes. > > Benchao Li 于2020年8月12日周三 下午3:12写道: > >> 你这个SQL是在流式还是批式模式下执行的呢?limit不生效的表现是什么呢? >> >> Peihui He 于2020年8月12日周三 下午3:03写道: >> >> > Hi all, >> > >> > 用zeppelin执行sql的时候发现用了 LEFT JOIN LATERAL TABLE 时候 limit 不生效 >> > sql 类似下面: >> > select a, b, c, t from tb LEFT JOIN LATERAL TABLE (Tf(a)) as T(c, d, >> e) ON >> > TRUE order by t desc limit 10 >> > >> > 如果select 结果中不包括c的化,就正常了 >> > >> > 请问这个是什么问题呢?sql是写的不对么? >> > >> > Best Wishes. >> > >> >> >> -- >> >> Best, >> Benchao Li >> >
Re: flink 1.10.1 sql limit 不生效
Hi BenChao 发现问题了,是因为select 的字段中包含了array,导致数据显示的比实际limit数据要多 Best Wishes. Benchao Li 于2020年8月12日周三 下午3:12写道: > 你这个SQL是在流式还是批式模式下执行的呢?limit不生效的表现是什么呢? > > Peihui He 于2020年8月12日周三 下午3:03写道: > > > Hi all, > > > > 用zeppelin执行sql的时候发现用了 LEFT JOIN LATERAL TABLE 时候 limit 不生效 > > sql 类似下面: > > select a, b, c, t from tb LEFT JOIN LATERAL TABLE (Tf(a)) as T(c, d, e) > ON > > TRUE order by t desc limit 10 > > > > 如果select 结果中不包括c的化,就正常了 > > > > 请问这个是什么问题呢?sql是写的不对么? > > > > Best Wishes. > > > > > -- > > Best, > Benchao Li >
Re: flink 1.10.1 sql limit 不生效
应该是我这边sql问题,我这边在看看,打扰大家了 Peihui He 于2020年8月12日周三 下午3:03写道: > Hi all, > > 用zeppelin执行sql的时候发现用了 LEFT JOIN LATERAL TABLE 时候 limit 不生效 > sql 类似下面: > select a, b, c, t from tb LEFT JOIN LATERAL TABLE (Tf(a)) as T(c, d, e) > ON TRUE order by t desc limit 10 > > 如果select 结果中不包括c的化,就正常了 > > 请问这个是什么问题呢?sql是写的不对么? > > Best Wishes. >
flink 1.10.1 sql limit 不生效
Hi all, 用zeppelin执行sql的时候发现用了 LEFT JOIN LATERAL TABLE 时候 limit 不生效 sql 类似下面: select a, b, c, t from tb LEFT JOIN LATERAL TABLE (Tf(a)) as T(c, d, e) ON TRUE order by t desc limit 10 如果select 结果中不包括c的化,就正常了 请问这个是什么问题呢?sql是写的不对么? Best Wishes.
Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
Hi Congxian, 这个问题有结论没呢? Best wishes. Peihui He 于2020年7月17日周五 下午4:21写道: > Hi Congxian, > > [image: Snipaste_2020-07-17_16-20-06.png] > > 我这边通过chrome 浏览器看到是上传了的,并且可以下载的。 > > Best wishes. > > Congxian Qiu 于2020年7月17日周五 下午1:31写道: > >> Hi Peihui >> >> 感谢你的回复,我这边没有看到附件,你那边能否确认下呢? >> >> Best, >> Congxian >> >> >> Peihui He 于2020年7月17日周五 上午10:13写道: >> >> > Hi Congxian >> > >> > 见附件。 >> > >> > Best wishes. >> > >> > Congxian Qiu 于2020年7月16日周四 下午8:24写道: >> > >> >> Hi Peihui >> >> >> >> 感谢你的回信。能否帮忙用 1.10.0 复现一次,然后把相关的日志(JM log 和 TM Log,方便的话,也开启一下 debug >> >> 日志)分享一下呢?如果日志太大的话,可以尝试贴待 gist[1] 然后邮件列表回复一个地址即可, >> >> 非常感谢~ >> >> >> >> [1] https://gist.github.com/ >> >> >> >> Best, >> >> Congxian >> >> >> >> >> >> Peihui He 于2020年7月16日周四 下午5:54写道: >> >> >> >> > Hi Yun, >> >> > >> >> > 我这边测试需要在集群上跑的,本地idea跑是没有问题的。 >> >> > flink 1.10.1 的flink-conf.yaml 是cope flink 1.10.0 的,但是1.10.0 就是报错。 >> >> > >> >> > 附件就是源码job。如果你要的跑需要改下socket host的。只要socket 中输入hepeihui 就会抛异常的。 >> >> > >> >> > Peihui He 于2020年7月16日周四 下午5:26写道: >> >> > >> >> >> Hi Yun, >> >> >> >> >> >> 作业没有开启local recovery, 我这边测试1.10.0是必现的。 >> >> >> >> >> >> Best wishes. >> >> >> >> >> >> Yun Tang 于2020年7月16日周四 下午5:04写道: >> >> >> >> >> >>> Hi Peihui >> >> >>> >> >> >>> Flink-1.10.1 >> >> >>> 里面涉及到相关代码的改动就是更改了restore时path的类[1],但是你们的操作系统并不是windows,按道理应该是没有关系的。 >> >> >>> 另外,这个问题在你遇到failover时候是必现的么?从文件路径看,作业也没有开启local recovery是吧? >> >> >>> >> >> >>> >> >> >>> [1] >> >> >>> >> >> >> https://github.com/apache/flink/commit/399329275e5e2baca9ed9494cce97ff732ac077a >> >> >>> 祝好 >> >> >>> 唐云 >> >> >>> >> >> >>> From: Peihui He >> >> >>> Sent: Thursday, July 16, 2020 16:15 >> >> >>> To: user-zh@flink.apache.org >> >> >>> Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复 >> >> >>> >> >> >>> Hi Yun, >> >> >>> >> >> >>> 不好意思这么久回复,是@Congxian 描述的第2种情况。异常就是我通过socket >> >> >>> 输入的特定的word抛出runtimeexception 使task >> >> >>> 失败,然后job会尝试从checkpoint中恢复,但是恢复的过程中就报 >> >> >>> >> >> >>> Caused by: java.nio.file.NoSuchFileException: >> >> >>> >> >> >>> >> >> >> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst >> >> >>> -> >> >> >>> >> >> >> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst >> >> >>> >> >> >>> 情况和@chenxyz 类似。 >> >> >>> >> >> >>> >> >> >> http://apache-flink.147419.n8.nabble.com/rocksdb-Could-not-restore-keyed-state-backend-for-KeyedProcessOperator-td2232.html >> >> >>> >> >> >>> 换成1.10.1 就可以了 >> >> >>> >> >> >>> Best wishes. >> >> >>> >> >> >>> Yun Tang 于2020年7月15日周三 下午4:35写道: >> >> >>> >> >> >>> > Hi Robin >> >> >>> > >> >> >>> > 其实你的说法不是很准确,社区是明文保证savepoint的兼容性 >> >> >>> > >> >> >>> >> >> >> [1],但是并不意味着跨大版本时无法从checkpoint恢复,社区不承诺主要还是维护其太耗费精力,但是实际从代码角度来说,在合理使用state >> >> >>> > schema evolution [2]的前提下,目前跨版本checkpoint恢复基本都是兼容的. >> >> >>> > >> >> >>> > 另外 @Pe
flink sink kafka Error while confirming checkpoint
Hello, flink 1.10.1 kafka 2.12-1.1.0 运行一段时间后会出现一下错误,不知道有遇到过没? java.lang.RuntimeException: Error while confirming checkpoint at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:935) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$7(StreamTask.java:907) at org.apache.flink.util.function.FunctionUtils.lambda$asCallable$5(FunctionUtils.java:125) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87) at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:302) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointComplete$8(StreamTask.java:919) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:101) at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:913) ... 12 more Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker. Best wishes.
Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
Hi Congxian, [image: Snipaste_2020-07-17_16-20-06.png] 我这边通过chrome 浏览器看到是上传了的,并且可以下载的。 Best wishes. Congxian Qiu 于2020年7月17日周五 下午1:31写道: > Hi Peihui > > 感谢你的回复,我这边没有看到附件,你那边能否确认下呢? > > Best, > Congxian > > > Peihui He 于2020年7月17日周五 上午10:13写道: > > > Hi Congxian > > > > 见附件。 > > > > Best wishes. > > > > Congxian Qiu 于2020年7月16日周四 下午8:24写道: > > > >> Hi Peihui > >> > >> 感谢你的回信。能否帮忙用 1.10.0 复现一次,然后把相关的日志(JM log 和 TM Log,方便的话,也开启一下 debug > >> 日志)分享一下呢?如果日志太大的话,可以尝试贴待 gist[1] 然后邮件列表回复一个地址即可, > >> 非常感谢~ > >> > >> [1] https://gist.github.com/ > >> > >> Best, > >> Congxian > >> > >> > >> Peihui He 于2020年7月16日周四 下午5:54写道: > >> > >> > Hi Yun, > >> > > >> > 我这边测试需要在集群上跑的,本地idea跑是没有问题的。 > >> > flink 1.10.1 的flink-conf.yaml 是cope flink 1.10.0 的,但是1.10.0 就是报错。 > >> > > >> > 附件就是源码job。如果你要的跑需要改下socket host的。只要socket 中输入hepeihui 就会抛异常的。 > >> > > >> > Peihui He 于2020年7月16日周四 下午5:26写道: > >> > > >> >> Hi Yun, > >> >> > >> >> 作业没有开启local recovery, 我这边测试1.10.0是必现的。 > >> >> > >> >> Best wishes. > >> >> > >> >> Yun Tang 于2020年7月16日周四 下午5:04写道: > >> >> > >> >>> Hi Peihui > >> >>> > >> >>> Flink-1.10.1 > >> >>> 里面涉及到相关代码的改动就是更改了restore时path的类[1],但是你们的操作系统并不是windows,按道理应该是没有关系的。 > >> >>> 另外,这个问题在你遇到failover时候是必现的么?从文件路径看,作业也没有开启local recovery是吧? > >> >>> > >> >>> > >> >>> [1] > >> >>> > >> > https://github.com/apache/flink/commit/399329275e5e2baca9ed9494cce97ff732ac077a > >> >>> 祝好 > >> >>> 唐云 > >> >>> > >> >>> From: Peihui He > >> >>> Sent: Thursday, July 16, 2020 16:15 > >> >>> To: user-zh@flink.apache.org > >> >>> Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复 > >> >>> > >> >>> Hi Yun, > >> >>> > >> >>> 不好意思这么久回复,是@Congxian 描述的第2种情况。异常就是我通过socket > >> >>> 输入的特定的word抛出runtimeexception 使task > >> >>> 失败,然后job会尝试从checkpoint中恢复,但是恢复的过程中就报 > >> >>> > >> >>> Caused by: java.nio.file.NoSuchFileException: > >> >>> > >> >>> > >> > /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst > >> >>> -> > >> >>> > >> > /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst > >> >>> > >> >>> 情况和@chenxyz 类似。 > >> >>> > >> >>> > >> > http://apache-flink.147419.n8.nabble.com/rocksdb-Could-not-restore-keyed-state-backend-for-KeyedProcessOperator-td2232.html > >> >>> > >> >>> 换成1.10.1 就可以了 > >> >>> > >> >>> Best wishes. > >> >>> > >> >>> Yun Tang 于2020年7月15日周三 下午4:35写道: > >> >>> > >> >>> > Hi Robin > >> >>> > > >> >>> > 其实你的说法不是很准确,社区是明文保证savepoint的兼容性 > >> >>> > > >> >>> > >> [1],但是并不意味着跨大版本时无法从checkpoint恢复,社区不承诺主要还是维护其太耗费精力,但是实际从代码角度来说,在合理使用state > >> >>> > schema evolution [2]的前提下,目前跨版本checkpoint恢复基本都是兼容的. > >> >>> > > >> >>> > 另外 @Peihui 也请麻烦对你的异常描述清晰一些,我的第一次回复已经推测该异常不是root > >> >>> cause,还请在日志中找一下无法恢复的root > >> >>> > cause,如果不知道怎么从日志里面找,可以把相关日志分享出来。 > >> >>> > > >> >>> > > >> >>> > [1] > >> >>> > > >> >>> > >> > https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#compatibility-table > >> >>> > [2] > >> >>> > > >> >>> > >> > https://ci
Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
Hi Congxian 见附件。 Best wishes. Congxian Qiu 于2020年7月16日周四 下午8:24写道: > Hi Peihui > > 感谢你的回信。能否帮忙用 1.10.0 复现一次,然后把相关的日志(JM log 和 TM Log,方便的话,也开启一下 debug > 日志)分享一下呢?如果日志太大的话,可以尝试贴待 gist[1] 然后邮件列表回复一个地址即可, > 非常感谢~ > > [1] https://gist.github.com/ > > Best, > Congxian > > > Peihui He 于2020年7月16日周四 下午5:54写道: > > > Hi Yun, > > > > 我这边测试需要在集群上跑的,本地idea跑是没有问题的。 > > flink 1.10.1 的flink-conf.yaml 是cope flink 1.10.0 的,但是1.10.0 就是报错。 > > > > 附件就是源码job。如果你要的跑需要改下socket host的。只要socket 中输入hepeihui 就会抛异常的。 > > > > Peihui He 于2020年7月16日周四 下午5:26写道: > > > >> Hi Yun, > >> > >> 作业没有开启local recovery, 我这边测试1.10.0是必现的。 > >> > >> Best wishes. > >> > >> Yun Tang 于2020年7月16日周四 下午5:04写道: > >> > >>> Hi Peihui > >>> > >>> Flink-1.10.1 > >>> 里面涉及到相关代码的改动就是更改了restore时path的类[1],但是你们的操作系统并不是windows,按道理应该是没有关系的。 > >>> 另外,这个问题在你遇到failover时候是必现的么?从文件路径看,作业也没有开启local recovery是吧? > >>> > >>> > >>> [1] > >>> > https://github.com/apache/flink/commit/399329275e5e2baca9ed9494cce97ff732ac077a > >>> 祝好 > >>> 唐云 > >>> > >>> From: Peihui He > >>> Sent: Thursday, July 16, 2020 16:15 > >>> To: user-zh@flink.apache.org > >>> Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复 > >>> > >>> Hi Yun, > >>> > >>> 不好意思这么久回复,是@Congxian 描述的第2种情况。异常就是我通过socket > >>> 输入的特定的word抛出runtimeexception 使task > >>> 失败,然后job会尝试从checkpoint中恢复,但是恢复的过程中就报 > >>> > >>> Caused by: java.nio.file.NoSuchFileException: > >>> > >>> > /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst > >>> -> > >>> > /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst > >>> > >>> 情况和@chenxyz 类似。 > >>> > >>> > http://apache-flink.147419.n8.nabble.com/rocksdb-Could-not-restore-keyed-state-backend-for-KeyedProcessOperator-td2232.html > >>> > >>> 换成1.10.1 就可以了 > >>> > >>> Best wishes. > >>> > >>> Yun Tang 于2020年7月15日周三 下午4:35写道: > >>> > >>> > Hi Robin > >>> > > >>> > 其实你的说法不是很准确,社区是明文保证savepoint的兼容性 > >>> > > >>> > [1],但是并不意味着跨大版本时无法从checkpoint恢复,社区不承诺主要还是维护其太耗费精力,但是实际从代码角度来说,在合理使用state > >>> > schema evolution [2]的前提下,目前跨版本checkpoint恢复基本都是兼容的. > >>> > > >>> > 另外 @Peihui 也请麻烦对你的异常描述清晰一些,我的第一次回复已经推测该异常不是root > >>> cause,还请在日志中找一下无法恢复的root > >>> > cause,如果不知道怎么从日志里面找,可以把相关日志分享出来。 > >>> > > >>> > > >>> > [1] > >>> > > >>> > https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#compatibility-table > >>> > [2] > >>> > > >>> > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html > >>> > > >>> > 祝好 > >>> > 唐云 > >>> > > >>> > > >>> > > >>> > From: Robin Zhang > >>> > Sent: Wednesday, July 15, 2020 16:23 > >>> > To: user-zh@flink.apache.org > >>> > Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复 > >>> > > >>> > 据我所知,跨大版本的不能直接从checkoint恢复,只能放弃状态重新跑 > >>> > > >>> > Best > >>> > Robin Zhang > >>> > > >>> > From: Peihui He <[hidden email]> > >>> > Sent: Tuesday, July 14, 2020 10:42 > >>> > To: [hidden email] <[hidden email]> > >>> > Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复 > >>> > > >>> > hello, > >>> > > >>> > 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示 > >>> > > >>> > > >>> > Caus
Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
Hi Yun, 我这边测试需要在集群上跑的,本地idea跑是没有问题的。 flink 1.10.1 的flink-conf.yaml 是cope flink 1.10.0 的,但是1.10.0 就是报错。 附件就是源码job。如果你要的跑需要改下socket host的。只要socket 中输入hepeihui 就会抛异常的。 Peihui He 于2020年7月16日周四 下午5:26写道: > Hi Yun, > > 作业没有开启local recovery, 我这边测试1.10.0是必现的。 > > Best wishes. > > Yun Tang 于2020年7月16日周四 下午5:04写道: > >> Hi Peihui >> >> Flink-1.10.1 >> 里面涉及到相关代码的改动就是更改了restore时path的类[1],但是你们的操作系统并不是windows,按道理应该是没有关系的。 >> 另外,这个问题在你遇到failover时候是必现的么?从文件路径看,作业也没有开启local recovery是吧? >> >> >> [1] >> https://github.com/apache/flink/commit/399329275e5e2baca9ed9494cce97ff732ac077a >> 祝好 >> 唐云 >> >> From: Peihui He >> Sent: Thursday, July 16, 2020 16:15 >> To: user-zh@flink.apache.org >> Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复 >> >> Hi Yun, >> >> 不好意思这么久回复,是@Congxian 描述的第2种情况。异常就是我通过socket >> 输入的特定的word抛出runtimeexception 使task >> 失败,然后job会尝试从checkpoint中恢复,但是恢复的过程中就报 >> >> Caused by: java.nio.file.NoSuchFileException: >> >> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst >> -> >> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst >> >> 情况和@chenxyz 类似。 >> >> http://apache-flink.147419.n8.nabble.com/rocksdb-Could-not-restore-keyed-state-backend-for-KeyedProcessOperator-td2232.html >> >> 换成1.10.1 就可以了 >> >> Best wishes. >> >> Yun Tang 于2020年7月15日周三 下午4:35写道: >> >> > Hi Robin >> > >> > 其实你的说法不是很准确,社区是明文保证savepoint的兼容性 >> > [1],但是并不意味着跨大版本时无法从checkpoint恢复,社区不承诺主要还是维护其太耗费精力,但是实际从代码角度来说,在合理使用state >> > schema evolution [2]的前提下,目前跨版本checkpoint恢复基本都是兼容的. >> > >> > 另外 @Peihui 也请麻烦对你的异常描述清晰一些,我的第一次回复已经推测该异常不是root cause,还请在日志中找一下无法恢复的root >> > cause,如果不知道怎么从日志里面找,可以把相关日志分享出来。 >> > >> > >> > [1] >> > >> https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#compatibility-table >> > [2] >> > >> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html >> > >> > 祝好 >> > 唐云 >> > >> > >> > >> > From: Robin Zhang >> > Sent: Wednesday, July 15, 2020 16:23 >> > To: user-zh@flink.apache.org >> > Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复 >> > >> > 据我所知,跨大版本的不能直接从checkoint恢复,只能放弃状态重新跑 >> > >> > Best >> > Robin Zhang >> > >> > From: Peihui He <[hidden email]> >> > Sent: Tuesday, July 14, 2020 10:42 >> > To: [hidden email] <[hidden email]> >> > Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复 >> > >> > hello, >> > >> > 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示 >> > >> > >> > Caused by: java.nio.file.NoSuchFileException: >> > >> > >> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst >> > -> >> > >> > >> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst >> > >> > 配置和1.9.2 一样: >> > state.backend: rocksdb >> > state.checkpoints.dir: hdfs:///flink/checkpoints/wc/ >> > state.savepoints.dir: hdfs:///flink/savepoints/wc/ >> > state.backend.incremental: true >> > >> > 代码上都有 >> > >> > env.enableCheckpointing(1); >> > >> > >> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); >> > env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, >> > org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS))); >> > >> > >> > 是1.10.0 需要做什么特别配置么? >> > >> > >> > >> > -- >> > Sent from: http://apache-flink.147419.n8.nabble.com/ >> > >> >
Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
Hi Yun, 作业没有开启local recovery, 我这边测试1.10.0是必现的。 Best wishes. Yun Tang 于2020年7月16日周四 下午5:04写道: > Hi Peihui > > Flink-1.10.1 > 里面涉及到相关代码的改动就是更改了restore时path的类[1],但是你们的操作系统并不是windows,按道理应该是没有关系的。 > 另外,这个问题在你遇到failover时候是必现的么?从文件路径看,作业也没有开启local recovery是吧? > > > [1] > https://github.com/apache/flink/commit/399329275e5e2baca9ed9494cce97ff732ac077a > 祝好 > 唐云 > ________ > From: Peihui He > Sent: Thursday, July 16, 2020 16:15 > To: user-zh@flink.apache.org > Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复 > > Hi Yun, > > 不好意思这么久回复,是@Congxian 描述的第2种情况。异常就是我通过socket > 输入的特定的word抛出runtimeexception 使task > 失败,然后job会尝试从checkpoint中恢复,但是恢复的过程中就报 > > Caused by: java.nio.file.NoSuchFileException: > > /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst > -> > /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst > > 情况和@chenxyz 类似。 > > http://apache-flink.147419.n8.nabble.com/rocksdb-Could-not-restore-keyed-state-backend-for-KeyedProcessOperator-td2232.html > > 换成1.10.1 就可以了 > > Best wishes. > > Yun Tang 于2020年7月15日周三 下午4:35写道: > > > Hi Robin > > > > 其实你的说法不是很准确,社区是明文保证savepoint的兼容性 > > [1],但是并不意味着跨大版本时无法从checkpoint恢复,社区不承诺主要还是维护其太耗费精力,但是实际从代码角度来说,在合理使用state > > schema evolution [2]的前提下,目前跨版本checkpoint恢复基本都是兼容的. > > > > 另外 @Peihui 也请麻烦对你的异常描述清晰一些,我的第一次回复已经推测该异常不是root cause,还请在日志中找一下无法恢复的root > > cause,如果不知道怎么从日志里面找,可以把相关日志分享出来。 > > > > > > [1] > > > https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#compatibility-table > > [2] > > > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html > > > > 祝好 > > 唐云 > > > > > > > > From: Robin Zhang > > Sent: Wednesday, July 15, 2020 16:23 > > To: user-zh@flink.apache.org > > Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复 > > > > 据我所知,跨大版本的不能直接从checkoint恢复,只能放弃状态重新跑 > > > > Best > > Robin Zhang > > > > From: Peihui He <[hidden email]> > > Sent: Tuesday, July 14, 2020 10:42 > > To: [hidden email] <[hidden email]> > > Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复 > > > > hello, > > > > 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示 > > > > > > Caused by: java.nio.file.NoSuchFileException: > > > > > /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst > > -> > > > > > /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst > > > > 配置和1.9.2 一样: > > state.backend: rocksdb > > state.checkpoints.dir: hdfs:///flink/checkpoints/wc/ > > state.savepoints.dir: hdfs:///flink/savepoints/wc/ > > state.backend.incremental: true > > > > 代码上都有 > > > > env.enableCheckpointing(1); > > > > > env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > > env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, > > org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS))); > > > > > > 是1.10.0 需要做什么特别配置么? > > > > > > > > -- > > Sent from: http://apache-flink.147419.n8.nabble.com/ > > >
Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
Hi Yun, 不好意思这么久回复,是@Congxian 描述的第2种情况。异常就是我通过socket 输入的特定的word抛出runtimeexception 使task 失败,然后job会尝试从checkpoint中恢复,但是恢复的过程中就报 Caused by: java.nio.file.NoSuchFileException: /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst -> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst 情况和@chenxyz 类似。 http://apache-flink.147419.n8.nabble.com/rocksdb-Could-not-restore-keyed-state-backend-for-KeyedProcessOperator-td2232.html 换成1.10.1 就可以了 Best wishes. Yun Tang 于2020年7月15日周三 下午4:35写道: > Hi Robin > > 其实你的说法不是很准确,社区是明文保证savepoint的兼容性 > [1],但是并不意味着跨大版本时无法从checkpoint恢复,社区不承诺主要还是维护其太耗费精力,但是实际从代码角度来说,在合理使用state > schema evolution [2]的前提下,目前跨版本checkpoint恢复基本都是兼容的. > > 另外 @Peihui 也请麻烦对你的异常描述清晰一些,我的第一次回复已经推测该异常不是root cause,还请在日志中找一下无法恢复的root > cause,如果不知道怎么从日志里面找,可以把相关日志分享出来。 > > > [1] > https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#compatibility-table > [2] > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html > > 祝好 > 唐云 > > > > From: Robin Zhang > Sent: Wednesday, July 15, 2020 16:23 > To: user-zh@flink.apache.org > Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复 > > 据我所知,跨大版本的不能直接从checkoint恢复,只能放弃状态重新跑 > > Best > Robin Zhang > > From: Peihui He <[hidden email]> > Sent: Tuesday, July 14, 2020 10:42 > To: [hidden email] <[hidden email]> > Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复 > > hello, > > 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示 > > > Caused by: java.nio.file.NoSuchFileException: > > /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst > -> > > /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst > > 配置和1.9.2 一样: > state.backend: rocksdb > state.checkpoints.dir: hdfs:///flink/checkpoints/wc/ > state.savepoints.dir: hdfs:///flink/savepoints/wc/ > state.backend.incremental: true > > 代码上都有 > > env.enableCheckpointing(1); > > env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, > org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS))); > > > 是1.10.0 需要做什么特别配置么? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >
Re: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
Hi chenxyz, 我们遇到的问题应该是一样的,换了1.10.1 后就可以从checkpoint 中恢复了。珞 Best wishes. chenxyz 于2020年7月15日周三 下午9:53写道: > > > > Hello, > Peihui,可以参考下是不是和这个问题类似?之前我在1.10.0也遇到过。 > > http://apache-flink.147419.n8.nabble.com/rocksdb-Could-not-restore-keyed-state-backend-for-KeyedProcessOperator-td2232.html#a2239 > 解决方式: > 1. 使用hdfs作为状态后端不会报错 > 2. 升级至1.10.1使用rocksdb也不会出现该问题 > > > > > > > > > > > > > > > 在 2020-07-14 14:41:53,"Peihui He" 写道: > >Hi Yun, > > > >我这边用一个word count 例子,socket -> flatmap -> keyBy -> reduce -> > >print. 在flatmap 中当出现特定word的时候就抛出一个runtimeException。在1.9.2 > >里面是可以从checkpoint中自动恢复上次做checkpoint的时候的状态,但是用1.10.0 就不能。环境是flink on > >yarn。 > > > >Best wishes. > > > >Yun Tang 于2020年7月14日周二 上午11:57写道: > > > >> Hi Peihui > >> > >> > 你的异常应该是从增量Checkpoint恢复时,文件已经下载到本地了,做硬链时[1],发现源文件不见了,有很大的可能是当时发生了异常,导致restore流程退出了,所以这个问题应该不是root > >> cause。 > >> > >> [1] > >> > https://github.com/apache/flink/blob/2a3b642b1efb957f3d4f20502c40398786ab1469/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java#L473 > >> > >> > >> 祝好 > >> 唐云 > >> > >> From: Peihui He > >> Sent: Tuesday, July 14, 2020 10:42 > >> To: user-zh@flink.apache.org > >> Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复 > >> > >> hello, > >> > >> 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示 > >> > >> > >> Caused by: java.nio.file.NoSuchFileException: > >> > >> > /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst > >> -> > >> > /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst > >> > >> 配置和1.9.2 一样: > >> state.backend: rocksdb > >> state.checkpoints.dir: hdfs:///flink/checkpoints/wc/ > >> state.savepoints.dir: hdfs:///flink/savepoints/wc/ > >> state.backend.incremental: true > >> > >> 代码上都有 > >> > >> env.enableCheckpointing(1); > >> > >> > env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > >> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, > >> org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS))); > >> > >> > >> 是1.10.0 需要做什么特别配置么? > >> >
Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
Hi Congxian, 不好意思,本来想准备下例子再回下邮件的,一直拖了这么久。 情况是你说的第2种。 同@chenxyz遇到的情况类似,日志可以参考chenxyz发的 http://apache-flink.147419.n8.nabble.com/rocksdb-Could-not-restore-keyed-state-backend-for-KeyedProcessOperator-td2232.html 按照chenxyz 的建议换了1.10.1版本后就没有问题了。 Best wishes. Congxian Qiu 于2020年7月15日周三 下午1:04写道: > Hi > > 我尝试理解一下: > 1 你用 1.9 跑 wordcount 作业,然后执行了一些 checkpoint,然后停止作业,然后使用 1.10 从之前 1.9 的作业生成的 > checkpoint 恢复,发现恢复不了? > 2 你用作业 1.10 跑 wordcount,然后遇到特定的 word 会抛异常,然后 failover,发现不能从 checkpoint 恢复? > > 你这里的问题是第 1 种还是第 2 种呢? > > 另外能否分享一下你的操作步骤以及出错时候的 taskmanager log 呢? > > Best, > Congxian > > > Peihui He 于2020年7月14日周二 下午2:46写道: > > > Hi Congxian, > > > > 这个错误是从1.9 升级到1.10 遇到的问题。用简单的wordcount 测试,自己根据特定word > > 抛出runtimeException,就能够重现。flink on yarn 和 flink on k8s 都出现这个问题。1.10 > > 都不能从上次的checkpoint状态中恢复。不知道是不是1.10需要其他配置呢? > > > > Best wishes. > > > > Congxian Qiu 于2020年7月14日周二 下午1:54写道: > > > > > Hi > > > > > > 这个出错是从 1.9 升级到 1.10 遇到的问题,还是说 1.10 能正常跑了,然后跑着跑着 failover 了再次恢复的时候出错了呢? > > > 另外你可以看下 tm log 看看有没有其他异常 > > > > > > Best, > > > Congxian > > > > > > > > > Yun Tang 于2020年7月14日周二 上午11:57写道: > > > > > > > Hi Peihui > > > > > > > > > > > > > > 你的异常应该是从增量Checkpoint恢复时,文件已经下载到本地了,做硬链时[1],发现源文件不见了,有很大的可能是当时发生了异常,导致restore流程退出了,所以这个问题应该不是root > > > > cause。 > > > > > > > > [1] > > > > > > > > > > https://github.com/apache/flink/blob/2a3b642b1efb957f3d4f20502c40398786ab1469/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java#L473 > > > > > > > > > > > > 祝好 > > > > 唐云 > > > > > > > > From: Peihui He > > > > Sent: Tuesday, July 14, 2020 10:42 > > > > To: user-zh@flink.apache.org > > > > Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复 > > > > > > > > hello, > > > > > > > > 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示 > > > > > > > > > > > > Caused by: java.nio.file.NoSuchFileException: > > > > > > > > > > > > > > /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst > > > > -> > > > > > > > > > > /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst > > > > > > > > 配置和1.9.2 一样: > > > > state.backend: rocksdb > > > > state.checkpoints.dir: hdfs:///flink/checkpoints/wc/ > > > > state.savepoints.dir: hdfs:///flink/savepoints/wc/ > > > > state.backend.incremental: true > > > > > > > > 代码上都有 > > > > > > > > env.enableCheckpointing(1); > > > > > > > > > > > > > > env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > > > > env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, > > > > org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS))); > > > > > > > > > > > > 是1.10.0 需要做什么特别配置么? > > > > > > > > > >
Re: flink sql 1.10 insert into tb select 复杂schema 失败
Hi BenChao, 换成1.10.1 就可以了。刚才那封邮件不行,是因为依赖flink-kafka的依赖版本没有修改过来。 Thank you. Benchao Li 于2020年7月15日周三 上午10:25写道: > Hi Peihui, > > 这是一个已知bug[1],已经在1.10.1和1.11.0中修复了,可以尝试下这两个版本。 > > [1] https://issues.apache.org/jira/browse/FLINK-16220 > > Peihui He 于2020年7月15日周三 上午9:54写道: > > > Hello, > > > > 在使用flink sql 1.10.0 时候,当source table 中含有复杂schema,比如 > > create table xxx ( > > a string, > > b row( > > c row(d string) > > ) > > ) > > > > 当c 中有值的时候,sql 如下 insert into select * from xxx会出现下面错误 > > > > Caused by: java.lang.ClassCastException: > > > > > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.NullNode > > cannot be cast to > > > > > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode > > at > > > > > org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$assembleRowConverter$dd344700$1(JsonRowSerializationSchema.java:337) > > at > > > > > org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$wrapIntoNullableConverter$1fa09b5b$1(JsonRowSerializationSchema.java:189) > > at > > > > > org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$assembleRowConverter$dd344700$1(JsonRowSerializationSchema.java:345) > > at > > > > > org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$wrapIntoNullableConverter$1fa09b5b$1(JsonRowSerializationSchema.java:189) > > at > > > > > org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$assembleRowConverter$dd344700$1(JsonRowSerializationSchema.java:345) > > at > > > > > org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$wrapIntoNullableConverter$1fa09b5b$1(JsonRowSerializationSchema.java:189) > > at > > > > > org.apache.flink.formats.json.JsonRowSerializationSchema.serialize(JsonRowSerializationSchema.java:138) > > ... 38 more > > > > > > Best wishes. > > > > > -- > > Best, > Benchao Li >
Re: flink sql 1.10 insert into tb select 复杂schema 失败
Hi BenChao, 刚才尝试了flink 1.10.1 但是问题还是存在,看了 [1] https://issues.apache.org/jira/browse/FLINK-16628 这个bug fix没有我给的 table 复杂, CREATE TABLE source_kafka_sasl ( svt STRING, ops ROW ) WITH () 我的是在原有的ops 里面又前嵌套了row。 Benchao Li 于2020年7月15日周三 上午10:25写道: > Hi Peihui, > > 这是一个已知bug[1],已经在1.10.1和1.11.0中修复了,可以尝试下这两个版本。 > > [1] https://issues.apache.org/jira/browse/FLINK-16220 > > Peihui He 于2020年7月15日周三 上午9:54写道: > > > Hello, > > > > 在使用flink sql 1.10.0 时候,当source table 中含有复杂schema,比如 > > create table xxx ( > > a string, > > b row( > > c row(d string) > > ) > > ) > > > > 当c 中有值的时候,sql 如下 insert into select * from xxx会出现下面错误 > > > > Caused by: java.lang.ClassCastException: > > > > > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.NullNode > > cannot be cast to > > > > > org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode > > at > > > > > org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$assembleRowConverter$dd344700$1(JsonRowSerializationSchema.java:337) > > at > > > > > org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$wrapIntoNullableConverter$1fa09b5b$1(JsonRowSerializationSchema.java:189) > > at > > > > > org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$assembleRowConverter$dd344700$1(JsonRowSerializationSchema.java:345) > > at > > > > > org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$wrapIntoNullableConverter$1fa09b5b$1(JsonRowSerializationSchema.java:189) > > at > > > > > org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$assembleRowConverter$dd344700$1(JsonRowSerializationSchema.java:345) > > at > > > > > org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$wrapIntoNullableConverter$1fa09b5b$1(JsonRowSerializationSchema.java:189) > > at > > > > > org.apache.flink.formats.json.JsonRowSerializationSchema.serialize(JsonRowSerializationSchema.java:138) > > ... 38 more > > > > > > Best wishes. > > > > > -- > > Best, > Benchao Li >
Re: flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?
Hi BenChao, 请问第2个解决思路中 额外加一个选项是指什么呢? Best wishes. Benchao Li 于2020年7月10日周五 下午1:54写道: > Hi Peihui, > > 正如Jark所说,FLINK-18002正是想解决这个问题,可以指定任意一个JsonNode为varchar类型。 > > 当然,这个feature不能解决所有问题,比如你有一个字段,内容不太确定,而且也不需要额外处理, > 主要是想保留这个字段,下游输出json的时候仍然还是这个字段。 > 如果用FLINK-18002的思路,输出到下游的时候,会把这部分数据整体作为一个json string,所以 > 从结果上来看,*还不能完全做到原封不动的输出到下游*。 > > 不知道后面这个场景是不是你面对的场景。如果是的话,我们目前有两个思路解决这个问题: > 1. 用RAW类型,这个需要json node类型对于flink来讲,都是可以序列化的 > 2. 用BINARY类型,因为现在已经有了对BINARY类型的处理,所以还需要额外加一个选项来指定对于BINARY类型 > 的处理模式。我们可以把任意json node转成它的json字符串表达形式,再转成byte[]进行中间的传输和处理;在 > 序列化的时候,再直接通过这个byte[]数据构造一个json node(这样可以保证它跟原来的json node一模一样)。 > > Jark Wu 于2020年7月10日周五 下午12:22写道: > > > 社区有个 issue 正在解决这个问题,可以关注一下 > > https://issues.apache.org/jira/browse/FLINK-18002 > > > > Best, > > Jark > > > > On Fri, 10 Jul 2020 at 11:13, Leonard Xu wrote: > > > > > Hi, Peihui > > > > > > 我理解你的需求是json中有一些复杂的字段,你不想解析,希望后续用UDTF在来解析,这个应该做不到的,现在的json format > > 的解析的底层实现 > > > 就是按照json的标准格式解析(jackson)的,没法将一个 > > > jsonObject解析成一个String。另外如果你jsonObject中的内容格式不确定,也不适合在Schema中声明, > > > 因为SQL 是预编译后执行的,不能做到schema里是三个field,执行时又能解析四个field。 > > > > > > 一种做法是定义复杂的jsonObject对应的ROW > > > 将全部可能的字段包含进去,每条记录没有的字段解析出来的会是null,fail-on-missing-field 默认关闭的, > > > 另外一种推荐你把复杂的字段在上游就转义成一个String放到json的一个field中,这样Flink解析出来就是一个String, > > > 然后query里用UDTF处理。 > > > > > > > > > 祝好 > > > Leonard Xu > > > > > > > > > > > > > > > > 在 2020年7月10日,10:16,Peihui He 写道: > > > > > > > > Hello, > > > > > > > > 实际上,我也并不太关心这个字段的内容,能按string 保存下来就好了。 > > > > > > > > Best wishes. > > > > > > > > Peihui He 于2020年7月10日周五 上午10:12写道: > > > > > > > >> Hello, > > > >> > > > >> 明白您的意思。但是当一个字段下的json 字段不确定,类似一个黑盒子一样的化,就不好定义了。 > > > >> > > > >> > > > >> Best wishes. > > > >> > > > >> LakeShen 于2020年7月10日周五 上午10:03写道: > > > >> > > > >>> Hi Peihui, > > > >>> > > > >>> 如果消费的 Kafka json 中,json 比较复杂的话,比如存在嵌套,就像下面的格式: > > > >>> > > > >>> { > > > >>>"a":"b", > > > >>>"c":{ > > > >>>"d":"e", > > > >>>"g":"f" > > > >>>} > > > >>> }, > > > >>> > > > >>> 那么在 kafka table source 可以使用 row 来定义: > > > >>> > > > >>> create table xxx ( > > > >>> a varchar, > > > >>> c row > > > >>> ) > > > >>> > > > >>> 如果 还存在嵌套,可以继续再使用 Row 来定义。 > > > >>> > > > >>> Best, > > > >>> LakeShen > > > >>> > > > >>> Peihui He 于2020年7月10日周五 上午9:12写道: > > > >>> > > > >>>> Hello: > > > >>>> > > > >>>>在用flink > > > sql从kafka消费数据时,有些json比较复杂,想直接定义为object,在后续通过udf转为string。 > > > >>>> > > > >>>> 有什么办法不?我在建表的时候有提示过json_object 类型,但是用了又提示其他类型。 > > > >>>> > > > >>>> > > > >>>> Best wishes. > > > >>>> > > > >>> > > > >> > > > > > > > > > > > -- > > Best, > Benchao Li >
flink sql 1.10 insert into tb select 复杂schema 失败
Hello, 在使用flink sql 1.10.0 时候,当source table 中含有复杂schema,比如 create table xxx ( a string, b row( c row(d string) ) ) 当c 中有值的时候,sql 如下 insert into select * from xxx会出现下面错误 Caused by: java.lang.ClassCastException: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.NullNode cannot be cast to org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode at org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$assembleRowConverter$dd344700$1(JsonRowSerializationSchema.java:337) at org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$wrapIntoNullableConverter$1fa09b5b$1(JsonRowSerializationSchema.java:189) at org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$assembleRowConverter$dd344700$1(JsonRowSerializationSchema.java:345) at org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$wrapIntoNullableConverter$1fa09b5b$1(JsonRowSerializationSchema.java:189) at org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$assembleRowConverter$dd344700$1(JsonRowSerializationSchema.java:345) at org.apache.flink.formats.json.JsonRowSerializationSchema.lambda$wrapIntoNullableConverter$1fa09b5b$1(JsonRowSerializationSchema.java:189) at org.apache.flink.formats.json.JsonRowSerializationSchema.serialize(JsonRowSerializationSchema.java:138) ... 38 more Best wishes.
Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
Hi Congxian, 这个错误是从1.9 升级到1.10 遇到的问题。用简单的wordcount 测试,自己根据特定word 抛出runtimeException,就能够重现。flink on yarn 和 flink on k8s 都出现这个问题。1.10 都不能从上次的checkpoint状态中恢复。不知道是不是1.10需要其他配置呢? Best wishes. Congxian Qiu 于2020年7月14日周二 下午1:54写道: > Hi > > 这个出错是从 1.9 升级到 1.10 遇到的问题,还是说 1.10 能正常跑了,然后跑着跑着 failover 了再次恢复的时候出错了呢? > 另外你可以看下 tm log 看看有没有其他异常 > > Best, > Congxian > > > Yun Tang 于2020年7月14日周二 上午11:57写道: > > > Hi Peihui > > > > > 你的异常应该是从增量Checkpoint恢复时,文件已经下载到本地了,做硬链时[1],发现源文件不见了,有很大的可能是当时发生了异常,导致restore流程退出了,所以这个问题应该不是root > > cause。 > > > > [1] > > > https://github.com/apache/flink/blob/2a3b642b1efb957f3d4f20502c40398786ab1469/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java#L473 > > > > > > 祝好 > > 唐云 > > > > From: Peihui He > > Sent: Tuesday, July 14, 2020 10:42 > > To: user-zh@flink.apache.org > > Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复 > > > > hello, > > > > 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示 > > > > > > Caused by: java.nio.file.NoSuchFileException: > > > > > /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst > > -> > > > /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst > > > > 配置和1.9.2 一样: > > state.backend: rocksdb > > state.checkpoints.dir: hdfs:///flink/checkpoints/wc/ > > state.savepoints.dir: hdfs:///flink/savepoints/wc/ > > state.backend.incremental: true > > > > 代码上都有 > > > > env.enableCheckpointing(1); > > > > > env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > > env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, > > org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS))); > > > > > > 是1.10.0 需要做什么特别配置么? > > >
Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
Hi Yun, 我这边用一个word count 例子,socket -> flatmap -> keyBy -> reduce -> print. 在flatmap 中当出现特定word的时候就抛出一个runtimeException。在1.9.2 里面是可以从checkpoint中自动恢复上次做checkpoint的时候的状态,但是用1.10.0 就不能。环境是flink on yarn。 Best wishes. Yun Tang 于2020年7月14日周二 上午11:57写道: > Hi Peihui > > 你的异常应该是从增量Checkpoint恢复时,文件已经下载到本地了,做硬链时[1],发现源文件不见了,有很大的可能是当时发生了异常,导致restore流程退出了,所以这个问题应该不是root > cause。 > > [1] > https://github.com/apache/flink/blob/2a3b642b1efb957f3d4f20502c40398786ab1469/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java#L473 > > > 祝好 > 唐云 > ____ > From: Peihui He > Sent: Tuesday, July 14, 2020 10:42 > To: user-zh@flink.apache.org > Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复 > > hello, > > 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示 > > > Caused by: java.nio.file.NoSuchFileException: > > /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst > -> > /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst > > 配置和1.9.2 一样: > state.backend: rocksdb > state.checkpoints.dir: hdfs:///flink/checkpoints/wc/ > state.savepoints.dir: hdfs:///flink/savepoints/wc/ > state.backend.incremental: true > > 代码上都有 > > env.enableCheckpointing(1); > > env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, > org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS))); > > > 是1.10.0 需要做什么特别配置么? >
flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
hello, 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示 Caused by: java.nio.file.NoSuchFileException: /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst -> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst 配置和1.9.2 一样: state.backend: rocksdb state.checkpoints.dir: hdfs:///flink/checkpoints/wc/ state.savepoints.dir: hdfs:///flink/savepoints/wc/ state.backend.incremental: true 代码上都有 env.enableCheckpointing(1); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS))); 是1.10.0 需要做什么特别配置么?
Re: flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?
感谢,已经按后面一种方式做了珞 Leonard Xu 于2020年7月10日周五 上午11:13写道: > Hi, Peihui > > 我理解你的需求是json中有一些复杂的字段,你不想解析,希望后续用UDTF在来解析,这个应该做不到的,现在的json format 的解析的底层实现 > 就是按照json的标准格式解析(jackson)的,没法将一个 > jsonObject解析成一个String。另外如果你jsonObject中的内容格式不确定,也不适合在Schema中声明, > 因为SQL 是预编译后执行的,不能做到schema里是三个field,执行时又能解析四个field。 > > 一种做法是定义复杂的jsonObject对应的ROW > 将全部可能的字段包含进去,每条记录没有的字段解析出来的会是null,fail-on-missing-field 默认关闭的, > 另外一种推荐你把复杂的字段在上游就转义成一个String放到json的一个field中,这样Flink解析出来就是一个String, > 然后query里用UDTF处理。 > > > 祝好 > Leonard Xu > > > > > > 在 2020年7月10日,10:16,Peihui He 写道: > > > > Hello, > > > > 实际上,我也并不太关心这个字段的内容,能按string 保存下来就好了。 > > > > Best wishes. > > > > Peihui He 于2020年7月10日周五 上午10:12写道: > > > >> Hello, > >> > >> 明白您的意思。但是当一个字段下的json 字段不确定,类似一个黑盒子一样的化,就不好定义了。 > >> > >> > >> Best wishes. > >> > >> LakeShen 于2020年7月10日周五 上午10:03写道: > >> > >>> Hi Peihui, > >>> > >>> 如果消费的 Kafka json 中,json 比较复杂的话,比如存在嵌套,就像下面的格式: > >>> > >>> { > >>>"a":"b", > >>>"c":{ > >>>"d":"e", > >>>"g":"f" > >>>} > >>> }, > >>> > >>> 那么在 kafka table source 可以使用 row 来定义: > >>> > >>> create table xxx ( > >>> a varchar, > >>> c row > >>> ) > >>> > >>> 如果 还存在嵌套,可以继续再使用 Row 来定义。 > >>> > >>> Best, > >>> LakeShen > >>> > >>> Peihui He 于2020年7月10日周五 上午9:12写道: > >>> > >>>> Hello: > >>>> > >>>>在用flink > sql从kafka消费数据时,有些json比较复杂,想直接定义为object,在后续通过udf转为string。 > >>>> > >>>> 有什么办法不?我在建表的时候有提示过json_object 类型,但是用了又提示其他类型。 > >>>> > >>>> > >>>> Best wishes. > >>>> > >>> > >> > >
flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?
Hello: 在用flink sql从kafka消费数据时,有些json比较复杂,想直接定义为object,在后续通过udf转为string。 有什么办法不?我在建表的时候有提示过json_object 类型,但是用了又提示其他类型。 Best wishes.
Re: flink 1.10 kafka collector topic 配置pattern
好的,感谢珞 Leonard Xu 于2020年7月3日周五 下午4:07写道: > Hello > > 我了解到社区有人在做了,1.12 应该会支持 > > 祝好 > Leonard Xu > > > 在 2020年7月3日,16:02,Peihui He 写道: > > > > hello > > > > 请教大家,flink 1.10里面kafka connector 不能配置topic pattern,后续会支持吗? > > > > best wishes > >
flink 1.10 kafka collector topic 配置pattern
hello 请教大家,flink 1.10里面kafka connector 不能配置topic pattern,后续会支持吗? best wishes
Re: flink 如何自定义connector
hello 正在尝试中,感谢解答珞 best wishes 111 于2020年5月28日周四 上午10:16写道: > Hi, > 想要在sqlgateway里面使用,那么可以看看下面几个条件: > 1 满足SPI的要求,能让flink自动发现实现类 > 2 配置FLINK_HOME环境变量,自定义的connector放在FLINK_HOME/lib下 > 3 如果与Hive集成,使用hivecatalog,那么先要注册表 > 这样就可以使用了。 > Best, > Xinghalo
Re: flink 如何自定义connector
hello 现在已经定义了一个tablesource,可以通过 batchEnv.registerTableSource 这个注册 并查询数据,但是如何在sqlgateway 中配置呢? Leonard Xu 于2020年5月28日周四 上午9:32写道: > Hi, > 可以参考现有的connector,如hbase,jdbc,结合[1]实现自定义connector。 > > > 祝好, > Leonard Xu > [1] > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html > < > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html > > > > > > 在 2020年5月28日,09:16,Peihui He 写道: > > > > hello > > > >请教大家,flink 1.10中如何自定义coonnecter,然后注册到flink sql > > gateway,使得可以执行sql的操作呢? > > > > > > best wish > >
flink 如何自定义connector
hello 请教大家,flink 1.10中如何自定义coonnecter,然后注册到flink sql gateway,使得可以执行sql的操作呢? best wish
Re: flink cep 匹配一段时间类A,B,C事件发生
是的,这个想法好,谢谢 Dian Fu 于2020年4月16日周四 上午9:59写道: > 类似于这样? > > AA follow by BB follow by CC > > AA定义成A or B or C > BB定义成(A or B or C)and BB.type != AA.type > CC定义成(A or B or C)and CC.type != AA.type and CC.type != BB.type > > > 在 2020年4月16日,上午8:40,Peihui He 写道: > > > > hello,all > > > >我这个边需要匹配一段时间内A,B,C事件同时发生,但是不要求A,B,C事件的顺序,flink cep有什么好的方式不? > > > > 有个方案是 定义多个模式组,每个模式组是A,B,C事件的一次排列组合,但是这样比较麻烦,如果事件个数多的话,需要写太多组合。 > > > > > > best wish > >
flink 1.9.2 容器ha部署是jobid 一直是00000000000000000000000000000000
大家好,我在用flink 1.9.2 部署到容器的时候如果不启动ha的情况下jobid是正常的,但是启动了就变成了 这样的话,checkpoint的地址和ha的文件地址都一样了,导致checkpoint总是失败。 不知道这是什么原因呢?
flink influxdb 有些表中不包含jobname
hello, 我这边用的是influxdb 作为flink 1.9.2的reporter, 但是在一些表里面没有jobname信息。这样会是的在每次重启的时候都得修改grafana的图标信息,很麻烦。 请问有什么好的儿方式解决没呢?