How can the Kafka offset be ignored in the new Kafka source?
As the subject says: I want to add a custom switch so that, when the switch is enabled and the job is restarted, the Kafka source is forced to start from latest instead of restoring from state, while all other operators still restore from state. Previously I overrode the initializeState method of FlinkKafkaConsumerBase and used a boolean config to decide whether to restore the state of the Kafka source. For the new KafkaSource, my analysis turned up a few possible approaches, though I'm not sure whether they are workable:

(1) Adjust things at the SourceOperator level. This requires overriding runtime classes, so I'm not considering it for now.

(2) Override the addSplits method of SourceReaderBase. The current implementation apparently cannot distinguish newly added partitions from restored ones at that point, but that hardly matters, since topic changes and newly added partitions are rare. I could decide there, based on my config, whether to force latest.

(3) When the new KafkaSource restarts from state, the state the KafkaSourceEnumerator restores is only assignedPartitions, which is essentially unrelated to offsets; offsets only come into play for newly discovered partitions. On the KafkaSourceEnumerator side, I could make it ignore its state so that every discovered partition counts as new, and the starting offset could then be customized via initOffset. In that case, however, the source reader must also ignore the restored Kafka splits (the call in SourceOperator.open) and accept only newly assigned splits (the call in SourceOperator.handleAddSplitsEvent), which again means changing SourceOperator.

So there seems to be no particularly clean way to make this change. I'm hoping for something simple, because I have to re-implement it for every version upgrade.
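Idea (2) above can be sketched in plain Java, outside Flink, with hypothetical types standing in for KafkaPartitionSplit (the names Split, LATEST, and rewrite are illustrative only): when the flag is set, every restored split's starting offset is rewritten to a "latest" sentinel before the splits are handed to the fetcher.

```java
import java.util.List;
import java.util.stream.Collectors;

// Minimal stand-in for Flink's KafkaPartitionSplit; names are hypothetical.
record Split(String topicPartition, long startingOffset) {}

class SplitOffsetRewriter {
    // Sentinel meant to mirror a "start from latest" marker (assumed value).
    static final long LATEST = -1L;

    // If forceLatest is set, ignore the restored offsets and start from latest;
    // otherwise keep the splits exactly as restored from state.
    static List<Split> rewrite(List<Split> restored, boolean forceLatest) {
        if (!forceLatest) {
            return restored;
        }
        return restored.stream()
                .map(s -> new Split(s.topicPartition(), LATEST))
                .collect(Collectors.toList());
    }
}
```

With this shape, the rest of the restored state (all other operators) is untouched; only the split offsets seen by the reader change.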
Can Flink output expired state to the log when it expires?
Hi,

When state expires, can Flink's state TTL invoke a callback, the way a Caffeine cache does, so that the expired content is printed to the log?

Best Regards!
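For reference, the Caffeine-style behavior the question describes (an expiry callback, similar in spirit to Caffeine's removalListener) can be sketched as a toy TTL map in plain Java; this is not Flink API, and all names here are hypothetical:

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.BiConsumer;

// Toy TTL map with an "on expiry" callback. Purely illustrative; Flink's
// StateTtlConfig does not document such a listener in the versions discussed.
class TtlMap<K, V> {
    private record Entry<V>(V value, long expiresAtMillis) {}

    private final Map<K, Entry<V>> store = new HashMap<>();
    private final long ttlMillis;
    private final BiConsumer<K, V> onExpire; // e.g. (k, v) -> log the expired entry

    TtlMap(long ttlMillis, BiConsumer<K, V> onExpire) {
        this.ttlMillis = ttlMillis;
        this.onExpire = onExpire;
    }

    void put(K key, V value, long nowMillis) {
        store.put(key, new Entry<>(value, nowMillis + ttlMillis));
    }

    // Evicts expired entries, firing the callback for each one.
    void cleanUp(long nowMillis) {
        Iterator<Map.Entry<K, Entry<V>>> it = store.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<K, Entry<V>> e = it.next();
            if (e.getValue().expiresAtMillis() <= nowMillis) {
                onExpire.accept(e.getKey(), e.getValue().value());
                it.remove();
            }
        }
    }

    V get(K key, long nowMillis) {
        Entry<V> e = store.get(key);
        return (e == null || e.expiresAtMillis() <= nowMillis) ? null : e.value();
    }
}
```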
Re: Can the kafka-client version required by the Kafka source/sink in Flink 1.15 be lowered?
Hi,

The current Kafka connector depends on APIs that only exist in newer kafka-clients versions, and the sink uses reflection to support exactly-once semantics. Since the Kafka client itself maintains good backward compatibility, the Flink community no longer provides Kafka connectors built against older clients, and adapting to Kafka 0.11, released five years ago, is not realistic.

Best,
Qingsheng

> On Jun 23, 2022, at 19:37, yidan zhao wrote:
>
> As the subject says, I'd like to ask anyone familiar with this: does the current connector merely upgrade to a newer kafka-client and switch to the new interfaces, or does it depend on functionality that only exists in the newer client?
> Would it be possible to implement it on top of an older kafka-client?
>
> If so, I may override the implementation myself, because newer kafka-clients do not work with our company's Kafka: it is open-source Kafka fronted by a proxy layer, and accessing it with too new a client causes problems (0.11 is recommended; in my tests clients up to 2.2 work at most).
Re: Data loss when using Flink to sync Kafka to MongoDB in real time
Hi,

The Flink Kafka connector commits offsets to the Kafka broker after a checkpoint completes, but Flink does not rely on the broker-side offsets for failure recovery; it recovers from the offsets stored in the checkpoint. As for the data loss, I'd suggest first reproducing the problem with a small amount of data and then tracing it from source to sink.

Best,
Qingsheng

> On Jun 26, 2022, at 11:54, casel.chen wrote:
>
> mysql cdc -> kafka -> mongodb
> I wrote a Flink 1.13.2 job that consumes a whole-database MySQL changelog topic from Kafka and syncs it to MongoDB in real time, with checkpointing enabled. In practice, both restoring from a savepoint and restoring from group offsets lose data. How should I debug this? Repository: https://github.com/ChenShuai1981/mysql2mongodb.git
> My MongodbSink implements CheckpointedFunction, and its snapshotState method waits for all child threads to finish writing to MongoDB.
>
> What is the flow for committing Kafka offsets after Flink consumes and processes the data? Once the pending offsets are obtained from Kafka, how does Flink ensure they are all processed before committing? Is there any source-code walkthrough for this part?
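The recovery rule stated above can be sketched as a toy replay model in plain Java (names and structure are illustrative, not Flink internals): the job restarts from the offset captured by the last completed checkpoint, not from whatever was committed to the broker, so records consumed after the last checkpoint are replayed rather than skipped.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of checkpoint-based recovery: the restart position is the offset
// stored in the last completed checkpoint, never the broker-committed offset.
class CheckpointReplay {
    long nextOffset = 0;          // next record to read
    long checkpointedOffset = 0;  // offset captured by the last checkpoint

    List<Long> consume(int n) {
        List<Long> out = new ArrayList<>();
        for (int i = 0; i < n; i++) out.add(nextOffset++);
        return out;
    }

    void checkpoint() { checkpointedOffset = nextOffset; }

    // On failure, reading resumes from the checkpointed offset, so anything
    // consumed after the last checkpoint is read again (replayed).
    void recover() { nextOffset = checkpointedOffset; }
}
```

This is why a lossy pipeline usually points at the sink (or at restoring from group offsets) rather than at the Kafka source itself.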
Re: Flink 1.15.0 failing to commit offsets when consuming Kafka?
Hi,

This is a known issue in the Apache Kafka consumer; see FLINK-28060 [1] and KAFKA-13840 [2].

[1] https://issues.apache.org/jira/browse/FLINK-28060
[2] https://issues.apache.org/jira/browse/KAFKA-13840

Best,
Qingsheng

> On Jun 27, 2022, at 09:16, RS wrote:
>
> Hi,
> A question for the list: with Flink 1.15.0 consuming from Kafka, I see the following problem: offset commits fail, and some jobs seem to fail to commit permanently. The data is consumed, but the offset does not move. How should this be handled?
>
> Symptoms:
> 1. The job shows no exceptions.
> 2. Data is consumed and processed normally; downstream use is not affected.
> 3. Checkpointing is configured, every few minutes; in theory offsets are committed when a checkpoint runs.
> 4. Some jobs fail to commit offsets to Kafka, while others commit normally.
>
> WARN logs:
> 2022-06-27 01:07:42,725 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 checkpointing for checkpoint with id=11398 (max part counter=1).
> 2022-06-27 01:07:42,830 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 received completion notification for checkpoint with id=11398.
> 2022-06-27 01:07:43,820 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 checkpointing for checkpoint with id=11476 (max part counter=0).
> 2022-06-27 01:07:43,946 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 received completion notification for checkpoint with id=11476.
> 2022-06-27 01:07:45,218 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 checkpointing for checkpoint with id=11521 (max part counter=47).
> 2022-06-27 01:07:45,290 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 received completion notification for checkpoint with id=11521.
> 2022-06-27 01:07:45,521 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 11443
> org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
> Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
> 2022-06-27 01:07:45,990 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 11398
> org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
> Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
>
> Thanks~
Re: Re: Task for the lookup join cannot recover properly?
Yes, it is the JDBC connector built into Flink SQL.

Thanks!

Best,
Amber Xu

From: Lincoln Lee
Sent: 2022-06-24 21:27
To: user-zh
Subject: Re: Task for the lookup join cannot recover properly?

Hi,

Is the MySQL dimension table the JDBC connector built into Flink SQL? If so, its internal cache is a read cache only: it is never persisted, and it is invalidated when the job restarts or the configured eviction condition is reached. If it is a dimension table you developed yourself, I'd suggest adding logs around the data loading so you can confirm whether anything goes wrong during failover.

Best,
Lincoln Lee

Xuchao wrote on Fri, Jun 24, 2022 at 17:15:
> Hello!
> I ran into a problem using Flink.
> flink-1.14.4
> sqlserver-cdc-2.2.1
> yarn-per-job
>
> I have a job that first performs a stream-stream join and then a lookup join against a MySQL dimension table, with incremental checkpoints enabled.
> After a transient sqlserver-cdc failure, the job failed and recovered automatically, but the task for the lookup join stopped emitting data.
> On inspection, zero rows of the dimension table were loaded, i.e. the job did not load the full dimension table once on recovery.
>
> What could be causing this, and how should it be fixed?
>
> Looking forward to a reply!
> best wishes!
>
> Logs:
> 2022-06-24 14:55:45,950 ERROR com.ververica.cdc.debezium.internal.Handover [] - Reporting error:
> com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
> at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
> at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:292) ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:152) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
> at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:119) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_301]
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_301]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_301]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_301]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_301]
> Caused by:
> com.microsoft.sqlserver.jdbc.SQLServerException: 为过程或函数 cdc.fn_cdc_get_all_changes_ ... 提供的参数数目不足。 (An insufficient number of arguments were supplied for the procedure or function cdc.fn_cdc_get_all_changes_ ...)
> at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:262) ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at com.microsoft.sqlserver.jdbc.SQLServerResultSet$FetchBuffer.nextRow(SQLServerResultSet.java:5427) ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at com.microsoft.sqlserver.jdbc.SQLServerResultSet.fetchBufferNext(SQLServerResultSet.java:1758) ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at com.microsoft.sqlserver.jdbc.SQLServerResultSet.next(SQLServerResultSet.java:1016) ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at io.debezium.pipeline.source.spi.ChangeTableResultSet.next(ChangeTableResultSet.java:63) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
> at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$execute$1(SqlServerStreamingChangeEventSource.java:181) ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:608) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
> at io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:226) ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:171) ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
> ... 7 more
> 2022-06-24 14:55:45,953 INFO io.debezium.embedded.EmbeddedEngine [] - Stopping the embedded engine
> 2022-06-24 14:55:45,954 INFO org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl [] - Source: TableSourceScan(table=[[default_catalog, default_database, carflow]], fields=[id, plate_license, site_id, create_time, flow_type, circle_id]) -> Calc(select=[id, plate_license, site_id, create_time, (create_time + -2880:INTERVAL HOUR) AS c_time, flow_type, circle_id]) -> WatermarkAssigner(rowtime=[c_time], watermark=[c_time]) (1/1)#0 discarding 0 drained requests
> 2022-06-24 14:55:45,955 INFO io.debezium.embedded.EmbeddedEngine [] - Stopping the embedded engine
> 2022-06-24 14:55:45,957 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, carflow]], fields=[id, plate_license, site_id, create_time, flow_type, circle_id]) -> Calc(select=[id, plate_license, site_id, create_time, (create_time + -2880:INTERVAL HOUR) AS c_time, flow_type, circle_id]) -> WatermarkAssigner(rowtime=[c_time], watermark=[c_time]) (1/1)#0 (71206ba8149ac20bb39d8169ff3d2f02) switched from RUNNING to FAILED with failure cause: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the cha
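The point about the built-in JDBC lookup cache in the reply above can be sketched as a toy read-through cache in plain Java (names are hypothetical, not the connector's internals): entries are loaded lazily per looked-up key and nothing is persisted, so an empty cache right after a restart is expected behavior rather than a bug.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Toy read-through cache in the spirit of a JDBC lookup-join cache:
// entries are loaded lazily per key and never survive a restart.
class ReadThroughCache<K, V> {
    private final Map<K, V> cache = new HashMap<>();
    private final Function<K, V> loader; // stands in for the database query

    ReadThroughCache(Function<K, V> loader) {
        this.loader = loader;
    }

    // Returns the cached row, querying the "database" only on a miss.
    V lookup(K key) {
        return cache.computeIfAbsent(key, loader);
    }

    int size() {
        return cache.size();
    }
}
```

A freshly restarted instance starts with size 0 and fills back in as lookups arrive; there is no upfront full load of the dimension table.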
Flink 1.15.0 failing to commit offsets when consuming Kafka?
Hi,

A question for the list: with Flink 1.15.0 consuming from Kafka, I see the following problem: offset commits fail, and some jobs seem to fail to commit permanently. The data is consumed, but the offset does not move. How should this be handled?

Symptoms:
1. The job shows no exceptions.
2. Data is consumed and processed normally; downstream use is not affected.
3. Checkpointing is configured, every few minutes; in theory offsets are committed when a checkpoint runs.
4. Some jobs fail to commit offsets to Kafka, while others commit normally.

WARN logs:
2022-06-27 01:07:42,725 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 checkpointing for checkpoint with id=11398 (max part counter=1).
2022-06-27 01:07:42,830 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 received completion notification for checkpoint with id=11398.
2022-06-27 01:07:43,820 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 checkpointing for checkpoint with id=11476 (max part counter=0).
2022-06-27 01:07:43,946 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 received completion notification for checkpoint with id=11476.
2022-06-27 01:07:45,218 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 checkpointing for checkpoint with id=11521 (max part counter=47).
2022-06-27 01:07:45,290 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 received completion notification for checkpoint with id=11521.
2022-06-27 01:07:45,521 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 11443
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
2022-06-27 01:07:45,990 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 11398
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.

Thanks~
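The fact that the job keeps running despite these WARNs is consistent with broker-side commits being best-effort: a failed asynchronous offset commit is logged but never fails the job, because recovery uses checkpointed offsets. A plain-Java sketch of that pattern (names are hypothetical, not Flink's internal handler):

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of a best-effort commit callback: a failed async offset commit is
// logged and counted, but never propagated, since the completed checkpoint
// already holds the offsets needed for recovery.
class BestEffortCommitter {
    final AtomicLong failedCommits = new AtomicLong();

    void onCommitResult(long checkpointId, Exception error) {
        if (error != null) {
            failedCommits.incrementAndGet();
            System.out.println("WARN Failed to commit consumer offsets for checkpoint "
                    + checkpointId + ": " + error.getMessage());
            return; // swallow: do not fail the job over a commit failure
        }
        System.out.println("Committed offsets for checkpoint " + checkpointId);
    }
}
```

The consequence, as seen here, is that monitoring based on broker-side consumer-group lag can go stale even though the pipeline itself is healthy.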
Re: flink 1.14
Hi,

Here we use DolphinScheduler for scheduling; dependencies between jobs can also be configured in it. Other schedulers should offer similar features.

Thanks~

On 2022-04-29 16:03:15, "guanyq" wrote:
>A question for the experts:
>When running Flink SQL batch jobs, what is typically used for scheduled execution in production?
>And when there are dependencies between jobs, what is used in production for notification?
>
>
>My main goal is to migrate Hive SQL to Flink SQL.
Mail from Yang Liu
Unsubscribe
Unsubscribe
Unsubscribe