[jira] [Commented] (FLINK-33484) Flink Kafka Connector Offset Lag Issue with Transactional Data and Read Committed Isolation Level
[ https://issues.apache.org/jira/browse/FLINK-33484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17784752#comment-17784752 ]

Darcy Lin commented on FLINK-33484:
-----------------------------------

[~martijnvisser] The issue remains the same with flink-connector-kafka:3.0.1-1.17. I checked the implementation of {{emitRecord}}, and it is still the same.

> Flink Kafka Connector Offset Lag Issue with Transactional Data and Read Committed Isolation Level
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33484
>                 URL: https://issues.apache.org/jira/browse/FLINK-33484
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.17.1
>         Environment: Flink 1.17.1
> kafka 2.5.1
>            Reporter: Darcy Lin
>            Priority: Major
>
> We have encountered an issue with the Flink Kafka connector when consuming
> transactional data from Kafka with the {{isolation.level}} set to
> {{read_committed}} ({{setProperty("isolation.level", "read_committed")}}).
> The problem is that even when all the data from a topic is consumed, the
> offset lag is not 0, but 1. However, when using the Kafka Java client to
> consume the same data, this issue does not occur.
>
> We suspect that this issue arises due to the way Flink Kafka connector
> calculates the offset. The problem seems to be in the
> {{KafkaRecordEmitter.java}} file, specifically in the {{emitRecord}} method.
> When saving the offset, the method calls
> {{splitState.setCurrentOffset(consumerRecord.offset() + 1);}}. While this
> statement works correctly in a regular Kafka scenario, it might not be
> accurate when the {{read_committed}} mode is used. We believe that it should
> be {{splitState.setCurrentOffset(consumerRecord.offset() + 2);}}, as
> transactional data in Kafka occupies an additional offset to store the
> transaction marker.
>
> We request the Flink team to investigate this issue and provide us with
> guidance on how to resolve it.
>
> Thank you for your attention and support.
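The offset arithmetic described in the report can be illustrated with a small, JDK-only model of a single partition. This is only a sketch under stated assumptions: the class {{OffsetLagSketch}} and its methods are hypothetical and are not part of Flink or the Kafka client, the lag is assumed to be "log end offset minus stored offset", and a committed transaction is assumed to append exactly one marker right after its last record.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, JDK-only model of one Kafka partition. Not Flink or Kafka client code.
final class OffsetLagSketch {

    /** One log entry: either a data record or a transaction control marker. */
    static final class Entry {
        final long offset;
        final boolean controlMarker;

        Entry(long offset, boolean controlMarker) {
            this.offset = offset;
            this.controlMarker = controlMarker;
        }
    }

    /** Log end offset: the offset the next appended entry would receive. */
    static long logEndOffset(List<Entry> log) {
        return log.isEmpty() ? 0L : log.get(log.size() - 1).offset + 1;
    }

    /** Offset stored by a reader that tracks "last data record offset + increment". */
    static long offsetAfterLastRecord(List<Entry> log, long increment) {
        long stored = 0L;
        for (Entry e : log) {
            if (!e.controlMarker) { // markers are never handed to the application
                stored = e.offset + increment;
            }
        }
        return stored;
    }

    /** Lag as assumed in this sketch: log end offset minus stored offset. */
    static long lag(List<Entry> log, long storedOffset) {
        return logEndOffset(log) - storedOffset;
    }

    public static void main(String[] args) {
        // Three records written in one transaction, then the commit marker at offset 3.
        List<Entry> transactional = Arrays.asList(
                new Entry(0, false), new Entry(1, false),
                new Entry(2, false), new Entry(3, true));
        // The same three records written without a transaction: no marker.
        List<Entry> plain = Arrays.asList(
                new Entry(0, false), new Entry(1, false), new Entry(2, false));

        System.out.println(lag(transactional, offsetAfterLastRecord(transactional, 1))); // prints 1
        System.out.println(lag(transactional, offsetAfterLastRecord(transactional, 2))); // prints 0
        System.out.println(lag(plain, offsetAfterLastRecord(plain, 1)));                 // prints 0
        System.out.println(lag(plain, offsetAfterLastRecord(plain, 2)));                 // prints -1
    }
}
```

In this model, +1 leaves a lag of 1 whenever the log ends with a marker, which matches the reported symptom. A fixed +2 closes that gap only when the last record is directly followed by exactly one marker, and it overshoots the log end otherwise, so the sketch illustrates the arithmetic and is not a proposed patch.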
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33484) Flink Kafka Connector Offset Lag Issue with Transactional Data and Read Committed Isolation Level
[ https://issues.apache.org/jira/browse/FLINK-33484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17784716#comment-17784716 ]

Martijn Visser commented on FLINK-33484:
----------------------------------------

[~lintingbin] Can you please use the connector version I've listed above and try again? That's a newer version with more fixes for various bugs.
[jira] [Commented] (FLINK-33484) Flink Kafka Connector Offset Lag Issue with Transactional Data and Read Committed Isolation Level
[ https://issues.apache.org/jira/browse/FLINK-33484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17784526#comment-17784526 ]

Darcy Lin commented on FLINK-33484:
-----------------------------------

[~martijnvisser] org.apache.flink:flink-connector-kafka:1.17.1
[jira] [Commented] (FLINK-33484) Flink Kafka Connector Offset Lag Issue with Transactional Data and Read Committed Isolation Level
[ https://issues.apache.org/jira/browse/FLINK-33484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17784511#comment-17784511 ]

Martijn Visser commented on FLINK-33484:
----------------------------------------

[~lintingbin] Please confirm which Flink Kafka connector version you've used. Was it 3.0.*-1.17?