[jira] [Commented] (FLINK-33484) Flink Kafka Connector Offset Lag Issue with Transactional Data and Read Committed Isolation Level

2023-11-10 Thread Darcy Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17784752#comment-17784752
 ] 

Darcy Lin commented on FLINK-33484:
---

[~martijnvisser] The issue remains the same with 
flink-connector-kafka:3.0.1-1.17. I checked the implementation of 
{{emitRecord}}, and it is still the same.

> Flink Kafka Connector Offset Lag Issue with Transactional Data and Read 
> Committed Isolation Level
> -
>
> Key: FLINK-33484
> URL: https://issues.apache.org/jira/browse/FLINK-33484
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.17.1
> Environment: Flink 1.17.1, Kafka 2.5.1
> Reporter: Darcy Lin
> Priority: Major
>
> We have encountered an issue with the Flink Kafka connector when consuming 
> transactional data from Kafka with {{isolation.level}} set to 
> {{read_committed}} ({{setProperty("isolation.level", "read_committed")}}). 
> Even after all the data in a topic has been consumed, the offset lag is 1 
> rather than 0. Consuming the same data with the Kafka Java client does not 
> show this issue.
> We suspect that the issue arises from the way the Flink Kafka connector 
> calculates the offset. The problem seems to be in the {{emitRecord}} method 
> of {{KafkaRecordEmitter.java}}. When saving the offset, the method calls 
> {{splitState.setCurrentOffset(consumerRecord.offset() + 1);}}. While this 
> statement works correctly in a regular Kafka scenario, it might not be 
> accurate when {{read_committed}} mode is used. We believe that it should 
> be {{splitState.setCurrentOffset(consumerRecord.offset() + 2);}}, as 
> transactional data in Kafka occupies an additional offset to store the 
> transaction marker.
> We request the Flink team to investigate this issue and provide us with 
> guidance on how to resolve it.
> Thank you for your attention and support.
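
The offset arithmetic described in the report can be illustrated with a small stand-alone model. This is only a sketch and does not use the Flink or Kafka APIs: the class OffsetLagSketch, the LogEntry type, and all method names are hypothetical. It assumes that a committed transaction appends one marker after its data records, that the marker is never handed to the record emitter, and that a plain consumer's position moves past the marker once it has been fetched, which would be consistent with the lag of 0 reported for the Kafka Java client.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a partition log; not connector or client code. */
public class OffsetLagSketch {

    /** One slot in the log: either a data record or a transaction marker. */
    public static final class LogEntry {
        final long offset;
        final boolean marker;

        LogEntry(long offset, boolean marker) {
            this.offset = offset;
            this.marker = marker;
        }
    }

    /** Log with one committed transaction: dataRecords records, then one marker. */
    public static List<LogEntry> singleTransactionLog(int dataRecords) {
        List<LogEntry> log = new ArrayList<>();
        for (long offset = 0; offset < dataRecords; offset++) {
            log.add(new LogEntry(offset, false));
        }
        log.add(new LogEntry(dataRecords, true)); // marker occupies the next offset
        return log;
    }

    /** Offset one past the last slot in the log. */
    public static long logEndOffset(List<LogEntry> log) {
        return log.size();
    }

    /** Mirrors "record.offset() + 1": only data records update the offset. */
    public static long offsetAfterLastRecord(List<LogEntry> log) {
        long next = 0;
        for (LogEntry entry : log) {
            if (!entry.marker) {
                next = entry.offset + 1;
            }
        }
        return next;
    }

    /** Assumed plain-consumer behaviour: the position also moves past markers. */
    public static long positionAfterFetch(List<LogEntry> log) {
        long next = 0;
        for (LogEntry entry : log) {
            next = entry.offset + 1;
        }
        return next;
    }

    public static long lag(long endOffset, long committedOffset) {
        return endOffset - committedOffset;
    }

    public static void main(String[] args) {
        List<LogEntry> log = singleTransactionLog(3); // offsets 0..2 data, 3 marker
        long end = logEndOffset(log);
        System.out.println("lag with offset + 1: " + lag(end, offsetAfterLastRecord(log)));
        System.out.println("lag with position:   " + lag(end, positionAfterFetch(log)));
    }
}
```

In this model the first line prints a lag of 1 and the second a lag of 0. The model also has only one marker per transaction, so adding 2 to the offset of every record would be correct only for the last record of a transaction; after the record at offset 0 it would point at offset 2 and skip offset 1.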



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33484) Flink Kafka Connector Offset Lag Issue with Transactional Data and Read Committed Isolation Level

2023-11-10 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17784716#comment-17784716
 ] 

Martijn Visser commented on FLINK-33484:


[~lintingbin] Can you please use the connector version I've listed above and 
try again? That's a newer version with fixes for various bugs.



[jira] [Commented] (FLINK-33484) Flink Kafka Connector Offset Lag Issue with Transactional Data and Read Committed Isolation Level

2023-11-09 Thread Darcy Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17784526#comment-17784526
 ] 

Darcy Lin commented on FLINK-33484:
---

[~martijnvisser] org.apache.flink:flink-connector-kafka:1.17.1



[jira] [Commented] (FLINK-33484) Flink Kafka Connector Offset Lag Issue with Transactional Data and Read Committed Isolation Level

2023-11-09 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17784511#comment-17784511
 ] 

Martijn Visser commented on FLINK-33484:


[~lintingbin] Please confirm which Flink Kafka connector version you've used. 
Was it 3.0.*-1.17?
