[jira] [Updated] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging

2024-04-28 Thread ASF GitHub Bot (Jira)


[ https://issues.apache.org/jira/browse/FLINK-34470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-34470:
---
Labels: pull-request-available  (was: )

> Transactional message + Table api kafka source with 'latest-offset' scan 
> bound mode causes indefinitely hanging
> ---
>
> Key: FLINK-34470
> URL: https://issues.apache.org/jira/browse/FLINK-34470
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.1.0
>Reporter: dongwoo.kim
>Priority: Major
>  Labels: pull-request-available
>
> h2. Summary
> Hi, we have faced an issue with transactional messages and the Table API Kafka
> source. If we configure *'scan.bounded.mode'* to *'latest-offset'*, the Flink
> SQL request hangs and eventually times out. We can always reproduce this
> unexpected behavior by following the steps below.
> This is also related to this
> [issue|https://issues.apache.org/jira/browse/FLINK-33484].
> h2. How to reproduce
> 1. Deploy a transactional producer and stop it after producing a certain
> number of messages.
> 2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit a simple
> query, such as counting the produced messages.
> 3. The Flink SQL job gets stuck and times out.
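The reproduction setup in step 2 can be sketched as a Table API DDL. This is a minimal, hypothetical example: the table name, topic, broker address, and format are placeholders, not taken from the report; only the *'scan.bounded.mode'* setting is the option under discussion.

```sql
-- Hypothetical table over the topic written by the transactional producer.
-- 'scan.bounded.mode' = 'latest-offset' bounds the scan at the offsets
-- observed at job start, which (after a transactional commit) include
-- the trailing control record.
CREATE TABLE orders (
  id BIGINT,
  payload STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',                              -- placeholder
  'properties.bootstrap.servers' = 'broker:9092',  -- placeholder
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset',
  'scan.bounded.mode' = 'latest-offset'
);

-- A simple bounded query that hangs under the reported conditions:
SELECT COUNT(*) FROM orders;
```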
> h2. Cause
> A transactional producer always writes [control
> records|https://kafka.apache.org/documentation/#controlbatch] at the end of
> each transaction, and these control records are never returned by
> {*}consumer.poll(){*} (they are filtered out internally). In
> *KafkaPartitionSplitReader*, a split is finished only when the condition
> *lastRecord.offset() >= stoppingOffset - 1* is met. This works well with
> non-transactional messages and in streaming environments, but in some
> batch use cases it causes unexpected hanging.
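To make the off-by-one concrete, here is a small self-contained sketch with no Kafka dependency; the offsets are made up for illustration. Suppose a transaction commits data records at offsets 0..9 and the control record sits at offset 10, so a 'latest-offset' bound resolves the stopping offset to the log end offset 11, while the last record poll() can ever return stays at offset 9.

```java
// Sketch of the finishing check in KafkaPartitionSplitReader (simplified,
// hypothetical offsets). A committed transaction leaves a control record
// at offset 10, so the log end offset (the "latest" stopping offset) is 11,
// but poll() never returns the control record itself.
public class ControlRecordHang {

    // Condition currently used: the last *polled* record must reach
    // stoppingOffset - 1 for the split to finish.
    static boolean finishesWithLastRecord(long lastRecordOffset, long stoppingOffset) {
        return lastRecordOffset >= stoppingOffset - 1;
    }

    public static void main(String[] args) {
        long lastDataOffset = 9L;   // last record poll() can return
        long stoppingOffset = 11L;  // log end offset after the commit marker at 10

        // 9 >= 10 is false, and offset 10 is unreachable via poll(),
        // so the check can never become true: the split hangs.
        System.out.println(finishesWithLastRecord(lastDataOffset, stoppingOffset)); // false
    }
}
```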
> h2. Proposed solution
> {code:java}
> if (consumer.position(tp) >= stoppingOffset) {
>     recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset);
>     finishSplitAtRecord(
>             tp,
>             stoppingOffset,
>             lastRecord.offset(),
>             finishedPartitions,
>             recordsBySplits);
> }
> {code}
> Replacing the if condition with *consumer.position(tp) >= stoppingOffset*
> [here|https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L137]
> can solve the problem.
> *consumer.position(tp)* returns the next record's offset if it exists, and the
> last record's offset if the next record doesn't exist.
> With this change, *KafkaPartitionSplitReader* can finish the split even when
> the stopping offset points at a control record's offset.
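Under the same hypothetical offsets as above, the two conditions can be contrasted in a self-contained sketch: the consumer's position advances past the filtered control record even though poll() never returns it, so the proposed check fires where the current one cannot.

```java
// Simplified contrast of the current and proposed conditions
// (hypothetical offsets, no Kafka dependency). After consuming through
// the commit marker at offset 10, the consumer's position is 11 even
// though the last polled data record is at offset 9.
public class ProposedCondition {

    static boolean oldCondition(long lastRecordOffset, long stoppingOffset) {
        return lastRecordOffset >= stoppingOffset - 1; // current check
    }

    static boolean newCondition(long consumerPosition, long stoppingOffset) {
        return consumerPosition >= stoppingOffset;     // proposed check
    }

    public static void main(String[] args) {
        long lastRecordOffset = 9L;  // last data record returned by poll()
        long consumerPosition = 11L; // position after skipping the control record at 10
        long stoppingOffset = 11L;   // 'latest-offset' bound (log end offset)

        System.out.println(oldCondition(lastRecordOffset, stoppingOffset)); // false: split hangs
        System.out.println(newCondition(consumerPosition, stoppingOffset)); // true: split finishes
    }
}
```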
> I would be happy to implement this fix if we can reach an agreement.
> Thanks



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging

2024-04-15 Thread Martijn Visser (Jira)



Martijn Visser updated FLINK-34470:
---
Affects Version/s: kafka-3.1.0
   (was: 1.17.1)



[jira] [Updated] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging

2024-02-20 Thread dongwoo.kim (Jira)



dongwoo.kim updated FLINK-34470:


[jira] [Updated] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging

2024-02-20 Thread dongwoo.kim (Jira)



dongwoo.kim updated FLINK-34470:


[jira] [Updated] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging

2024-02-20 Thread dongwoo.kim (Jira)



dongwoo.kim updated FLINK-34470:


[jira] [Updated] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging

2024-02-19 Thread dongwoo.kim (Jira)



dongwoo.kim updated FLINK-34470:


[jira] [Updated] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging

2024-02-19 Thread dongwoo.kim (Jira)



dongwoo.kim updated FLINK-34470:


[jira] [Updated] (FLINK-34470) Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging

2024-02-19 Thread dongwoo.kim (Jira)



dongwoo.kim updated FLINK-34470:

Description: 
h2. Summary  
Hi we have faced issue with transactional message and table api kafka source. 
If we configure *'scan.bounded.mode'* to *'latest-offset'* flink sql request 
timeouts after  hanging. We can always reproduce this unexpected behavior by 
following below steps.
This is related to this 
[issue|https://issues.apache.org/jira/browse/FLINK-33484] too.

h2. How to reproduce
1. Deploy transactional producer and stop after producing certain amount of 
messages
2. Configure *'scan.bounded.mode'* to *'latest-offset'* and submit simple query 
such as getting count of the produced messages
3. Flink sql job gets stucked and timeouts.

h2. Cause
Transaction producer always produces [control 
records|https://kafka.apache.org/documentation/#controlbatch] at the end of the 
transaction. And these control messages are not polled by 
{*}consumer.poll(){*}. (It is filtered internally). In 
*KafkaPartitionSplitReader* code it finishes split only when 
*lastRecord.offset() >= stoppingOffset - 1* condition is met. This might work 
well with non transactional messages or streaming environment but in some batch 
use cases it causes unexpected hanging.

h2. Proposed solution
Adding *consumer.position(tp) >= stoppingOffset* condition to the if statement. 
By this KafkaPartitionSplitReader is available to finish the split even when 
the stopping offset is configured to control record's offset. 

I would be happy to implement about this fix it we can reach on agreement. 
Thanks


> Transactional message + Table api kafka source with 'latest-offset' scan 
> bound mode causes indefinitely hanging
> ---
>
> Key: FLINK-34470
> URL: https://issues.apache.org/jira/browse/FLINK-34470
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.17.1
>Reporter: dongwoo.kim
>Priority: Major
>
