How can the Kafka offset be ignored in the new Kafka source?

2022-06-26 Post by yidan zhao
As the subject says, I would like to add a custom switch so that, when the job is restarted with the switch enabled, the Kafka source is forced to start from latest instead of restoring from state.  Note that the other operators still need to be restored from state.


Previously I overrode the initializeState method of FlinkKafkaConsumerBase and used a boolean config I added to decide whether to restore the state of the Kafka source part.


For the new KafkaSource, my analysis so far gives roughly the following options, and I am not sure whether they are OK.
(1) Adjust things on the SourceOperator side. This requires overriding runtime classes, so I am not considering it for now.
(2) Override the addSplits method of SourceReaderBase. The current implementation there apparently cannot distinguish newly added partitions from restored ones, but that hardly matters, since changing topics or adding partitions is rare. At this point I could decide, based on my configuration, whether to force latest (a sketch of this idea follows at the end of this message).
(3) When the new KafkaSource restarts from state, the state restored by KafkaSourceEnumerator is only the assigned partitions, which in itself has nothing to do with offsets; offsets only come into play for newly discovered partitions.
On the KafkaSourceEnumerator side, I could make it ignore its state so that everything it discovers is treated as new, and the offset can then be customized via the offsets initializer.
In that case, however, the source reader side would have to ignore the restored Kafka splits (the call in SourceOperator.open) and only accept newly assigned splits (the call in SourceOperator.handleAddSplitsEvent), which again means adjusting the
SourceOperator.


As described above, there does not seem to be a particularly clean way to change this. I am hoping for a simple change, because I have to re-implement it every time I upgrade the Flink version.
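
For reference, below is a minimal, hypothetical sketch of what idea (2) could look like: rewrite the restored splits so that the reader resolves them to the latest offsets instead of the offsets stored in state. KafkaPartitionSplit and its LATEST_OFFSET sentinel are taken from flink-connector-kafka, but the wrapper class is made up, and whether this hook can be reached without touching the runtime classes mentioned in (1) and (3) is exactly the open question of this thread.

import java.util.List;
import java.util.stream.Collectors;

import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;

/** Hypothetical helper: force restored Kafka splits to start from the latest offset. */
public final class ForceLatestSplits {

    private ForceLatestSplits() {}

    /**
     * Returns copies of the given (restored) splits whose starting offset is the
     * LATEST_OFFSET sentinel, so the split reader seeks to the end of each partition
     * instead of the offset captured in the checkpoint.
     */
    public static List<KafkaPartitionSplit> forceLatest(List<KafkaPartitionSplit> restored) {
        return restored.stream()
                .map(split -> new KafkaPartitionSplit(
                        split.getTopicPartition(), KafkaPartitionSplit.LATEST_OFFSET))
                .collect(Collectors.toList());
    }
}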


Can expired Flink state be written to the log when it expires?

2022-06-26 Post by haishui
Hi,
Can Flink state expiration work like the Caffeine cache, i.e. invoke a callback when an entry expires and print the expired content to the log?

Best Regards!
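
For reference, the Caffeine pattern the question alludes to looks roughly like the sketch below (purely an illustration of the desired behavior; as far as I know, Flink's state TTL does not currently expose such an expiry listener):

import java.time.Duration;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;

public class CaffeineExpiryLogging {
    public static void main(String[] args) {
        // Log every entry that Caffeine drops because it expired.
        Cache<String, String> cache = Caffeine.newBuilder()
                .expireAfterWrite(Duration.ofMinutes(10))
                .removalListener((String key, String value, RemovalCause cause) -> {
                    if (cause == RemovalCause.EXPIRED) {
                        System.out.println("expired: " + key + " -> " + value);
                    }
                })
                .build();
        cache.put("k", "v");
    }
}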

Re: Can the kafka-client version requirement of the Kafka source/sink in Flink 1.15 be lowered?

2022-06-26 Post by Qingsheng Ren
Hi,

The Kafka connector currently depends on some APIs that only exist in newer versions of kafka-clients, and the sink side uses reflection to support exactly-once semantics. Since the Kafka client itself provides good backward compatibility, the Flink community no longer ships a Kafka connector built against old client versions, and adapting to Kafka 0.11, which was released 5 years ago, is not very realistic either.

Best,
Qingsheng

> On Jun 23, 2022, at 19:37, yidan zhao  wrote:
> 
> As the subject says, I would like to ask anyone familiar with this: does the new connector merely upgrade to a newer kafka-client and change how the interfaces are used, or does it actually depend on functionality that only exists in newer client versions?
> Is it possible to implement it on top of an older kafka-client?
> 
> If so, I might override the implementation myself.
> The reason is that newer kafka-client versions do not work with my company's Kafka, which is open-source Kafka wrapped with a proxy layer. Accessing it with a client that is too new
> causes problems (0.11 is the recommended version; in my tests clients up to 2.2 work).



Re: Data loss when Flink consumes Kafka and syncs to MongoDB in real time

2022-06-26 Post by Qingsheng Ren
Hi,

The Flink Kafka connector commits offsets to the Kafka broker after a checkpoint completes, but Flink does not rely on the offsets committed to the broker for failure recovery; it recovers from the offsets stored in the checkpoint.

Regarding the data loss, I would suggest first reproducing the problem with a small amount of data and then checking the pipeline from source to sink.

Best,
Qingsheng

> On Jun 26, 2022, at 11:54, casel.chen  wrote:
> 
> mysql cdc -> kafka -> mongodb
> I wrote a Flink 1.13.2 job that consumes a whole-database MySQL change topic from Kafka and syncs it to MongoDB in real time, with checkpointing enabled. In practice, restoring from a savepoint and restoring from the group offsets both cause data loss. How should I troubleshoot this? Code repository: https://github.com/ChenShuai1981/mysql2mongodb.git
> My MongodbSink implements CheckpointedFunction and waits in snapshotState for all child threads to finish writing to MongoDB.
> 
> 
> What is the process by which Flink commits Kafka offsets after consuming and processing the data? When consuming from Kafka we first obtain the pendingOffsets; how is it guaranteed that all of these pendingOffsets have been processed before they are all committed? Is there any source-code walkthrough for this part?
> 
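
For context, here is a minimal sketch (not the poster's actual MongodbSink; the MongoDB write itself is left as a placeholder) of the pattern described above: buffer records between checkpoints and flush them synchronously in snapshotState(), so that once a checkpoint completes, everything covered by the checkpointed Kafka offsets has already been written to the external system (at-least-once).

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class BufferingMongoSink extends RichSinkFunction<String> implements CheckpointedFunction {

    private final List<String> buffer = new ArrayList<>();

    @Override
    public void invoke(String value, Context context) {
        buffer.add(value); // collect records between checkpoints
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        flush(); // must block until every buffered write is acknowledged by MongoDB
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // no operator state is kept here: records that were never flushed are replayed
        // after a failure from the Kafka offsets stored in the checkpoint, as explained above
    }

    private void flush() {
        // hypothetical placeholder: synchronously write `buffer` to MongoDB, then clear it
        buffer.clear();
    }
}

If the flush really waits for every in-flight write, restoring from a successful checkpoint should not lose data, which is why reproducing with a small data set and checking each stage from source to sink is a good next step.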



Re: Flink-1.15.0 failing to commit offsets when consuming Kafka?

2022-06-26 Post by Qingsheng Ren
Hi,

This is a known issue of the Apache Kafka consumer, see FLINK-28060 [1] and KAFKA-13840 [2].

[1] https://issues.apache.org/jira/browse/FLINK-28060
[2] https://issues.apache.org/jira/browse/KAFKA-13840

Best,
Qingsheng

> On Jun 27, 2022, at 09:16, RS  wrote:
> 
> Hi,
> I would like to ask: with Flink-1.15.0 consuming Kafka, I ran into the following problem: offset commits fail, and some jobs apparently keep failing to commit. The data is consumed, but the offsets do not change. How should this be handled?
> 
> 
> Symptoms:
> 1. The job has no exceptions.
> 2. The data is consumed and processed normally; downstream use of the data is not affected.
> 3. The job has checkpointing configured, once every few minutes; in theory offsets are committed when a checkpoint runs.
> 4. Some jobs fail to commit their Kafka offsets, while others commit normally.
> 
> 
> WARN logs:
> 2022-06-27 01:07:42,725 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 checkpointing for checkpoint with id=11398 (max part counter=1).
> 2022-06-27 01:07:42,830 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 received completion notification for checkpoint with id=11398.
> 2022-06-27 01:07:43,820 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 checkpointing for checkpoint with id=11476 (max part counter=0).
> 2022-06-27 01:07:43,946 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 received completion notification for checkpoint with id=11476.
> 2022-06-27 01:07:45,218 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 checkpointing for checkpoint with id=11521 (max part counter=47).
> 2022-06-27 01:07:45,290 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 received completion notification for checkpoint with id=11521.
> 2022-06-27 01:07:45,521 WARN  org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 11443
> org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
> Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
> 2022-06-27 01:07:45,990 WARN  org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 11398
> org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
> Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
> 
> 
> Thanks~



Re: Re: Task for lookup join does not recover properly?

2022-06-26 Post by xuchao
Yes, I am using the JDBC connector built into Flink SQL.

Thanks! 



Best,
Amber Xu
 
From: Lincoln Lee
Sent: 2022-06-24 21:27
To: user-zh
Subject: Re: Task for lookup join does not recover properly?
Hi,
   Is the MySQL dimension table you are using the JDBC connector built into Flink SQL? If so, its internal cache is only a read cache and is not persisted;
it is invalidated whenever the job restarts or the configured eviction conditions are reached.
   If it is a dimension table you implemented yourself, I would suggest adding data-loading logs so you can confirm whether anything goes wrong during failover.
 
Best,
Lincoln Lee
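
For reference, the read cache of the built-in JDBC lookup connector mentioned above is configured in the table DDL; a minimal sketch for Flink 1.14 follows (the table name, URL and columns are made up):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JdbcLookupCacheExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Hypothetical MySQL dimension table. The JDBC lookup cache is an in-memory
        // read cache: it is filled lazily per key, bounded by max-rows/ttl, and lost
        // whenever the job restarts.
        tEnv.executeSql(
                "CREATE TABLE dim_mysql (\n"
                        + "  id INT,\n"
                        + "  name STRING,\n"
                        + "  PRIMARY KEY (id) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'jdbc',\n"
                        + "  'url' = 'jdbc:mysql://localhost:3306/db',\n"
                        + "  'table-name' = 'dim',\n"
                        + "  'lookup.cache.max-rows' = '10000',\n"
                        + "  'lookup.cache.ttl' = '10min',\n"
                        + "  'lookup.max-retries' = '3'\n"
                        + ")");
    }
}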
 
 
Xuchao  wrote on Friday, June 24, 2022 at 17:15:
 
> Hello!
> I have run into some problems while using Flink.
> flink-1.14.4
> sqlserver-cdc-2.2.1
> yarn-per-job
>
> I have a job that first does a dual-stream join and then a lookup join against a MySQL dimension table, with incremental checkpoints enabled;
> sqlserver-cdc had a brief failure, the job failed and recovered automatically, but the task for the lookup join no longer outputs any data;
> on inspection, the amount of dimension-table data loaded is 0, i.e. the job did not load the full dimension table once when it recovered.
>
> Given the above, what might the problem be, and how should it be solved?
>
> Looking forward to a reply!
> best wishes!
>
> Logs attached:
> 2022-06-24 14:55:45,950 ERROR
> com.ververica.cdc.debezium.internal.Handover [] - Reporting
> error:
> com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException:
> An exception occurred in the change event producer. This connector will be
> stopped.
> at
> io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
> ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:292)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:152)
> ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:119)
> ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> ~[?:1.8.0_301]
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_301]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_301]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_301]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_301]
> Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: An insufficient number of
> arguments were supplied for the procedure or function cdc.fn_cdc_get_all_changes_ ... .
> at
> com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:262)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at
> com.microsoft.sqlserver.jdbc.SQLServerResultSet$FetchBuffer.nextRow(SQLServerResultSet.java:5427)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at
> com.microsoft.sqlserver.jdbc.SQLServerResultSet.fetchBufferNext(SQLServerResultSet.java:1758)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at
> com.microsoft.sqlserver.jdbc.SQLServerResultSet.next(SQLServerResultSet.java:1016)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.pipeline.source.spi.ChangeTableResultSet.next(ChangeTableResultSet.java:63)
> ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$execute$1(SqlServerStreamingChangeEventSource.java:181)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:608)
> ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:226)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at
> io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:171)
> ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> ... 7 more
> 2022-06-24 14:55:45,953 INFO  io.debezium.embedded.EmbeddedEngine
> [] - Stopping the embedded engine
> 2022-06-24 14:55:45,954 INFO
> org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl
> [] - Source: TableSourceScan(table=[[default_catalog, default_database,
> carflow]], fields=[id, plate_license, site_id, create_time, flow_type,
> circle_id]) -> Calc(select=[id, plate_license, site_id, create_time,
> (create_time + -2880:INTERVAL HOUR) AS c_time, flow_type, circle_id])
> -> WatermarkAssigner(rowtime=[c_time], watermark=[c_time]) (1/1)#0
> discarding 0 drained requests
> 2022-06-24 14:55:45,955 INFO  io.debezium.embedded.EmbeddedEngine
> [] - Stopping the embedded engine
> 2022-06-24 14:55:45,957 WARN  org.apache.flink.runtime.taskmanager.Task
> [] - Source: TableSourceScan(table=[[default_catalog,
> default_database, carflow]], fields=[id, plate_license, site_id,
> create_time, flow_type, circle_id]) -> Calc(select=[id, plate_license,
> site_id, create_time, (create_time + -2880:INTERVAL HOUR) AS c_time,
> flow_type, circle_id]) -> WatermarkAssigner(rowtime=[c_time],
> watermark=[c_time]) (1/1)#0 (71206ba8149ac20bb39d8169ff3d2f02) switched
> from RUNNING to FAILED with failure cause:
> com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException:
> An exception occurred in the cha

Flink-1.15.0 failing to commit offsets when consuming Kafka?

2022-06-26 Post by RS
Hi,
I would like to ask: with Flink-1.15.0 consuming Kafka, I ran into the following problem: offset commits fail, and some jobs apparently keep failing to commit. The data is consumed, but the offsets do not change. How should this be handled?


Symptoms:
1. The job has no exceptions.
2. The data is consumed and processed normally; downstream use of the data is not affected.
3. The job has checkpointing configured, once every few minutes; in theory offsets are committed when a checkpoint runs.
4. Some jobs fail to commit their Kafka offsets, while others commit normally.


WARN logs:
2022-06-27 01:07:42,725 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 checkpointing for checkpoint with id=11398 (max part counter=1).
2022-06-27 01:07:42,830 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 received completion notification for checkpoint with id=11398.
2022-06-27 01:07:43,820 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 checkpointing for checkpoint with id=11476 (max part counter=0).
2022-06-27 01:07:43,946 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 received completion notification for checkpoint with id=11476.
2022-06-27 01:07:45,218 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 checkpointing for checkpoint with id=11521 (max part counter=47).
2022-06-27 01:07:45,290 INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 received completion notification for checkpoint with id=11521.
2022-06-27 01:07:45,521 WARN  org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 11443
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
2022-06-27 01:07:45,990 WARN  org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 11398
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.


Thanks~

Re:flink 1.14

2022-06-26 Post by RS
Hi,
Here we use DolphinScheduler for scheduling, and dependencies between jobs can be configured in it as well.
Other scheduling systems should have similar features.


Thanks~





On 2022-04-29 16:03:15, "guanyq"  wrote:
>A question for everyone:
>When doing batch processing with Flink SQL, what is typically used for scheduled execution in production?
>And if there are dependencies between jobs, what is used in production for notification?
>
>
>My main goal here is to migrate Hive SQL to Flink SQL


Email from 杨柳

2022-06-26 Post by 杨柳
Unsubscribe

Unsubscribe

2022-06-26 Post by lian
Unsubscribe