回复:Flink sql从ck恢复,统计数据波动问题

2022-10-10 文章 JasonLee
Hi



我理解应该是任务恢复的时候从上一次成功的 checkpoint 或者你指定的 checkpoint 里记录的 offset 
开始消费,所以此时的统计值应该是有短暂的下跌,因为数据相当于回复到之前重复计算了一部分。这个应该是符合预期的,可能需要在业务上做一些处理。


Best
JasonLee


 回复的原邮件 
| 发件人 | 天下五帝东 |
| 发送日期 | 2022年10月10日 13:34 |
| 收件人 | user-zh@flink.apache.org |
| 主题 | Flink sql从ck恢复,统计数据波动问题 |
Hi:
各位大佬们,flink任务从状态恢复时,统计值有一个明显的下跌,等ck打完后,统计值才会上升,这种情况有什么好的解决方案吗?

flink cdc什么时候支持flink 1.15.x?

2022-10-10 文章 casel.chen
当前flinlk cdc master分支的snapshot版本最高支持到flink 1.14.4,尝试使用flink 1.15.2编译会出错,请问flink 
cdc什么时候支持flink 1.15.x?

Re:Re: flink cdc能否同步DDL语句?

2022-10-10 文章 casel.chen
可以给一些hints吗?看哪些类?

















在 2022-10-11 10:22:07,"yuxia"  写道:
>用 datastream api,自己解析一下 DDL。
>
>Best regards,
>Yuxia
>
>- 原始邮件 -
>发件人: "yh z" 
>收件人: "user-zh" 
>发送时间: 星期二, 2022年 10 月 11日 上午 10:23:43
>主题: Re: flink cdc能否同步DDL语句?
>
>目前,社区的 Flink CDC,是可以读取到 binlog 中的 DDL 语句的,但是不会传递给下游,即下游无法感知 schema 的变化。
>
>Xuyang  于2022年10月10日周一 16:46写道:
>
>> Hi, 目前应该是不行的
>> 在 2022-09-26 23:27:05,"casel.chen"  写道:
>> >flink cdc能否同步DDL语句以实现schema同步? 例如create table , create index, truncate
>> table等
>>


Re: flink cdc能否同步DDL语句?

2022-10-10 文章 yuxia
用 datastream api,自己解析一下 DDL。

Best regards,
Yuxia

- 原始邮件 -
发件人: "yh z" 
收件人: "user-zh" 
发送时间: 星期二, 2022年 10 月 11日 上午 10:23:43
主题: Re: flink cdc能否同步DDL语句?

目前,社区的 Flink CDC,是可以读取到 binlog 中的 DDL 语句的,但是不会传递给下游,即下游无法感知 schema 的变化。

Xuyang  于2022年10月10日周一 16:46写道:

> Hi, 目前应该是不行的
> 在 2022-09-26 23:27:05,"casel.chen"  写道:
> >flink cdc能否同步DDL语句以实现schema同步? 例如create table , create index, truncate
> table等
>


Re: flink cdc能否同步DDL语句?

2022-10-10 文章 yh z
目前,社区的 Flink CDC,是可以读取到 binlog 中的 DDL 语句的,但是不会传递给下游,即下游无法感知 schema 的变化。

Xuyang  于2022年10月10日周一 16:46写道:

> Hi, 目前应该是不行的
> 在 2022-09-26 23:27:05,"casel.chen"  写道:
> >flink cdc能否同步DDL语句以实现schema同步? 例如create table , create index, truncate
> table等
>


Re:Re: OutOfMemoryError: Direct buffer memory

2022-10-10 文章 RS
Hi,
调大 taskmanager.memory.task.off-heap.size 应该能解决部分问题,
我这里还有些疑问,部署的session集群,每次集群只跑这一个任务,执行结束才开始下一个任务,如果是前面的任务read/write申请的堆外内存,执行结束的时候,会立即释放吗?
执行几次任务之后,才出现这种异常,前面任务都是成功的,后面任务就异常了,感觉有内存泄漏的现象。Flink 
taskmanager的off-heap内存管理有更多的介绍吗?(官网的看过了)
Thanks

在 2022-10-10 12:34:55,"yanfei lei"  写道:
>从报错看是Direct memory不够导致的,可以将taskmanager.memory.task.off-heap.size调大试试看。
>
>Best,
>Yanfei
>
>allanqinjy  于2022年10月8日周六 21:19写道:
>
>>
>> 看堆栈信息是内存不够,调大一些看看。我之前在读取hdfs上的一个获取地理位置的离线库,也是内存溢出,通过调整内存大小解决的。用的streamingapi开发的作业,1.12.5版本。
>>
>>
>> | |
>> allanqinjy
>> |
>> |
>> allanqi...@163.com
>> |
>> 签名由网易邮箱大师定制
>>
>>
>> On 10/8/2022 21:00,RS wrote:
>> Hi,
>>
>>
>> 版本:Flink-1.15.1
>>
>>
>> 有个场景,从hdfs读文件再处理数据,batch mode,10个并发,使用Flink
>> SQL定义执行,source是connector=filesystem,format=raw,path=
>>
>>
>> 执行任务的时候,有时候能成功,有时候失败了然后就一直失败,重启集群好像可以解决问题,这种情况如何是什么原因导致的?
>>
>>
>> 集群的off-heap都是默认配置,
>> taskmanager.memory.task.off-heap.size=0
>> taskmanager.memory.framework.off-heap.size=128MB
>>
>>
>> 报错堆栈:
>> Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct
>> out-of-memory error has occurred. This can mean two things: either job(s)
>> require(s) a larger size of JVM direct memory or there is a direct memory
>> leak. The direct memory can be allocated by user code or some of its
>> dependencies. In this case 'taskmanager.memory.task.off-heap.size'
>> configuration option should be increased. Flink framework and its
>> dependencies also consume the direct memory, mostly for network
>> communication. The most of network memory is managed by Flink and should
>> not result in out-of-memory error. In certain special cases, in particular
>> for jobs with high parallelism, the framework may require more direct
>> memory which is not managed by Flink. In this case
>> 'taskmanager.memory.framework.off-heap.size' configuration option should be
>> increased. If the error persists then there is probably a direct memory
>> leak in user code or some of its dependencies which has to be investigated
>> and fixed. The task executor has to be shutdown...
>> at java.nio.Bits.reserveMemory(Bits.java:695)
>> at java.nio.DirectByteBuffer.(DirectByteBuffer.java:123)
>> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
>> at
>> org.apache.hadoop.util.DirectBufferPool.getBuffer(DirectBufferPool.java:72)
>> at
>> org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.reallocPacketBuf(PacketReceiver.java:270)
>> at
>> org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:163)
>> at
>> org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)
>> at
>> org.apache.hadoop.hdfs.client.impl.BlockReaderRemote.readNextPacket(BlockReaderRemote.java:183)
>> at
>> org.apache.hadoop.hdfs.client.impl.BlockReaderRemote.read(BlockReaderRemote.java:142)
>> at
>> org.apache.hadoop.hdfs.ByteArrayStrategy.readFromBlock(ReaderStrategy.java:118)
>> at
>> org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:704)
>> at
>> org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:765)
>> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:825)
>> at java.io.DataInputStream.read(DataInputStream.java:149)
>> at
>> org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:96)
>> at
>> org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:742)
>> at
>> org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:586)
>> at
>> org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:505)
>> at
>> org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:50)
>> at
>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.loadSplit(ContinuousFileReaderOperator.java:415)
>> at
>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.access$300(ContinuousFileReaderOperator.java:98)
>> at
>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$ReaderState$2.prepareToProcessRecord(ContinuousFileReaderOperator.java:122)
>> at
>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.processRecord(ContinuousFileReaderOperator.java:348)
>> at
>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.lambda$new$0(ContinuousFileReaderOperator.java:240)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>> at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
>> at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
>> at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
>> at

退订

2022-10-10 文章 13341000780
退订





--
发自我的网易邮箱手机智能版

Re:Re: Re: table store 和connector-kafka包冲突吗?

2022-10-10 文章 RS
Hi,


去除掉后运行是没有问题的,所以这种lib的冲突问题,能在官网上加个说明吗,避免后续其他人也遇到这种问题。


Thanks

在 2022-10-10 12:50:33,"yanfei lei"  写道:
>Hi, 从table store的pom中看,table store的dist包shade了一份connector-kafka。
>https://repo1.maven.org/maven2/org/apache/flink/flink-table-store-dist/0.2.0/flink-table-store-dist-0.2.0.pom
>把flink-connector-kafka-1.15.1.jar 去掉再试试?
>
>
>RS  于2022年10月8日周六 17:19写道:
>
>> Hi,
>> 报错如下:
>>
>>
>> [ERROR] Could not execute SQL statement. Reason:
>> org.apache.flink.table.api.ValidationException: Multiple factories for
>> identifier 'kafka' that implement
>> 'org.apache.flink.table.factories.DynamicTableFactory' found in the
>> classpath.
>>
>>
>> Ambiguous factory classes are:
>>
>>
>> org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
>> org.apache.flink.table.store.kafka.KafkaLogStoreFactory
>>
>> org.apache.flink.table.store.shaded.streaming.connectors.kafka.table.KafkaDynamicTableFactory
>>
>>
>> Thanks
>>
>>
>>
>>
>>
>> 在 2022-10-08 13:38:20,"Shammon FY"  写道:
>> >Hi RS
>> >你这边能提供一下具体的冲突错误栈吗?
>> >
>> >On Sat, Oct 8, 2022 at 8:54 AM RS  wrote:
>> >
>> >> Hi,
>> >>
>> >>
>> >> 版本:flink-1.15.1
>> >> 使用table
>> >>
>> store,需要在lib下放置flink-table-store-dist-0.2.0.jar,之前集群的lib下有一个flink-connector-kafka-1.15.1.jar,使用sql-client,定义kafka源表的时候,发现connector冲突了
>> >>
>> >>
>> 是不是lib下有了flink-table-store-dist-0.2.0.jar,就不能有flink-connector-kafka-1.15.1.jar?
>> >>
>> >>
>> >> Thanks
>>


Fwd: Flink sql从ck恢复,统计数据波动问题

2022-10-10 文章 天下五帝东
数据库的统计值

> 下面是被转发的邮件:
> 
> 发件人: Hangxiang Yu 
> 主题: 回复:Flink sql从ck恢复,统计数据波动问题
> 日期: 2022年10月10日 GMT+8 下午2:03:50
> 收件人: user-zh@flink.apache.org
> 回复-收件人: user-zh@flink.apache.org
> 
> 是什么值下跌呢?哪个metric吗?
> 
> On Mon, Oct 10, 2022 at 1:34 PM 天下五帝东  wrote:
> 
>> Hi:
>>各位大佬们,flink任务从状态恢复时,统计值有一个明显的下跌,等ck打完后,统计值才会上升,这种情况有什么好的解决方案吗?
> 
> 
> 
> -- 
> Best,
> Hangxiang.



Re:控制流方式能否改变作业ExecutionGraph?

2022-10-10 文章 Xuyang
Hi,不重启作业的情况下,修改配置,实时改变ExecutionGraph目前是不支持的。







--

Best!
Xuyang





在 2022-09-27 08:36:53,"casel.chen"  写道:
>我有一个数据同步场景是希望通过修改配置来实时动态修改数据同步的目标,例如使用flink 
>cdc将mysql中的变更数据实时同步进kafka,如果后来业务又要求同一份数据再同步进mongodb的话,我是否可以通过修改同步配置来达到不停止原来作业来动态修改数据同步的目标(由一个变多个)?又或者是flink
> cdc整库同步mysql变更数据到kafka一个topic,后来业务又要求按表划分topic,这种能否同样通过修改配置来实现呢?


Re:flink cdc能否同步DDL语句?

2022-10-10 文章 Xuyang
Hi, 目前应该是不行的
在 2022-09-26 23:27:05,"casel.chen"  写道:
>flink cdc能否同步DDL语句以实现schema同步? 例如create table , create index, truncate table等


Re: Flink sql从ck恢复,统计数据波动问题

2022-10-10 文章 yidan zhao
ck“打”完是啥意思。

Congxian Qiu  于2022年10月10日周一 15:11写道:
>
> Hi
> 可以的话也同步下相关的计算逻辑,从 checkpoint 恢复后的统计结果可能会和计算逻辑有关
> Best,
> Congxian
>
>
> Hangxiang Yu  于2022年10月10日周一 14:04写道:
>
> > 是什么值下跌呢?哪个metric吗?
> >
> > On Mon, Oct 10, 2022 at 1:34 PM 天下五帝东  wrote:
> >
> > > Hi:
> > > 各位大佬们,flink任务从状态恢复时,统计值有一个明显的下跌,等ck打完后,统计值才会上升,这种情况有什么好的解决方案吗?
> >
> >
> >
> > --
> > Best,
> > Hangxiang.
> >


Re: Flink sql从ck恢复,统计数据波动问题

2022-10-10 文章 Congxian Qiu
Hi
可以的话也同步下相关的计算逻辑,从 checkpoint 恢复后的统计结果可能会和计算逻辑有关
Best,
Congxian


Hangxiang Yu  于2022年10月10日周一 14:04写道:

> 是什么值下跌呢?哪个metric吗?
>
> On Mon, Oct 10, 2022 at 1:34 PM 天下五帝东  wrote:
>
> > Hi:
> > 各位大佬们,flink任务从状态恢复时,统计值有一个明显的下跌,等ck打完后,统计值才会上升,这种情况有什么好的解决方案吗?
>
>
>
> --
> Best,
> Hangxiang.
>


Re: Flink sql从ck恢复,统计数据波动问题

2022-10-10 文章 Hangxiang Yu
是什么值下跌呢?哪个metric吗?

On Mon, Oct 10, 2022 at 1:34 PM 天下五帝东  wrote:

> Hi:
> 各位大佬们,flink任务从状态恢复时,统计值有一个明显的下跌,等ck打完后,统计值才会上升,这种情况有什么好的解决方案吗?



-- 
Best,
Hangxiang.