[jira] [Closed] (FLINK-35685) Some metrics in the MetricStore are duplicated when increasing or decreasing task parallelism

2024-06-30 Thread elon_X (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

elon_X closed FLINK-35685.
--
Resolution: Fixed

> Some metrics in the MetricStore are duplicated when increasing or decreasing 
> task parallelism
> -
>
> Key: FLINK-35685
> URL: https://issues.apache.org/jira/browse/FLINK-35685
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics
>Affects Versions: 1.19.0
>Reporter: elon_X
>Priority: Major
> Fix For: 1.20.0, 1.19.2
>
> Attachments: image-2024-06-24-18-01-40-869.png
>
>
> 1. The duplicated metrics can confuse users.
> 2. As parallelism is adjusted repeatedly, stale entries accumulate in the 
> MetricStore, consuming more and more memory.
> !image-2024-06-24-18-01-40-869.png!
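
To make the memory concern concrete, an illustrative toy sketch (plain Java 
maps, not the actual MetricStore API): after rescaling a task from 4 subtasks 
down to 2, metrics for subtask indexes 2 and 3 should be pruned rather than 
left behind.
{code:java}
// Toy illustration only: prune metrics of subtasks that no longer exist
// after rescaling, so repeated parallelism changes do not grow memory.
import java.util.HashMap;
import java.util.Map;

public class SubtaskMetricPruning {
    public static void main(String[] args) {
        Map<Integer, Long> numRecordsInPerSubtask = new HashMap<>();
        for (int i = 0; i < 4; i++) {
            numRecordsInPerSubtask.put(i, 1000L * i); // parallelism 4
        }
        int newParallelism = 2; // task rescaled down
        // Drop entries whose subtask index no longer exists.
        numRecordsInPerSubtask.keySet().removeIf(idx -> idx >= newParallelism);
        System.out.println(numRecordsInPerSubtask); // {0=0, 1=1000}
    }
}
{code}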



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35685) Some metrics in the MetricStore are duplicated when increasing or decreasing task parallelism

2024-06-24 Thread elon_X (Jira)
elon_X created FLINK-35685:
--

 Summary: Some metrics in the MetricStore are duplicated when 
increasing or decreasing task parallelism
 Key: FLINK-35685
 URL: https://issues.apache.org/jira/browse/FLINK-35685
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Metrics
Affects Versions: 1.19.0
Reporter: elon_X
 Attachments: image-2024-06-24-18-01-40-869.png

1. The duplicated metrics can confuse users.
2. As parallelism is adjusted repeatedly, stale entries accumulate in the 
MetricStore, consuming more and more memory.

!image-2024-06-24-18-01-40-869.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35631) KafkaSource parameter partition.discovery.interval.ms with a default value of 5 minutes does not take effect

2024-06-18 Thread elon_X (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856110#comment-17856110
 ] 

elon_X commented on FLINK-35631:


[~robyoung] Ah, I have already fixed this issue. Once it is confirmed to be a 
bug, I will submit my code. :(
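
For illustration, a minimal sketch of the kind of guard the fix implies 
(hypothetical names, not the actual KafkaSourceBuilder patch): keep the 
5-minute default unless the source is bounded.
{code:java}
// Hypothetical sketch of the proposed guard, not the actual KafkaSourceBuilder
// code: only disable partition discovery (-1) for BOUNDED sources, so the
// 5-minute default still applies to CONTINUOUS_UNBOUNDED ones.
import java.util.Properties;

public class DiscoveryIntervalGuard {

    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    static final String KEY = "partition.discovery.interval.ms";
    static final long DEFAULT_INTERVAL_MS = 5 * 60 * 1000L; // 5 minutes

    static void applyDefaultIfUnset(Properties props, Boundedness boundedness) {
        if (!props.containsKey(KEY)) {
            // Disable discovery only when the source is bounded.
            long interval =
                    boundedness == Boundedness.BOUNDED ? -1L : DEFAULT_INTERVAL_MS;
            props.setProperty(KEY, Long.toString(interval));
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        applyDefaultIfUnset(props, Boundedness.CONTINUOUS_UNBOUNDED);
        System.out.println(props.getProperty(KEY)); // prints 300000, not -1
    }
}
{code}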

> KafkaSource parameter partition.discovery.interval.ms with a default value of 
> 5 minutes does not take effect
> 
>
> Key: FLINK-35631
> URL: https://issues.apache.org/jira/browse/FLINK-35631
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.1.0
>Reporter: elon_X
>Priority: Major
>
> When I start a streaming program that consumes Kafka 
> (flink-connector-kafka-3.1-SNAPSHOT), the Flink task does not automatically 
> detect new partitions after Kafka adds partitions.
>  
> *Reason*
> In the {{KafkaSourceBuilder}}, {{partition.discovery.interval.ms}} is 
> checked to see whether it has been overridden. Since I did not set it, the 
> builder sets {{partition.discovery.interval.ms = -1}} even though the source 
> is {{CONTINUOUS_UNBOUNDED}}.
> In the {{KafkaSourceEnumerator}}, the value of 
> {{partition.discovery.interval.ms}} is therefore -1 rather than the default 
> of 5 minutes, so automatic partition discovery does not work and the 
> documented 5-minute default is meaningless.
>  
> A possible solution is to set {{partition.discovery.interval.ms = -1}} only 
> when {{boundedness == Boundedness.BOUNDED}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35631) KafkaSource parameter partition.discovery.interval.ms with a default value of 5 minutes does not take effect

2024-06-18 Thread elon_X (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17855977#comment-17855977
 ] 

elon_X commented on FLINK-35631:


[~martijnvisser] Thank you for your response. I checked the wiki link you sent, 
and it shows that FLIP-288 was completed in this issue: 
https://issues.apache.org/jira/browse/FLINK-31953. I reviewed the code in that 
issue, and it appears that dynamic partition discovery is enabled by default. 
So, is this a bug? The reason is as I described above.

> KafkaSource parameter partition.discovery.interval.ms with a default value of 
> 5 minutes does not take effect
> 
>
> Key: FLINK-35631
> URL: https://issues.apache.org/jira/browse/FLINK-35631
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.1.0
>Reporter: elon_X
>Priority: Major
>
> When I start a streaming program that consumes Kafka 
> (flink-connector-kafka-3.1-SNAPSHOT), the Flink task does not automatically 
> detect new partitions after Kafka adds partitions.
>  
> *Reason*
> In the {{KafkaSourceBuilder}}, {{partition.discovery.interval.ms}} is 
> checked to see whether it has been overridden. Since I did not set it, the 
> builder sets {{partition.discovery.interval.ms = -1}} even though the source 
> is {{CONTINUOUS_UNBOUNDED}}.
> In the {{KafkaSourceEnumerator}}, the value of 
> {{partition.discovery.interval.ms}} is therefore -1 rather than the default 
> of 5 minutes, so automatic partition discovery does not work and the 
> documented 5-minute default is meaningless.
>  
> A possible solution is to set {{partition.discovery.interval.ms = -1}} only 
> when {{boundedness == Boundedness.BOUNDED}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35632) The example provided in the kafkaSource documentation for topic regex subscription is incorrect

2024-06-18 Thread elon_X (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

elon_X updated FLINK-35632:
---
Description: 
[https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kafka/#topic-partition-subscription]

 

The example provided in the document has issues and will result in a 
compilation error. The correct example should be:
{code:java}
// code placeholder
// KafkaSource
KafkaSource.builder().setTopicPattern(Pattern.compile("topic.*"));{code}
!image-2024-06-18-17-47-53-525.png!

!image-2024-06-18-17-48-38-080.png!

  was:
[https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kafka/#topic-partition-subscription]

[https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/pulsar/#topic-partition-subscription]

 

The example provided in the document has issues and will result in a 
compilation error. The correct example should be:
{code:java}
// code placeholder
// KafkaSource
KafkaSource.builder().setTopicPattern(Pattern.compile("topic.*"));
// PulsarSource
PulsarSource.builder().setTopicPattern(Pattern.compile("topic.*"));{code}
!image-2024-06-18-17-47-53-525.png!

!image-2024-06-18-18-41-53-593.png!

!image-2024-06-18-17-48-38-080.png!

Summary:  The example provided in the kafkaSource documentation for 
topic regex subscription is incorrect  (was:  The example provided in the 
kafkaSource/pulsarSource documentation for topic regex subscription is 
incorrect)

>  The example provided in the kafkaSource documentation for topic regex 
> subscription is incorrect
> 
>
> Key: FLINK-35632
> URL: https://issues.apache.org/jira/browse/FLINK-35632
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.20.0
>Reporter: elon_X
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2024-06-18-17-47-53-525.png, 
> image-2024-06-18-17-48-38-080.png, image-2024-06-18-18-41-41-212.png, 
> image-2024-06-18-18-41-53-593.png
>
>
> [https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kafka/#topic-partition-subscription]
>  
> The example provided in the document has issues and will result in a 
> compilation error. The correct example should be:
> {code:java}
> // code placeholder
> // KafkaSource
> KafkaSource.builder().setTopicPattern(Pattern.compile("topic.*"));{code}
> !image-2024-06-18-17-47-53-525.png!
> !image-2024-06-18-17-48-38-080.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35632) The example provided in the kafkaSource/pulsarSource documentation for topic regex subscription is incorrect

2024-06-18 Thread elon_X (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

elon_X updated FLINK-35632:
---
Description: 
[https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kafka/#topic-partition-subscription]

[https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/pulsar/#topic-partition-subscription]

 

The example provided in the document has issues and will result in a 
compilation error. The correct example should be:
{code:java}
// code placeholder
// KafkaSource
KafkaSource.builder().setTopicPattern(Pattern.compile("topic.*"));
// PulsarSource
PulsarSource.builder().setTopicPattern(Pattern.compile("topic.*"));{code}
!image-2024-06-18-17-47-53-525.png!

!image-2024-06-18-18-41-53-593.png!

!image-2024-06-18-17-48-38-080.png!

  was:
[https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kafka/#topic-partition-subscription]

[https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/pulsar/#topic-partition-subscription]

 

 

The example provided in the document has issues and will result in a 
compilation error. The correct example should be:
{code:java}
// code placeholder
KafkaSource.builder().setTopicPattern(Pattern.compile("topic.*")) {code}
!image-2024-06-18-17-47-53-525.png!

!image-2024-06-18-17-48-38-080.png!


>  The example provided in the kafkaSource/pulsarSource documentation for topic 
> regex subscription is incorrect
> -
>
> Key: FLINK-35632
> URL: https://issues.apache.org/jira/browse/FLINK-35632
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.20.0
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-06-18-17-47-53-525.png, 
> image-2024-06-18-17-48-38-080.png, image-2024-06-18-18-41-41-212.png, 
> image-2024-06-18-18-41-53-593.png
>
>
> [https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kafka/#topic-partition-subscription]
> [https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/pulsar/#topic-partition-subscription]
>  
> The example provided in the document has issues and will result in a 
> compilation error. The correct example should be:
> {code:java}
> // code placeholder
> // KafkaSource
> KafkaSource.builder().setTopicPattern(Pattern.compile("topic.*"));
> // PulsarSource
> PulsarSource.builder().setTopicPattern(Pattern.compile("topic.*"));{code}
> !image-2024-06-18-17-47-53-525.png!
> !image-2024-06-18-18-41-53-593.png!
> !image-2024-06-18-17-48-38-080.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35632) The example provided in the kafkaSource/pulsarSource documentation for topic regex subscription is incorrect

2024-06-18 Thread elon_X (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

elon_X updated FLINK-35632:
---
Attachment: image-2024-06-18-18-41-53-593.png

>  The example provided in the kafkaSource/pulsarSource documentation for topic 
> regex subscription is incorrect
> -
>
> Key: FLINK-35632
> URL: https://issues.apache.org/jira/browse/FLINK-35632
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.20.0
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-06-18-17-47-53-525.png, 
> image-2024-06-18-17-48-38-080.png, image-2024-06-18-18-41-41-212.png, 
> image-2024-06-18-18-41-53-593.png
>
>
> [https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kafka/#topic-partition-subscription]
> [https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/pulsar/#topic-partition-subscription]
>  
>  
> The example provided in the document has issues and will result in a 
> compilation error. The correct example should be:
> {code:java}
> // code placeholder
> KafkaSource.builder().setTopicPattern(Pattern.compile("topic.*")) {code}
> !image-2024-06-18-17-47-53-525.png!
> !image-2024-06-18-17-48-38-080.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35632) The example provided in the kafkaSource/pulsarSource documentation for topic regex subscription is incorrect

2024-06-18 Thread elon_X (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

elon_X updated FLINK-35632:
---
Summary:  The example provided in the kafkaSource/pulsarSource 
documentation for topic regex subscription is incorrect  (was:  The example 
provided in the kafkaSource documentation for topic regex subscription is 
incorrect)

>  The example provided in the kafkaSource/pulsarSource documentation for topic 
> regex subscription is incorrect
> -
>
> Key: FLINK-35632
> URL: https://issues.apache.org/jira/browse/FLINK-35632
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.20.0
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-06-18-17-47-53-525.png, 
> image-2024-06-18-17-48-38-080.png, image-2024-06-18-18-41-41-212.png, 
> image-2024-06-18-18-41-53-593.png
>
>
> [https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kafka/#topic-partition-subscription]
> [https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/pulsar/#topic-partition-subscription]
>  
>  
> The example provided in the document has issues and will result in a 
> compilation error. The correct example should be:
> {code:java}
> // code placeholder
> KafkaSource.builder().setTopicPattern(Pattern.compile("topic.*")) {code}
> !image-2024-06-18-17-47-53-525.png!
> !image-2024-06-18-17-48-38-080.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35632) The example provided in the kafkaSource/pulsarSource documentation for topic regex subscription is incorrect

2024-06-18 Thread elon_X (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

elon_X updated FLINK-35632:
---
Attachment: image-2024-06-18-18-41-41-212.png

>  The example provided in the kafkaSource/pulsarSource documentation for topic 
> regex subscription is incorrect
> -
>
> Key: FLINK-35632
> URL: https://issues.apache.org/jira/browse/FLINK-35632
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.20.0
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-06-18-17-47-53-525.png, 
> image-2024-06-18-17-48-38-080.png, image-2024-06-18-18-41-41-212.png, 
> image-2024-06-18-18-41-53-593.png
>
>
> [https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kafka/#topic-partition-subscription]
> [https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/pulsar/#topic-partition-subscription]
>  
>  
> The example provided in the document has issues and will result in a 
> compilation error. The correct example should be:
> {code:java}
> // code placeholder
> KafkaSource.builder().setTopicPattern(Pattern.compile("topic.*")) {code}
> !image-2024-06-18-17-47-53-525.png!
> !image-2024-06-18-17-48-38-080.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35632) The example provided in the kafkaSource documentation for topic regex subscription is incorrect

2024-06-18 Thread elon_X (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

elon_X updated FLINK-35632:
---
Description: 
[https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kafka/#topic-partition-subscription]

[https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/pulsar/#topic-partition-subscription]

 

 

The example provided in the document has issues and will result in a 
compilation error. The correct example should be:
{code:java}
// code placeholder
KafkaSource.builder().setTopicPattern(Pattern.compile("topic.*")) {code}
!image-2024-06-18-17-47-53-525.png!

!image-2024-06-18-17-48-38-080.png!

  was:
[https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kafka/#topic-partition-subscription]

 

The example provided in the document has issues and will result in a 
compilation error. The correct example should be:
{code:java}
// code placeholder
KafkaSource.builder().setTopicPattern(Pattern.compile("topic.*")) {code}
!image-2024-06-18-17-47-53-525.png!

!image-2024-06-18-17-48-38-080.png!


>  The example provided in the kafkaSource documentation for topic regex 
> subscription is incorrect
> 
>
> Key: FLINK-35632
> URL: https://issues.apache.org/jira/browse/FLINK-35632
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.20.0
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-06-18-17-47-53-525.png, 
> image-2024-06-18-17-48-38-080.png, image-2024-06-18-18-41-41-212.png, 
> image-2024-06-18-18-41-53-593.png
>
>
> [https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kafka/#topic-partition-subscription]
> [https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/pulsar/#topic-partition-subscription]
>  
>  
> The example provided in the document has issues and will result in a 
> compilation error. The correct example should be:
> {code:java}
> // code placeholder
> KafkaSource.builder().setTopicPattern(Pattern.compile("topic.*")) {code}
> !image-2024-06-18-17-47-53-525.png!
> !image-2024-06-18-17-48-38-080.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35632) The example provided in the kafkaSource documentation for topic regex subscription is incorrect

2024-06-18 Thread elon_X (Jira)
elon_X created FLINK-35632:
--

 Summary:  The example provided in the kafkaSource documentation 
for topic regex subscription is incorrect
 Key: FLINK-35632
 URL: https://issues.apache.org/jira/browse/FLINK-35632
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.20.0
Reporter: elon_X
 Attachments: image-2024-06-18-17-47-53-525.png, 
image-2024-06-18-17-48-38-080.png

[https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kafka/#topic-partition-subscription]

 

The example provided in the document has issues and will result in a 
compilation error. The correct example should be:
{code:java}
// code placeholder
KafkaSource.builder().setTopicPattern(Pattern.compile("topic.*")) {code}
!image-2024-06-18-17-47-53-525.png!

!image-2024-06-18-17-48-38-080.png!
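
For reference, a compilable version of the corrected call. This is a sketch; 
the bootstrap servers, group id, and starting offsets below are illustrative 
placeholders, not part of the documentation.
{code:java}
// Self-contained sketch of the corrected usage. The key point is that
// setTopicPattern expects a java.util.regex.Pattern, not a String.
import java.util.regex.Pattern;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class TopicPatternExample {
    public static void main(String[] args) {
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")        // placeholder
                .setGroupId("demo-group")                     // placeholder
                .setTopicPattern(Pattern.compile("topic.*"))  // the corrected call
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }
}
{code}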



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35631) KafkaSource parameter partition.discovery.interval.ms with a default value of 5 minutes does not take effect

2024-06-18 Thread elon_X (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

elon_X updated FLINK-35631:
---
Issue Type: Improvement  (was: Bug)

> KafkaSource parameter partition.discovery.interval.ms with a default value of 
> 5 minutes does not take effect
> 
>
> Key: FLINK-35631
> URL: https://issues.apache.org/jira/browse/FLINK-35631
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.1.0
>Reporter: elon_X
>Priority: Major
>
> When I start a streaming program that consumes Kafka 
> (flink-connector-kafka-3.1-SNAPSHOT), the Flink task does not automatically 
> detect new partitions after Kafka adds partitions.
>  
> *Reason*
> In the {{KafkaSourceBuilder}}, {{partition.discovery.interval.ms}} is 
> checked to see whether it has been overridden. Since I did not set it, the 
> builder sets {{partition.discovery.interval.ms = -1}} even though the source 
> is {{CONTINUOUS_UNBOUNDED}}.
> In the {{KafkaSourceEnumerator}}, the value of 
> {{partition.discovery.interval.ms}} is therefore -1 rather than the default 
> of 5 minutes, so automatic partition discovery does not work and the 
> documented 5-minute default is meaningless.
>  
> A possible solution is to set {{partition.discovery.interval.ms = -1}} only 
> when {{boundedness == Boundedness.BOUNDED}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35631) KafkaSource parameter partition.discovery.interval.ms with a default value of 5 minutes does not take effect

2024-06-18 Thread elon_X (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

elon_X updated FLINK-35631:
---
Affects Version/s: kafka-3.1.0
   (was: 1.17.2)

> KafkaSource parameter partition.discovery.interval.ms with a default value of 
> 5 minutes does not take effect
> 
>
> Key: FLINK-35631
> URL: https://issues.apache.org/jira/browse/FLINK-35631
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: kafka-3.1.0
>Reporter: elon_X
>Priority: Major
>
> When I start a streaming program that consumes Kafka 
> (flink-connector-kafka-3.1-SNAPSHOT), the Flink task does not automatically 
> detect new partitions after Kafka adds partitions.
>  
> *Reason*
> In the {{KafkaSourceBuilder}}, {{partition.discovery.interval.ms}} is 
> checked to see whether it has been overridden. Since I did not set it, the 
> builder sets {{partition.discovery.interval.ms = -1}} even though the source 
> is {{CONTINUOUS_UNBOUNDED}}.
> In the {{KafkaSourceEnumerator}}, the value of 
> {{partition.discovery.interval.ms}} is therefore -1 rather than the default 
> of 5 minutes, so automatic partition discovery does not work and the 
> documented 5-minute default is meaningless.
>  
> A possible solution is to set {{partition.discovery.interval.ms = -1}} only 
> when {{boundedness == Boundedness.BOUNDED}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35631) KafkaSource parameter partition.discovery.interval.ms with a default value of 5 minutes does not take effect

2024-06-18 Thread elon_X (Jira)
elon_X created FLINK-35631:
--

 Summary: KafkaSource parameter partition.discovery.interval.ms 
with a default value of 5 minutes does not take effect
 Key: FLINK-35631
 URL: https://issues.apache.org/jira/browse/FLINK-35631
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.17.2
Reporter: elon_X


When I start a streaming program that consumes Kafka 
(flink-connector-kafka-3.1-SNAPSHOT), the Flink task does not automatically 
detect new partitions after Kafka adds partitions.

 

*Reason*

In the {{KafkaSourceBuilder}}, {{partition.discovery.interval.ms}} is checked 
to see whether it has been overridden. Since I did not set it, the builder 
sets {{partition.discovery.interval.ms = -1}} even though the source is 
{{CONTINUOUS_UNBOUNDED}}.

In the {{KafkaSourceEnumerator}}, the value of 
{{partition.discovery.interval.ms}} is therefore -1 rather than the default of 
5 minutes, so automatic partition discovery does not work and the documented 
5-minute default is meaningless.

 

A possible solution is to set {{partition.discovery.interval.ms = -1}} only 
when {{boundedness == Boundedness.BOUNDED}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35157) Sources with watermark alignment get stuck once some subtasks finish

2024-06-17 Thread elon_X (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17855611#comment-17855611
 ] 

elon_X commented on FLINK-35157:


Hi [~fanrui] 
I have already backported this fix to 1.17, 1.18, and 1.19. Thank you. :D

> Sources with watermark alignment get stuck once some subtasks finish
> 
>
> Key: FLINK-35157
> URL: https://issues.apache.org/jira/browse/FLINK-35157
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.17.2, 1.19.0, 1.18.1
>Reporter: Gyula Fora
>Assignee: elon_X
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.20.0
>
> Attachments: image-2024-04-24-21-36-16-146.png
>
>
> The current watermark alignment logic can easily get stuck if some subtasks 
> finish while others are still running.
> The reason is that once a source subtask finishes, the subtask is not 
> excluded from alignment, effectively blocking the rest of the job from 
> making progress beyond last wm + alignment time for the finished sources.
> This can be easily reproduced by the following simple pipeline:
> {noformat}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(2);
> DataStream<Long> s = env.fromSource(new NumberSequenceSource(0, 100),
> 
> WatermarkStrategy.<Long>forMonotonousTimestamps().withTimestampAssigner((SerializableTimestampAssigner<Long>)
>  (aLong, l) -> aLong).withWatermarkAlignment("g1", Duration.ofMillis(10), 
> Duration.ofSeconds(2)),
> "Sequence Source").filter((FilterFunction<Long>) aLong -> {
> Thread.sleep(200);
> return true;
> }
> );
> s.print();
> env.execute();{noformat}
> The solution could be to send out a max watermark event once the sources 
> finish, or to exclude them from the source coordinator.
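
To make the second proposed remedy concrete, a minimal sketch of excluding 
finished subtasks from the combined watermark (illustrative names only, not 
the actual SourceCoordinator internals):
{code:java}
// Illustrative only: finished subtasks stop constraining the combined
// (minimum) watermark, so alignment can progress past them.
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class AlignmentSketch {

    static long combinedWatermark(Map<Integer, Long> watermarkPerSubtask,
                                  Set<Integer> finishedSubtasks) {
        long min = Long.MAX_VALUE;
        for (Map.Entry<Integer, Long> e : watermarkPerSubtask.entrySet()) {
            if (finishedSubtasks.contains(e.getKey())) {
                continue; // a finished subtask no longer holds others back
            }
            min = Math.min(min, e.getValue());
        }
        return min;
    }

    public static void main(String[] args) {
        Map<Integer, Long> wms = new HashMap<>();
        wms.put(0, 100L); // still running
        wms.put(1, 42L);  // finished while at watermark 42
        System.out.println(combinedWatermark(wms, Set.of(1))); // 100, not 42
    }
}
{code}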



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-35157) Sources with watermark alignment get stuck once some subtasks finish

2024-06-17 Thread elon_X (Jira)


[ https://issues.apache.org/jira/browse/FLINK-35157 ]


elon_X deleted comment on FLINK-35157:


was (Author: JIRAUSER303028):
[~fanrui] Sure, I will backport this fix to 1.17, 1.18, and 1.19. Thank you :D

> Sources with watermark alignment get stuck once some subtasks finish
> 
>
> Key: FLINK-35157
> URL: https://issues.apache.org/jira/browse/FLINK-35157
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.17.2, 1.19.0, 1.18.1
>Reporter: Gyula Fora
>Assignee: elon_X
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.20.0
>
> Attachments: image-2024-04-24-21-36-16-146.png
>
>
> The current watermark alignment logic can easily get stuck if some subtasks 
> finish while others are still running.
> The reason is that once a source subtask finishes, the subtask is not 
> excluded from alignment, effectively blocking the rest of the job from 
> making progress beyond last wm + alignment time for the finished sources.
> This can be easily reproduced by the following simple pipeline:
> {noformat}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(2);
> DataStream<Long> s = env.fromSource(new NumberSequenceSource(0, 100),
> 
> WatermarkStrategy.<Long>forMonotonousTimestamps().withTimestampAssigner((SerializableTimestampAssigner<Long>)
>  (aLong, l) -> aLong).withWatermarkAlignment("g1", Duration.ofMillis(10), 
> Duration.ofSeconds(2)),
> "Sequence Source").filter((FilterFunction<Long>) aLong -> {
> Thread.sleep(200);
> return true;
> }
> );
> s.print();
> env.execute();{noformat}
> The solution could be to send out a max watermark event once the sources 
> finish, or to exclude them from the source coordinator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35157) Sources with watermark alignment get stuck once some subtasks finish

2024-06-13 Thread elon_X (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854914#comment-17854914
 ] 

elon_X commented on FLINK-35157:


[~fanrui] Sure, I will backport this fix to 1.17, 1.18, and 1.19. Thank you :D

> Sources with watermark alignment get stuck once some subtasks finish
> 
>
> Key: FLINK-35157
> URL: https://issues.apache.org/jira/browse/FLINK-35157
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.17.2, 1.19.0, 1.18.1
>Reporter: Gyula Fora
>Assignee: elon_X
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.20.0
>
> Attachments: image-2024-04-24-21-36-16-146.png
>
>
> The current watermark alignment logic can easily get stuck if some subtasks 
> finish while others are still running.
> The reason is that once a source subtask finishes, the subtask is not 
> excluded from alignment, effectively blocking the rest of the job from 
> making progress beyond last wm + alignment time for the finished sources.
> This can be easily reproduced by the following simple pipeline:
> {noformat}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(2);
> DataStream<Long> s = env.fromSource(new NumberSequenceSource(0, 100),
> 
> WatermarkStrategy.<Long>forMonotonousTimestamps().withTimestampAssigner((SerializableTimestampAssigner<Long>)
>  (aLong, l) -> aLong).withWatermarkAlignment("g1", Duration.ofMillis(10), 
> Duration.ofSeconds(2)),
> "Sequence Source").filter((FilterFunction<Long>) aLong -> {
> Thread.sleep(200);
> return true;
> }
> );
> s.print();
> env.execute();{noformat}
> The solution could be to send out a max watermark event once the sources 
> finish, or to exclude them from the source coordinator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-35506) disable kafka auto-commit and rely on flink’s checkpointing if both are enabled

2024-06-04 Thread elon_X (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

elon_X closed FLINK-35506.
--
Resolution: Not A Problem

> disable kafka auto-commit and rely on flink’s checkpointing if both are 
> enabled
> ---
>
> Key: FLINK-35506
> URL: https://issues.apache.org/jira/browse/FLINK-35506
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.16.1
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-06-03-23-39-28-270.png
>
>
> When I use KafkaSource for consuming topics and set the Kafka parameter 
> {{{}enable.auto.commit=true{}}}, while also enabling checkpointing for the 
> task, I notice that both will commit offsets. Should Kafka's auto-commit be 
> disabled when enabling Flink checkpointing, similar to how it's done with 
> FlinkKafkaConsumer?
>  
> *How to reproduce*
>  
> {code:java}
> // code placeholder
> Properties kafkaParams = new Properties();
> kafkaParams.put("enable.auto.commit", "true");
> kafkaParams.put("auto.offset.reset", "latest");
> kafkaParams.put("fetch.min.bytes", "4096");
> kafkaParams.put("sasl.mechanism", "PLAIN");
> kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
> kafkaParams.put("bootstrap.servers", bootStrap);
> kafkaParams.put("group.id", expoGroupId);
> kafkaParams.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule
>  required username=\"" + username + "\" password=\"" + password + "\";");
> KafkaSource<String> source = KafkaSource
> .builder()
> .setBootstrapServers(bootStrap)
> .setProperties(kafkaParams)
> .setGroupId(expoGroupId)
> .setTopics(Arrays.asList(expoTopic))
> .setValueOnlyDeserializer(new SimpleStringSchema())
> .setStartingOffsets(OffsetsInitializer.latest())
> .build();
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
> .filter(r ->  true);
> env.enableCheckpointing(3000 * 1000);
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000 * 1000);
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
> env.getCheckpointConfig().setCheckpointTimeout(1000 * 300);
> env.execute("kafka-consumer"); {code}
>  
>  
> The Kafka client's 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator keeps 
> committing offsets continuously.
> !image-2024-06-03-23-39-28-270.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35506) disable kafka auto-commit and rely on flink’s checkpointing if both are enabled

2024-06-03 Thread elon_X (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

elon_X updated FLINK-35506:
---
Description: 
When I use KafkaSource for consuming topics and set the Kafka parameter 
{{{}enable.auto.commit=true{}}}, while also enabling checkpointing for the 
task, I notice that both will commit offsets. Should Kafka's auto-commit be 
disabled when enabling Flink checkpointing, similar to how it's done with 
FlinkKafkaConsumer?

 

*How to reproduce*

 
{code:java}
// code placeholder
Properties kafkaParams = new Properties();
kafkaParams.put("enable.auto.commit", "true");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("fetch.min.bytes", "4096");
kafkaParams.put("sasl.mechanism", "PLAIN");
kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
kafkaParams.put("bootstrap.servers", bootStrap);
kafkaParams.put("group.id", expoGroupId);
kafkaParams.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule
 required username=\"" + username + "\" password=\"" + password + "\";");

KafkaSource<String> source = KafkaSource
.builder()
.setBootstrapServers(bootStrap)
.setProperties(kafkaParams)
.setGroupId(expoGroupId)
.setTopics(Arrays.asList(expoTopic))
.setValueOnlyDeserializer(new SimpleStringSchema())
.setStartingOffsets(OffsetsInitializer.latest())
.build();

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
.filter(r ->  true);

env.enableCheckpointing(3000 * 1000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000 * 1000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(1000 * 300);
env.execute("kafka-consumer"); {code}
 

 

The Kafka client's 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator keeps 
committing offsets continuously.

!image-2024-06-03-23-39-28-270.png!

  was:
When I use KafkaSource for consuming topics and set the Kafka parameter 
{{{}enable.auto.commit=true{}}}, while also enabling checkpointing for the 
task, I notice that both will commit offsets. Should Kafka's auto-commit be 
disabled when enabling Flink checkpointing, similar to how it's done with 
FlinkKafkaConsumer?

 

*How to reproduce*

 
{code:java}
// code placeholder
Properties kafkaParams = new Properties();
kafkaParams.put("enable.auto.commit", "true");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("fetch.min.bytes", "4096");
kafkaParams.put("sasl.mechanism", "PLAIN");
kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
kafkaParams.put("bootstrap.servers", bootStrap);
kafkaParams.put("group.id", expoGroupId);
kafkaParams.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule
 required username=\"" + username + "\" password=\"" + password + "\";");

KafkaSource<String> source = KafkaSource
.builder()
.setBootstrapServers(bootStrap)
.setProperties(kafkaParams)
.setGroupId(expoGroupId)
.setTopics(Arrays.asList(expoTopic))
.setValueOnlyDeserializer(new SimpleStringSchema())
.setStartingOffsets(OffsetsInitializer.latest())
.build();

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
.filter(r ->  true);

env.enableCheckpointing(3000 * 1000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000 * 1000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(1000 * 300);
env.execute("kafka-consumer"); {code}
 

 

'org.apache.kafka.clients.consumer.internals.ConsumerCoordinator' continuously 
committing offsets.

!image-2024-06-03-23-39-28-270.png!


> disable kafka auto-commit and rely on flink’s checkpointing if both are 
> enabled
> ---
>
> Key: FLINK-35506
> URL: https://issues.apache.org/jira/browse/FLINK-35506
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.16.1
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-06-03-23-39-28-270.png
>
>
> When I use KafkaSource for consuming topics and set the Kafka parameter 
> {{{}enable.auto.commit=true{}}}, while also enabling checkpointing for the 
> task, I notice that both will commit offsets. Should Kafka's auto-commit be 
> disabled when enabling Flink checkpointing, similar to how it's done with 
> FlinkKafkaConsumer?
>  
> *How to reproduce*
>  
> {code:java}
> // code placeholder
> Properties kafkaParams = new Properties();
> kafkaParams.put("enable.aut

[jira] [Updated] (FLINK-35506) disable kafka auto-commit and rely on flink’s checkpointing if both are enabled

2024-06-03 Thread elon_X (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

elon_X updated FLINK-35506:
---
Description: 
When I use KafkaSource for consuming topics and set the Kafka parameter 
{{{}enable.auto.commit=true{}}}, while also enabling checkpointing for the 
task, I notice that both will commit offsets. Should Kafka's auto-commit be 
disabled when enabling Flink checkpointing, similar to how it's done with 
FlinkKafkaConsumer?

 

*How to reproduce*

 
{code:java}
// code placeholder
Properties kafkaParams = new Properties();
kafkaParams.put("enable.auto.commit", "true");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("fetch.min.bytes", "4096");
kafkaParams.put("sasl.mechanism", "PLAIN");
kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
kafkaParams.put("bootstrap.servers", bootStrap);
kafkaParams.put("group.id", expoGroupId);
kafkaParams.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule
 required username=\"" + username + "\" password=\"" + password + "\";");

KafkaSource<String> source = KafkaSource
.builder()
.setBootstrapServers(bootStrap)
.setProperties(kafkaParams)
.setGroupId(expoGroupId)
.setTopics(Arrays.asList(expoTopic))
.setValueOnlyDeserializer(new SimpleStringSchema())
.setStartingOffsets(OffsetsInitializer.latest())
.build();

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
.filter(r ->  true);

env.enableCheckpointing(3000 * 1000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000 * 1000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(1000 * 300);
env.execute("kafka-consumer"); {code}
 

 

'org.apache.kafka.clients.consumer.internals.ConsumerCoordinator' continuously 
committing offsets.

!image-2024-06-03-23-39-28-270.png!

  was:When I use KafkaSource for consuming topics and set the Kafka parameter 
{{{}enable.auto.commit=true{}}}, while also enabling checkpointing for the 
task, I notice that both will commit offsets. Should Kafka's auto-commit be 
disabled when enabling Flink checkpointing, similar to how it's done with 
FlinkKafkaConsumer?


> disable kafka auto-commit and rely on flink’s checkpointing if both are 
> enabled
> ---
>
> Key: FLINK-35506
> URL: https://issues.apache.org/jira/browse/FLINK-35506
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.16.1
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-06-03-23-39-28-270.png
>
>
> When I use KafkaSource for consuming topics and set the Kafka parameter 
> {{{}enable.auto.commit=true{}}}, while also enabling checkpointing for the 
> task, I notice that both will commit offsets. Should Kafka's auto-commit be 
> disabled when enabling Flink checkpointing, similar to how it's done with 
> FlinkKafkaConsumer?
>  
> *How to reproduce*
>  
> {code:java}
> // code placeholder
> Properties kafkaParams = new Properties();
> kafkaParams.put("enable.auto.commit", "true");
> kafkaParams.put("auto.offset.reset", "latest");
> kafkaParams.put("fetch.min.bytes", "4096");
> kafkaParams.put("sasl.mechanism", "PLAIN");
> kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
> kafkaParams.put("bootstrap.servers", bootStrap);
> kafkaParams.put("group.id", expoGroupId);
> kafkaParams.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule
>  required username=\"" + username + "\" password=\"" + password + "\";");
> KafkaSource<String> source = KafkaSource
> .builder()
> .setBootstrapServers(bootStrap)
> .setProperties(kafkaParams)
> .setGroupId(expoGroupId)
> .setTopics(Arrays.asList(expoTopic))
> .setValueOnlyDeserializer(new SimpleStringSchema())
> .setStartingOffsets(OffsetsInitializer.latest())
> .build();
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
> .filter(r ->  true);
> env.enableCheckpointing(3000 * 1000);
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000 * 1000);
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
> env.getCheckpointConfig().setCheckpointTimeout(1000 * 300);
> env.execute("kafka-consumer"); {code}
>  
>  
> 'org.apache.kafka.clients.consumer.internals.ConsumerCoordinator' 
> continuously committing offsets.
> !image-2024-06-03-23-39-28-270.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35506) disable kafka auto-commit and rely on flink’s checkpointing if both are enabled

2024-06-03 Thread elon_X (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

elon_X updated FLINK-35506:
---
Attachment: image-2024-06-03-23-39-28-270.png

> disable kafka auto-commit and rely on flink’s checkpointing if both are 
> enabled
> ---
>
> Key: FLINK-35506
> URL: https://issues.apache.org/jira/browse/FLINK-35506
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.16.1
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-06-03-23-39-28-270.png
>
>
> When I use KafkaSource for consuming topics and set the Kafka parameter 
> {{{}enable.auto.commit=true{}}}, while also enabling checkpointing for the 
> task, I notice that both will commit offsets. Should Kafka's auto-commit be 
> disabled when enabling Flink checkpointing, similar to how it's done with 
> FlinkKafkaConsumer?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35506) disable kafka auto-commit and rely on flink’s checkpointing if both are enabled

2024-06-02 Thread elon_X (Jira)
elon_X created FLINK-35506:
--

 Summary: disable kafka auto-commit and rely on flink’s 
checkpointing if both are enabled
 Key: FLINK-35506
 URL: https://issues.apache.org/jira/browse/FLINK-35506
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.16.1
Reporter: elon_X


When I use KafkaSource for consuming topics and set the Kafka parameter 
{{{}enable.auto.commit=true{}}}, while also enabling checkpointing for the 
task, I notice that both will commit offsets. Should Kafka's auto-commit be 
disabled when enabling Flink checkpointing, similar to how it's done with 
FlinkKafkaConsumer?
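
Until that question is settled, one way a user can avoid the double commit is 
sketched below (assuming the standard KafkaSource options; connection values 
are placeholders): switch the client's auto-commit off and let Flink's 
checkpoints own the offsets.
{code:java}
// A sketch, not official guidance: disable the Kafka client's own auto-commit
// and rely on Flink committing offsets on checkpoints instead
// (commit.offsets.on.checkpoint defaults to true). Connection values are
// placeholders.
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

public class NoDoubleCommit {
    public static void main(String[] args) {
        Properties kafkaParams = new Properties();
        kafkaParams.put("enable.auto.commit", "false");
        kafkaParams.put("commit.offsets.on.checkpoint", "true");

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092") // placeholder
                .setGroupId("demo-group")              // placeholder
                .setTopics("demo-topic")               // placeholder
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setProperties(kafkaParams)
                .build();
    }
}
{code}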



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35157) Sources with watermark alignment get stuck once some subtasks finish

2024-04-24 Thread elon_X (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17840443#comment-17840443
 ] 

elon_X commented on FLINK-35157:


Hi [~fanrui], can you assign this issue to me? I can fix this problem, thank 
you so much!

!image-2024-04-24-21-36-16-146.png!

> Sources with watermark alignment get stuck once some subtasks finish
> 
>
> Key: FLINK-35157
> URL: https://issues.apache.org/jira/browse/FLINK-35157
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.17.2, 1.19.0, 1.18.1
>Reporter: Gyula Fora
>Priority: Critical
> Attachments: image-2024-04-24-21-36-16-146.png
>
>
> The current watermark alignment logic can easily get stuck if some subtasks 
> finish while others are still running.
> The reason is that once a source subtask finishes, the subtask is not 
> excluded from alignment, effectively blocking the rest of the job from 
> making progress beyond last wm + alignment time for the finished sources.
> This can be easily reproduced by the following simple pipeline:
> {noformat}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(2);
> DataStream<Long> s = env.fromSource(new NumberSequenceSource(0, 100),
> 
> WatermarkStrategy.<Long>forMonotonousTimestamps().withTimestampAssigner((SerializableTimestampAssigner<Long>)
>  (aLong, l) -> aLong).withWatermarkAlignment("g1", Duration.ofMillis(10), 
> Duration.ofSeconds(2)),
> "Sequence Source").filter((FilterFunction<Long>) aLong -> {
> Thread.sleep(200);
> return true;
> }
> );
> s.print();
> env.execute();{noformat}
> The solution could be to send out a max watermark event once the sources 
> finish, or to exclude them from the source coordinator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35157) Sources with watermark alignment get stuck once some subtasks finish

2024-04-24 Thread elon_X (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

elon_X updated FLINK-35157:
---
Attachment: image-2024-04-24-21-36-16-146.png

> Sources with watermark alignment get stuck once some subtasks finish
> 
>
> Key: FLINK-35157
> URL: https://issues.apache.org/jira/browse/FLINK-35157
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.17.2, 1.19.0, 1.18.1
>Reporter: Gyula Fora
>Priority: Critical
> Attachments: image-2024-04-24-21-36-16-146.png
>
>
> The current watermark alignment logic can easily get stuck if some subtasks 
> finish while others are still running.
> The reason is that once a source subtask finishes, the subtask is not 
> excluded from alignment, effectively blocking the rest of the job from 
> making progress beyond last wm + alignment time for the finished sources.
> This can be easily reproduced by the following simple pipeline:
> {noformat}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(2);
> DataStream<Long> s = env.fromSource(new NumberSequenceSource(0, 100),
> 
> WatermarkStrategy.<Long>forMonotonousTimestamps().withTimestampAssigner((SerializableTimestampAssigner<Long>)
>  (aLong, l) -> aLong).withWatermarkAlignment("g1", Duration.ofMillis(10), 
> Duration.ofSeconds(2)),
> "Sequence Source").filter((FilterFunction<Long>) aLong -> {
> Thread.sleep(200);
> return true;
> }
> );
> s.print();
> env.execute();{noformat}
> The solution could be to send out a max watermark event once the sources 
> finish, or to exclude them from the source coordinator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35088) watermark alignment maxAllowedWatermarkDrift and updateInterval param need check

2024-04-23 Thread elon_X (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17840041#comment-17840041
 ] 

elon_X commented on FLINK-35088:


[~fanrui] I am very happy to fix this issue, please assign it to me. Thank you 
very much! :D

> watermark alignment maxAllowedWatermarkDrift and updateInterval param need 
> check
> 
>
> Key: FLINK-35088
> URL: https://issues.apache.org/jira/browse/FLINK-35088
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core, Runtime / Coordination
>Affects Versions: 1.16.1
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-04-11-20-12-29-951.png
>
>
> When I use watermark alignment:
> 1. I found that setting maxAllowedWatermarkDrift to a negative number 
> initially led me to believe it could delay consumption of the source, so I 
> tried it. The upstream data flow then hung indefinitely.
> Root cause:
> {code:java}
> long maxAllowedWatermark = globalCombinedWatermark.getTimestamp()             
>     + watermarkAlignmentParams.getMaxAllowedWatermarkDrift();  {code}
> If maxAllowedWatermarkDrift is negative, then in the SourceOperator 
> maxAllowedWatermark < lastEmittedWatermark, so the SourceReader is blocked 
> indefinitely and cannot recover.
> I'm not sure whether this is a supported feature of watermark alignment. If 
> it's not, I think an additional parameter validation should be implemented 
> to throw an exception on the client side if the value is negative.
> 2. The updateInterval parameter also lacks validation. If I set it to 0, the 
> job throws an exception when the JobManager starts; the validation is 
> performed by the JDK class java.util.concurrent.ScheduledThreadPoolExecutor, 
> which throws the exception.
> {code:java}
> java.lang.IllegalArgumentException: null
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:565)
>  ~[?:1.8.0_351]
>   at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.(SourceCoordinator.java:191)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:333)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.(RecreateOnResetOperatorCoordinator.java:59)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.(RecreateOnResetOperatorCoordinator.java:42)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:201)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:195)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:529)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:494)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:901)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:891)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:848)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:830)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:203)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1] {code}
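
A minimal sketch of the client-side validation the ticket asks for 
(hypothetical helper, not Flink's actual API):
{code:java}
// Hypothetical validation helper: failing fast on the client side would
// surface both misconfigurations before the job is submitted.
public final class WatermarkAlignmentParamsCheck {

    public static void check(long maxAllowedWatermarkDriftMs, long updateIntervalMs) {
        if (maxAllowedWatermarkDriftMs < 0) {
            throw new IllegalArgumentException(
                    "maxAllowedWatermarkDrift must be >= 0, was " + maxAllowedWatermarkDriftMs);
        }
        if (updateIntervalMs <= 0) {
            throw new IllegalArgumentException(
                    "updateInterval must be > 0, was " + updateIntervalMs);
        }
    }
}
{code}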

[jira] [Comment Edited] (FLINK-35178) Checkpoint CLAIM mode does not fully control snapshot ownership

2024-04-22 Thread elon_X (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839522#comment-17839522
 ] 

elon_X edited comment on FLINK-35178 at 4/22/24 3:20 PM:
-

[~lijinzhong] 

Thank you for your response.

I have been using the default value (true) for the 
"state.checkpoints.create-subdir" parameter. However, when I tested by setting 
this value to false, the result was the same, which might indicate I'm doing 
something wrong?

Additionally, I've encountered another issue. Even though I set 
{{{}state.checkpoints.num-retained=3{}}}, the older job's checkpoint versions 
are not being discarded even if they are not referenced. Only the checkpoint 
specified by the {{-s}} option (chk-x) is discarded.

As shown in the diagram below, I restored from chk-34, but only chk-34 was 
discarded, while chk-32 and chk-33 continue to exist indefinitely.

!image-2024-04-22-15-16-02-381.png!


was (Author: JIRAUSER303028):
[~lijinzhong] 

Thank you for your response.

I have been using the default value (true) for the 
"state.checkpoints.create-subdir" parameter. However, when I tested by setting 
this value to false, the result was the same, which might indicate I'm doing 
something wrong.

Additionally, I've encountered another issue. Even though I set 
{{{}state.checkpoints.num-retained=3{}}}, the older job's checkpoint versions 
are not being discarded even if they are not referenced. Only the checkpoint 
specified by the {{-s}} option (chk-x) is discarded.

As shown in the diagram below, I restored from chk-34, but only chk-34 was 
discarded, while chk-32 and chk-33 continue to exist indefinitely.

!image-2024-04-22-15-16-02-381.png!

> Checkpoint CLAIM mode does not fully control snapshot ownership
> ---
>
> Key: FLINK-35178
> URL: https://issues.apache.org/jira/browse/FLINK-35178
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.18.0
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-04-20-14-51-21-062.png, 
> image-2024-04-22-15-16-02-381.png
>
>
> When I enable incremental checkpointing and the task fails or is canceled 
> for some reason, restarting the task from {{-s checkpoint_path}} with 
> {{restoreMode CLAIM}} allows the Flink job to recover from the last 
> checkpoint; it just discards the previous checkpoint.
> Then I found that this leads to the following two cases:
> 1. If the new checkpoint_x meta file does not reference files in the shared 
> directory under the previous jobID:         
> the shared and taskowned directories from the previous Job will be left as 
> empty directories, and these two directories will persist without being 
> deleted by Flink. !image-2024-04-20-14-51-21-062.png!
> 2. If the new checkpoint_x meta file references files in the shared directory 
> under the previous jobID:
> the chk-(x-1) from the previous job will be discarded, but there will still 
> be state data in the shared directory under that job, which might persist for 
> a relatively long time. Here arises the question: the previous job is no 
> longer running, and it's unclear whether users should delete the state data. 
> Deleting it could lead to errors when the task is restarted, as the meta 
> might reference files that can no longer be found; this could be confusing 
> for users.
>  
> The potential solution might be to reuse the previous job's jobID when 
> restoring from {{{}-s checkpoint_path{}}}, or to add a new parameter that 
> allows users to specify the jobID they want to recover from;
>  
> Please correct me if there's anything I've misunderstood.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35178) Checkpoint CLAIM mode does not fully control snapshot ownership

2024-04-22 Thread elon_X (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839522#comment-17839522
 ] 

elon_X commented on FLINK-35178:


[~lijinzhong] 

Thank you for your response.

I have been using the default value (true) for the 
"state.checkpoints.create-subdir" parameter. However, when I tested by setting 
this value to false, the result was the same, which might indicate I'm doing 
something wrong.

Additionally, I've encountered another issue. Even though I set 
{{{}state.checkpoints.num-retained=3{}}}, the older job's checkpoint versions 
are not being discarded even if they are not referenced. Only the checkpoint 
specified by the {{-s}} option (chk-x) is discarded.

As shown in the diagram below, I restored from chk-34, but only chk-34 was 
discarded, while chk-32 and chk-33 continue to exist indefinitely.

!image-2024-04-22-15-16-02-381.png!

> Checkpoint CLAIM mode does not fully control snapshot ownership
> ---
>
> Key: FLINK-35178
> URL: https://issues.apache.org/jira/browse/FLINK-35178
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.18.0
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-04-20-14-51-21-062.png, 
> image-2024-04-22-15-16-02-381.png
>
>
> When I enable incremental checkpointing and the task fails or is canceled 
> for some reason, restarting the task from {{-s checkpoint_path}} with 
> {{restoreMode CLAIM}} allows the Flink job to recover from the last 
> checkpoint; it just discards the previous checkpoint.
> Then I found that this leads to the following two cases:
> 1. If the new checkpoint_x meta file does not reference files in the shared 
> directory under the previous jobID:         
> the shared and taskowned directories from the previous Job will be left as 
> empty directories, and these two directories will persist without being 
> deleted by Flink. !image-2024-04-20-14-51-21-062.png!
> 2. If the new checkpoint_x meta file references files in the shared directory 
> under the previous jobID:
> the chk-(x-1) from the previous job will be discarded, but there will still 
> be state data in the shared directory under that job, which might persist for 
> a relatively long time. Here arises the question: the previous job is no 
> longer running, and it's unclear whether users should delete the state data. 
> Deleting it could lead to errors when the task is restarted, as the meta 
> might reference files that can no longer be found; this could be confusing 
> for users.
>  
> The potential solution might be to reuse the previous job's jobID when 
> restoring from {{{}-s checkpoint_path{}}}, or to add a new parameter that 
> allows users to specify the jobID they want to recover from.
>  
> Please correct me if there's anything I've misunderstood.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35178) Checkpoint CLAIM mode does not fully control snapshot ownership

2024-04-22 Thread elon_X (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

elon_X updated FLINK-35178:
---
Attachment: image-2024-04-22-15-16-02-381.png

> Checkpoint CLAIM mode does not fully control snapshot ownership
> ---
>
> Key: FLINK-35178
> URL: https://issues.apache.org/jira/browse/FLINK-35178
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.18.0
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-04-20-14-51-21-062.png, 
> image-2024-04-22-15-16-02-381.png
>
>
> When I enable incremental checkpointing and the task fails or is canceled 
> for some reason, restarting the task from {{-s checkpoint_path}} with 
> {{restoreMode CLAIM}} allows the Flink job to recover from the last 
> checkpoint; it just discards the previous checkpoint.
> Then I found that this leads to the following two cases:
> 1. If the new checkpoint_x meta file does not reference files in the shared 
> directory under the previous jobID:         
> the shared and taskowned directories from the previous Job will be left as 
> empty directories, and these two directories will persist without being 
> deleted by Flink. !image-2024-04-20-14-51-21-062.png!
> 2. If the new checkpoint_x meta file references files in the shared directory 
> under the previous jobID:
> the chk-(x-1) from the previous job will be discarded, but there will still 
> be state data in the shared directory under that job, which might persist for 
> a relatively long time. Here arises the question: the previous job is no 
> longer running, and it's unclear whether users should delete the state data. 
> Deleting it could lead to errors when the task is restarted, as the meta 
> might reference files that can no longer be found; this could be confusing 
> for users.
>  
> The potential solution might be to reuse the previous job's jobID when 
> restoring from {{{}-s checkpoint_path{}}}, or to add a new parameter that 
> allows users to specify the jobID they want to recover from.
>  
> Please correct me if there's anything I've misunderstood.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35178) Checkpoint CLAIM mode does not fully control snapshot ownership

2024-04-19 Thread elon_X (Jira)
elon_X created FLINK-35178:
--

 Summary: Checkpoint CLAIM mode does not fully control snapshot 
ownership
 Key: FLINK-35178
 URL: https://issues.apache.org/jira/browse/FLINK-35178
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.18.0
Reporter: elon_X
 Attachments: image-2024-04-20-14-51-21-062.png

When I enable incremental checkpointing and the task fails or is canceled for 
some reason, restarting the task from {{-s checkpoint_path}} with {{restoreMode 
CLAIM}} allows the Flink job to recover from the last checkpoint; it just 
discards the previous checkpoint.

Then I found that this leads to the following two cases:

1. If the new checkpoint_x meta file does not reference files in the shared 
directory under the previous jobID:         

the shared and taskowned directories from the previous Job will be left as 
empty directories, and these two directories will persist without being deleted 
by Flink. !image-2024-04-20-14-51-21-062.png!

2. If the new checkpoint_x meta file references files in the shared directory 
under the previous jobID:

the chk-(x-1) from the previous job will be discarded, but there will still be 
state data in the shared directory under that job, which might persist for a 
relatively long time. Here arises the question: the previous job is no longer 
running, and it's unclear whether users should delete the state data. Deleting 
it could lead to errors when the task is restarted, as the meta might reference 
files that can no longer be found; this could be confusing for users.

 

The potential solution might be to reuse the previous job's jobID when 
restoring from {{{}-s checkpoint_path{}}}, or to add a new parameter that 
allows users to specify the jobID they want to recover from.

 

Please correct me if there's anything I've misunderstood.
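
For context, a minimal sketch of the restore being described (the checkpoint 
path is a placeholder, and using SavepointConfigOptions this way is my 
understanding of the programmatic equivalent of passing {{-s}} plus CLAIM mode 
on the CLI):
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ClaimRestoreSketch {
    public static void main(String[] args) {
        // Restore from an existing (incremental) checkpoint and let the new
        // job claim ownership of it; the path is a placeholder.
        Configuration conf = new Configuration();
        conf.set(SavepointConfigOptions.SAVEPOINT_PATH, "hdfs:///checkpoints/old-job-id/chk-34");
        conf.set(SavepointConfigOptions.RESTORE_MODE, RestoreMode.CLAIM);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build and execute the job as usual ...
    }
}
{code}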



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35160) Support for Thread Dump provides a convenient way to display issues of thread deadlocks in tasks

2024-04-19 Thread elon_X (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

elon_X updated FLINK-35160:
---
 Attachment: image-2024-04-20-14-43-36-939.png
Description: 
After receiving feedback from the business side about performance issues in 
their tasks, we attempted to troubleshoot and discovered that their tasks had 
issues with thread deadlocks. However, the Thread Dump entry on the Flink page 
only shows thread stacks. Since the users are not very familiar with Java 
stacks, they couldn't clearly identify that the deadlocks were due to issues in 
the business logic code and mistakenly thought they were problems with the 
Flink framework.

!image-2024-04-18-20-57-52-440.png!

!image-2024-04-18-20-58-09-872.png!

The JVM's jstack command can clearly display thread deadlocks; unfortunately, 
the business team does not have permission to log into the machines. Here is 
the jstack log:

!image-2024-04-20-14-43-36-939.png!

Flame graphs are excellent for visualizing performance bottlenecks and hotspots 
in application profiling, but they are not designed to pinpoint the exact lines 
of code where thread deadlocks occur.

!image-2024-04-18-21-01-22-881.png!

Perhaps we could enhance the Thread Dump feature to display thread deadlocks, 
similar to what the {{jstack}} command provides.

 

!image-2024-04-18-21-34-41-014.png!

  was:
After receiving feedback from the business side about performance issues in 
their tasks, we attempted to troubleshoot and discovered that their tasks had 
issues with thread deadlocks. However, the Thread Dump entry on the Flink page 
only shows thread stacks. Since the users are not very familiar with Java 
stacks, they couldn't clearly identify that the deadlocks were due to issues in 
the business logic code and mistakenly thought they were problems with the 
Flink framework.

!image-2024-04-18-20-57-52-440.png!

!image-2024-04-18-20-58-09-872.png!

The JVM's jstack command can clearly display thread deadlocks; unfortunately, 
the business team does not have permission to log into the machines. Here is 
the jstack log:

!image-2024-04-18-21-00-04-532.png!

Flame graphs are excellent for visualizing performance bottlenecks and hotspots 
in application profiling, but they are not designed to pinpoint the exact lines 
of code where thread deadlocks occur.

!image-2024-04-18-21-01-22-881.png!

Perhaps we could enhance the Thread Dump feature to display thread deadlocks, 
similar to what the {{jstack}} command provides.

 

!image-2024-04-18-21-34-41-014.png!


> Support for Thread Dump provides a convenient way to display issues of thread 
> deadlocks in tasks
> 
>
> Key: FLINK-35160
> URL: https://issues.apache.org/jira/browse/FLINK-35160
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Affects Versions: 1.16.0, 1.17.1, 1.19.0, 1.18.1
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-04-18-20-57-52-440.png, 
> image-2024-04-18-20-58-09-872.png, image-2024-04-18-21-01-22-881.png, 
> image-2024-04-18-21-34-41-014.png, image-2024-04-20-14-43-36-939.png
>
>
> After receiving feedback from the business side about performance issues in 
> their tasks, we attempted to troubleshoot and discovered that their tasks had 
> issues with thread deadlocks. However, the Thread Dump entry on the Flink 
> page only shows thread stacks. Since the users are not very familiar with 
> Java stacks, they couldn't clearly identify that the deadlocks were due to 
> issues in the business logic code and mistakenly thought they were problems 
> with the Flink framework.
> !image-2024-04-18-20-57-52-440.png!
> !image-2024-04-18-20-58-09-872.png!
> The JVM's jstack command can clearly display thread deadlocks; unfortunately, 
> the business team does not have permission to log into the machines. Here is 
> the jstack log:
> !image-2024-04-20-14-43-36-939.png!
> Flame graphs are excellent for visualizing performance bottlenecks and 
> hotspots in application profiling, but they are not designed to pinpoint the 
> exact lines of code where thread deadlocks occur.
> !image-2024-04-18-21-01-22-881.png!
> Perhaps we could enhance the Thread Dump feature to display thread deadlocks, 
> similar to what the {{jstack}} command provides.
>  
> !image-2024-04-18-21-34-41-014.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35160) Support for Thread Dump provides a convenient way to display issues of thread deadlocks in tasks

2024-04-19 Thread elon_X (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

elon_X updated FLINK-35160:
---
Attachment: (was: image-2024-04-18-21-00-04-532.png)

> Support for Thread Dump provides a convenient way to display issues of thread 
> deadlocks in tasks
> 
>
> Key: FLINK-35160
> URL: https://issues.apache.org/jira/browse/FLINK-35160
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Affects Versions: 1.16.0, 1.17.1, 1.19.0, 1.18.1
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-04-18-20-57-52-440.png, 
> image-2024-04-18-20-58-09-872.png, image-2024-04-18-21-01-22-881.png, 
> image-2024-04-18-21-34-41-014.png, image-2024-04-20-14-43-36-939.png
>
>
> After receiving feedback from the business side about performance issues in 
> their tasks, we attempted to troubleshoot and discovered that their tasks had 
> issues with thread deadlocks. However, the Thread Dump entry on the Flink 
> page only shows thread stacks. Since the users are not very familiar with 
> Java stacks, they couldn't clearly identify that the deadlocks were due to 
> issues in the business logic code and mistakenly thought they were problems 
> with the Flink framework.
> !image-2024-04-18-20-57-52-440.png!
> !image-2024-04-18-20-58-09-872.png!
> The JVM's jstack command can clearly display thread deadlocks; unfortunately, 
> the business team does not have permission to log into the machines. Here is 
> the jstack log:
> !image-2024-04-18-21-00-04-532.png!
> Flame graphs are excellent for visualizing performance bottlenecks and 
> hotspots in application profiling, but they are not designed to pinpoint the 
> exact lines of code where thread deadlocks occur.
> !image-2024-04-18-21-01-22-881.png!
> Perhaps we could enhance the Thread Dump feature to display thread deadlocks, 
> similar to what the {{jstack}} command provides.
>  
> !image-2024-04-18-21-34-41-014.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35160) Support for Thread Dump provides a convenient way to display issues of thread deadlocks in tasks

2024-04-18 Thread elon_X (Jira)
elon_X created FLINK-35160:
--

 Summary: Support for Thread Dump provides a convenient way to 
display issues of thread deadlocks in tasks
 Key: FLINK-35160
 URL: https://issues.apache.org/jira/browse/FLINK-35160
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / REST
Affects Versions: 1.18.1, 1.19.0, 1.17.1, 1.16.0
Reporter: elon_X
 Attachments: image-2024-04-18-20-57-52-440.png, 
image-2024-04-18-20-58-09-872.png, image-2024-04-18-21-00-04-532.png, 
image-2024-04-18-21-01-22-881.png, image-2024-04-18-21-34-41-014.png

After receiving feedback from the business side about performance issues in 
their tasks, we attempted to troubleshoot and discovered that their tasks had 
issues with thread deadlocks. However, the Thread Dump entry on the Flink page 
only shows thread stacks. Since the users are not very familiar with Java 
stacks, they couldn't clearly identify that the deadlocks were due to issues in 
the business logic code and mistakenly thought they were problems with the 
Flink framework.

!image-2024-04-18-20-57-52-440.png!

!image-2024-04-18-20-58-09-872.png!

The JVM's jstack command can clearly display thread deadlocks; unfortunately, 
the business team does not have permission to log into the machines. Here is 
the jstack log:

!image-2024-04-18-21-00-04-532.png!

Flame graphs are excellent for visualizing performance bottlenecks and hotspots 
in application profiling, but they are not designed to pinpoint the exact lines 
of code where thread deadlocks occur.

!image-2024-04-18-21-01-22-881.png!

Perhaps we could enhance the Thread Dump feature to display thread deadlocks, 
similar to what the {{jstack}} command provides.

 

!image-2024-04-18-21-34-41-014.png!
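
A minimal sketch of the kind of deadlock detection such an enhancement could 
use, based on the standard ThreadMXBean API that jstack itself relies on (how 
this would be wired into Flink's REST handler is an assumption):
{code:java}
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class DeadlockProbe {

    // Detect deadlocked threads the same way jstack does, so a thread-dump
    // endpoint could append a "Found N deadlocked threads" section.
    public static String describeDeadlocks() {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        // Ids of threads in a cycle waiting on monitors or ownable
        // synchronizers; null means no deadlock.
        long[] ids = bean.findDeadlockedThreads();
        if (ids == null) {
            return "No deadlocks found.";
        }
        StringBuilder out = new StringBuilder("Found " + ids.length + " deadlocked threads:\n");
        // The two boolean flags also resolve locked monitors/synchronizers,
        // which produces the "waiting to lock ..." detail lines.
        for (ThreadInfo info : bean.getThreadInfo(ids, true, true)) {
            out.append(info);
        }
        return out.toString();
    }
}
{code}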



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35088) watermark alignment maxAllowedWatermarkDrift and updateInterval param need check

2024-04-18 Thread elon_X (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17838534#comment-17838534
 ] 

elon_X commented on FLINK-35088:


[~martijnvisser] [~masc] I'm sorry for the late reply. I retested on Flink 1.18 
and found that the problem still persists. I then checked the latest code on 
Flink's main branch and found that there is no validation for these two 
parameters. What are your thoughts on this?

> watermark alignment maxAllowedWatermarkDrift and updateInterval param need 
> check
> 
>
> Key: FLINK-35088
> URL: https://issues.apache.org/jira/browse/FLINK-35088
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core, Runtime / Coordination
>Affects Versions: 1.16.1
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-04-11-20-12-29-951.png
>
>
> When I use watermark alignment,
> 1. I found that setting maxAllowedWatermarkDrift to a negative number 
> initially led me to believe it could support delaying the consumption of the 
> source, so I tried it. Then, the upstream data flow would hang indefinitely.
> Root cause:
> {code:java}
> long maxAllowedWatermark = globalCombinedWatermark.getTimestamp()             
>     + watermarkAlignmentParams.getMaxAllowedWatermarkDrift();  {code}
> If maxAllowedWatermarkDrift is negative, SourceOperator: maxAllowedWatermark 
> < lastEmittedWatermark, then the SourceReader will be blocked indefinitely 
> and cannot recover.
> I'm not sure if this is a supported feature of watermark alignment. If it's 
> not, I think an additional parameter validation should be implemented to 
> throw an exception on the client side if the value is negative.
> 2. The updateInterval parameter also lacks validation. If I set it to 0, the 
> task will throw an exception when starting the job manager. The JDK class 
> java.util.concurrent.ScheduledThreadPoolExecutor performs the validation and 
> throws the exception.
> {code:java}
> java.lang.IllegalArgumentException: null
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:565)
>  ~[?:1.8.0_351]
>   at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.<init>(SourceCoordinator.java:191)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:333)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:59)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:42)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:201)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:195)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:529)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:494)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:901)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:891)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:848)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGr

[jira] [Comment Edited] (FLINK-35076) Watermark alignment will cause data flow to experience serious shake

2024-04-11 Thread elon_X (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836238#comment-17836238
 ] 

elon_X edited comment on FLINK-35076 at 4/11/24 3:23 PM:
-

[~kkrugler] 

Thank you for your reply.
Setting the idle time does not give precise control over the timing. For 
example, with it set to 10 seconds, the minimum watermark will still not change 
within those 10 seconds unless the idle time is made as small as possible; I'm 
not sure this would solve the problem, and further testing is needed.
For the solution of shuffling the stream, I didn't quite understand. In the 
Flink API:
DataStream<T> xx = env.fromSource(Source<T, ?, ?> source, WatermarkStrategy<T> 
timestampsAndWatermarks, String sourceName)
Only DataStream supports rebalance; a Source can't be rebalanced. I'm not quite 
sure how to shuffle the data source before {{fromSource}} (a sketch of my 
current understanding follows below).
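
A sketch of how I currently understand the suggestion (emit no watermarks at 
the source, shuffle the raw records, then assign timestamps and watermarks 
after the rebalance; the Kafka source, the Event type, and the timestamp getter 
are placeholders):
{code:java}
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

// Shuffle first, watermark afterwards. Note: as far as I can tell,
// withWatermarkAlignment only takes effect when the strategy is given to a
// source, which may be exactly why this workaround is unclear to me.
DataStream<Event> shuffled = env
        .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source")
        .rebalance();

DataStream<Event> withWatermarks = shuffled.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, ts) -> event.getEventTime()));
{code}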


was (Author: JIRAUSER303028):
[~kkrugler] 

Thank you for your reply.
Setting the idle time does not give precise control over the timing. For 
example, with it set to 10 seconds, the minimum watermark will still not change 
within those 10 seconds unless the idle time is made as small as possible; I'm 
not sure this would solve the problem, and further testing is needed.
For the solution of shuffling the stream, I didn't quite understand. In the 
Flink API:
DataStream<T> xx = env.fromSource(Source<T, ?, ?> source, WatermarkStrategy<T> 
timestampsAndWatermarks, String sourceName)
Only DataStream supports rebalance; a Source can't be rebalanced.

> Watermark alignment will cause data flow to experience serious shake
> 
>
> Key: FLINK-35076
> URL: https://issues.apache.org/jira/browse/FLINK-35076
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.16.1
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-04-10-20-15-05-731.png, 
> image-2024-04-10-20-23-13-872.png, image-2024-04-10-20-25-59-387.png, 
> image-2024-04-10-20-29-13-835.png
>
>
> In our company there is a requirement for multi-stream join operations, and 
> we are making modifications based on Flink watermark alignment; I then found 
> that the final join output experiences serious shake.
> I analyzed the reasons: an upstream topic has more than 300 partitions. The 
> number of partitions requested for this topic is too large, causing some 
> partitions to frequently experience intermittent writes with QPS=0. This 
> phenomenon is more serious between 2 am and 5 am. However, overall writing to 
> the topic is very smooth.
> !image-2024-04-10-20-29-13-835.png!
> The final join output will experience serious shake, as shown in the 
> following diagram:
> !image-2024-04-10-20-15-05-731.png!
> Root cause:
>  # The {{SourceOperator#emitLatestWatermark}} reports the 
> lastEmittedWatermark to the SourceCoordinator.
>  # If the partition write is zero during a certain period, the 
> lastEmittedWatermark sent by the subtask corresponding to that partition 
> remains unchanged.
>  # The SourceCoordinator aggregates the watermarks of all subtasks according 
> to the watermark group and takes the smallest watermark. This means that the 
> maxAllowedWatermark may remain unchanged for some time, even though the 
> overall upstream data flow is moving forward, until that minimum value is 
> updated. Only then will everything change, which will manifest as serious 
> shake in the output data stream.
> I think choosing the global minimum might not be a good option. Using min/max 
> could more likely encounter some edge cases. Perhaps choosing a median value 
> would be more appropriate? Or a more complex selection strategy?
> If replaced with a median value, it can ensure that the overall data flow is 
> very smooth:
> !image-2024-04-10-20-23-13-872.png!
>  
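
For illustration, a minimal sketch of the aggregation change proposed above 
(replacing the global minimum with a median over the per-subtask watermarks; 
this is a stand-in for the aggregation in SourceCoordinator, not the actual 
code):
{code:java}
import java.util.Arrays;

// Combine the watermarks reported by all subtasks of an alignment group.
// Using the median means a few idle partitions cannot pin the aligned
// watermark the way the global minimum does.
public class MedianWatermarkCombiner {

    public static long combine(long[] subtaskWatermarks) {
        long[] sorted = subtaskWatermarks.clone();
        Arrays.sort(sorted);
        int mid = sorted.length / 2;
        return (sorted.length % 2 == 1)
                ? sorted[mid]
                : (sorted[mid - 1] + sorted[mid]) / 2; // average the middle pair
    }
}
{code}
The trade-off is that a median no longer bounds how far the slowest subtasks 
can lag, so alignment would throttle relative to the typical subtask rather 
than the slowest one.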



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35076) Watermark alignment will cause data flow to experience serious shake

2024-04-11 Thread elon_X (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836238#comment-17836238
 ] 

elon_X commented on FLINK-35076:


[~kkrugler] 

Thank you for your reply.
Setting the idle time does not give precise control over the timing. For 
example, with it set to 10 seconds, the minimum watermark will still not change 
within those 10 seconds unless the idle time is made as small as possible; I'm 
not sure this would solve the problem, and further testing is needed.
For the solution of shuffling the stream, I didn't quite understand. In the 
Flink API:
DataStream<T> xx = env.fromSource(Source<T, ?, ?> source, WatermarkStrategy<T> 
timestampsAndWatermarks, String sourceName)
Only DataStream supports rebalance; a Source can't be rebalanced.

> Watermark alignment will cause data flow to experience serious shake
> 
>
> Key: FLINK-35076
> URL: https://issues.apache.org/jira/browse/FLINK-35076
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.16.1
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-04-10-20-15-05-731.png, 
> image-2024-04-10-20-23-13-872.png, image-2024-04-10-20-25-59-387.png, 
> image-2024-04-10-20-29-13-835.png
>
>
> In our company there is a requirement for multi-stream join operations, and 
> we are making modifications based on Flink watermark alignment; I then found 
> that the final join output experiences serious shake.
> I analyzed the reasons: an upstream topic has more than 300 partitions. The 
> number of partitions requested for this topic is too large, causing some 
> partitions to frequently experience intermittent writes with QPS=0. This 
> phenomenon is more serious between 2 am and 5 am. However, overall writing to 
> the topic is very smooth.
> !image-2024-04-10-20-29-13-835.png!
> The final join output will experience serious shake, as shown in the 
> following diagram:
> !image-2024-04-10-20-15-05-731.png!
> Root cause:
>  # The {{SourceOperator#emitLatestWatermark}} reports the 
> lastEmittedWatermark to the SourceCoordinator.
>  # If the partition write is zero during a certain period, the 
> lastEmittedWatermark sent by the subtask corresponding to that partition 
> remains unchanged.
>  # The SourceCoordinator aggregates the watermarks of all subtasks according 
> to the watermark group and takes the smallest watermark. This means that the 
> maxAllowedWatermark may remain unchanged for some time, even though the 
> overall upstream data flow is moving forward, until that minimum value is 
> updated. Only then will everything change, which will manifest as serious 
> shake in the output data stream.
> I think choosing the global minimum might not be a good option. Using min/max 
> could more likely encounter some edge cases. Perhaps choosing a median value 
> would be more appropriate? Or a more complex selection strategy?
> If replaced with a median value, it can ensure that the overall data flow is 
> very smooth:
> !image-2024-04-10-20-23-13-872.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35088) watermark alignment maxAllowedWatermarkDrift and updateInterval param need check

2024-04-11 Thread elon_X (Jira)
elon_X created FLINK-35088:
--

 Summary: watermark alignment maxAllowedWatermarkDrift and 
updateInterval param need check
 Key: FLINK-35088
 URL: https://issues.apache.org/jira/browse/FLINK-35088
 Project: Flink
  Issue Type: Improvement
  Components: API / Core, Runtime / Coordination
Affects Versions: 1.16.1
Reporter: elon_X
 Attachments: image-2024-04-11-20-12-29-951.png

When I use watermark alignment,

1. I found that setting maxAllowedWatermarkDrift to a negative number initially 
led me to believe it could support delaying the consumption of the source, so I 
tried it. Then, the upstream data flow would hang indefinitely.

Root cause:
{code:java}
long maxAllowedWatermark = globalCombinedWatermark.getTimestamp()               
  + watermarkAlignmentParams.getMaxAllowedWatermarkDrift();  {code}
If maxAllowedWatermarkDrift is negative, SourceOperator: maxAllowedWatermark < 
lastEmittedWatermark, then the SourceReader will be blocked indefinitely and 
cannot recover.

I'm not sure if this is a supported feature of watermark alignment. If it's 
not, I think an additional parameter validation should be implemented to throw 
an exception on the client side if the value is negative.
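
A minimal sketch of the kind of client-side validation being suggested, 
covering this parameter and the updateInterval parameter discussed in the next 
point (exact placement of the check is an assumption):
{code:java}
import java.time.Duration;
import org.apache.flink.util.Preconditions;

// Hypothetical checks; where they would live (for example in the watermark
// alignment builder) is an assumption.
public class AlignmentParamChecks {

    public static void validate(Duration maxAllowedWatermarkDrift, Duration updateInterval) {
        Preconditions.checkArgument(
                !maxAllowedWatermarkDrift.isNegative(),
                "maxAllowedWatermarkDrift must be >= 0, but was %s", maxAllowedWatermarkDrift);
        Preconditions.checkArgument(
                updateInterval.toMillis() > 0,
                "updateInterval must be > 0, but was %s", updateInterval);
    }
}
{code}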

2. The updateInterval parameter also lacks validation. If I set it to 0, the 
task will throw an exception when starting the job manager. The JDK class 
java.util.concurrent.ScheduledThreadPoolExecutor performs the validation and 
throws the exception.
{code:java}
java.lang.IllegalArgumentException: null
at 
java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:565)
 ~[?:1.8.0_351]
at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.<init>(SourceCoordinator.java:191)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:333)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:59)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:42)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:201)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:195)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:529)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:494)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:901)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:891)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:848)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:830)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:203)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:156)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:365)
 ~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:208) 
~[flink-dist_2.12-1.16.1.jar:1.16.1]
at 
org.

[jira] [Commented] (FLINK-35064) Flink sql connector pulsar/hive com.fasterxml.jackson.annotation.JsonFormat$Value conflict

2024-04-10 Thread elon_X (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17835964#comment-17835964
 ] 

elon_X commented on FLINK-35064:


[~chalixar]

Thank you for your reply. I also used relocation to solve this problem, and I 
am very willing to upgrade for pulsar. My question is, do other connectors also 
need relocation, or is it just for handling flink-connector-pulsar?

> Flink sql connector pulsar/hive 
> com.fasterxml.jackson.annotation.JsonFormat$Value conflict
> --
>
> Key: FLINK-35064
> URL: https://issues.apache.org/jira/browse/FLINK-35064
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive, Connectors / Pulsar
>Affects Versions: 1.16.1
>Reporter: elon_X
>Priority: Major
>
> When I compile and package {{flink-sql-connector-pulsar}} & 
> {{{}flink-sql-connector-hive{}}}, and then put these two jar files into the 
> Flink lib directory, I execute the following SQL statement through 
> {{{}bin/sql-client.sh{}}}:
>  
> {code:java}
> // code placeholder
> CREATE TABLE
> pulsar_table (
> content string,
> proc_time AS PROCTIME ()
> )
> WITH
> (
> 'connector' = 'pulsar',
> 'topics' = 'persistent://xxx',
> 'service-url' = 'pulsar://xxx',
> 'source.subscription-name' = 'xxx',
> 'source.start.message-id' = 'latest',
> 'format' = 'csv',
> 'pulsar.client.authPluginClassName' = 
> 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
> 'pulsar.client.authParams' = 'token:xxx'
> );
>  
> select * from pulsar_table; {code}
> The task error exception stack is as follows:
>  
> {code:java}
> Caused by: java.lang.NoSuchMethodError: 
> com.fasterxml.jackson.annotation.JsonFormat$Value.empty()Lcom/fasterxml/jackson/annotation/JsonFormat$Value;
>   at org.apache.pulsar.shade.com.fasterxml.jackson.databind.cfg.MapperConfig.<init>(MapperConfig.java:56) ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]
>   at org.apache.pulsar.shade.com.fasterxml.jackson.databind.ObjectMapper.<init>(ObjectMapper.java:660) ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]
>   at org.apache.pulsar.shade.com.fasterxml.jackson.databind.ObjectMapper.<init>(ObjectMapper.java:576) ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]
>   at org.apache.pulsar.common.util.ObjectMapperFactory.createObjectMapperInstance(ObjectMapperFactory.java:151) ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]
>   at org.apache.pulsar.common.util.ObjectMapperFactory.<clinit>(ObjectMapperFactory.java:142) ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]
>   at org.apache.pulsar.client.impl.conf.ConfigurationDataUtils.create(ConfigurationDataUtils.java:35) ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]
>   at org.apache.pulsar.client.impl.conf.ConfigurationDataUtils.<clinit>(ConfigurationDataUtils.java:43) ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]
>   at org.apache.pulsar.client.impl.ClientBuilderImpl.loadConf(ClientBuilderImpl.java:77) ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]
>   at org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient(PulsarClientFactory.java:105) ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]
>   at org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator.<init>(PulsarSourceEnumerator.java:95) ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]
>   at org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator.<init>(PulsarSourceEnumerator.java:76) ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]
>   at org.apache.flink.connector.pulsar.source.PulsarSource.createEnumerator(PulsarSource.java:144) ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]
>   at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:213) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
> {code}
>  
> The exception shows a conflict with 
> {{{}com.fasterxml.jackson.annotation.JsonFormat$Value{}}}. I investigated and 
> found that {{flink-sql-connector-pulsar}} and {{flink-sql-connector-hive}} 
> depend on different versions, leading to this conflict.
> {code:java}
> // flink-sql-connector-pulsar pom.xml
> <dependency>
>     <groupId>com.fasterxml.jackson</groupId>
>     <artifactId>jackson-bom</artifactId>
>     <type>pom</type>
>     <scope>import</scope>
>     <version>2.13.4.20221013</version>
> </dependency>
> 
> // flink-sql-connector-hive pom.xml
> <dependency>
>     <groupId>com.fasterxml.jackson</groupId>
>     <artifactId>jackson-bom</artifactId>
>     <type>pom</type>
>     <scope>import</scope>
>     <version>2.15.3</version>
> </dependency>
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35076) Watermark alignment will cause data flow to experience serious shake

2024-04-10 Thread elon_X (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

elon_X updated FLINK-35076:
---
Description: 
In our company there is a requirement for multi-stream join operations, and we 
are making modifications based on Flink watermark alignment; I then found that 
the final join output experiences serious shake.

I analyzed the reasons: an upstream topic has more than 300 partitions. The 
number of partitions requested for this topic is too large, causing some 
partitions to frequently experience intermittent writes with QPS=0. This 
phenomenon is more serious between 2 am and 5 am. However, overall writing to 
the topic is very smooth.

!image-2024-04-10-20-29-13-835.png!

The final join output will experience serious shake, as shown in the following 
diagram:

!image-2024-04-10-20-15-05-731.png!

Root cause:
 # The {{SourceOperator#emitLatestWatermark}} reports the lastEmittedWatermark 
to the SourceCoordinator.
 # If the partition write is zero during a certain period, the 
lastEmittedWatermark sent by the subtask corresponding to that partition 
remains unchanged.
 # The SourceCoordinator aggregates the watermarks of all subtasks according to 
the watermark group and takes the smallest watermark. This means that the 
maxAllowedWatermark may remain unchanged for some time, even though the overall 
upstream data flow is moving forward, until that minimum value is updated. Only 
then will everything change, which will manifest as serious shake in the output 
data stream.

I think choosing the global minimum might not be a good option. Using min/max 
could more likely encounter some edge cases. Perhaps choosing a median value 
would be more appropriate? Or a more complex selection strategy?

If replaced with a median value, it can ensure that the overall data flow is 
very smooth:

!image-2024-04-10-20-23-13-872.png!

 

  was:
In our company there is a requirement for multi-stream join operations, and we 
are making modifications based on Flink watermark alignment; I then found that 
the final join output experiences serious shake.

I analyzed the reasons: an upstream topic has more than 300 partitions. The 
number of partitions requested for this topic is too large, causing some 
partitions to frequently experience intermittent writes with QPS=0. This 
phenomenon is more serious between 2 am and 5 am. However, overall writing to 
the topic is very smooth.

!image-2024-04-10-20-29-13-835.png!

The final join output will experience serious shake, as shown in the following 
diagram:

!image-2024-04-10-20-15-05-731.png!

Root cause:
 # The {{SourceOperator#emitLatestWatermark}} reports the lastEmittedWatermark 
to the SourceCoordinator.
 # If the partition write is zero during a certain period, the 
lastEmittedWatermark sent by the subtask corresponding to that partition 
remains unchanged.
 # The SourceCoordinator aggregates the watermarks of all subtasks according to 
the watermark group and takes the smallest watermark. This means that the 
maxAllowedWatermark may remain unchanged for some time, even though the overall 
upstream data flow is moving forward, until that minimum value is updated. Only 
then will everything change, which will manifest as serious shake in the output 
data stream.

I think choosing the global minimum might not be a good option. Using min/max 
could more likely encounter some edge cases. Perhaps choosing a median value 
would be more appropriate? Or a more complex selection strategy?

If replaced with a median value, it can ensure that the overall data flow is 
very smooth:

!image-2024-04-10-20-23-13-872.png!

 


> Watermark alignment will cause data flow to experience serious shake
> 
>
> Key: FLINK-35076
> URL: https://issues.apache.org/jira/browse/FLINK-35076
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.16.1
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-04-10-20-15-05-731.png, 
> image-2024-04-10-20-23-13-872.png, image-2024-04-10-20-25-59-387.png, 
> image-2024-04-10-20-29-13-835.png
>
>
> In our company there is a requirement for multi-stream join operations, and 
> we are making modifications based on Flink watermark alignment; I then found 
> that the final join output experiences serious shake.
> I analyzed the reasons: an upstream topic has more than 300 partitions. The 
> number of partitions requested for this topic is too large, causing some 
> partitions to frequently experience intermittent writes with QPS=0. This 
> phenomenon is more serious between 2 am and 5 am. However, overall writing to 
> the topic is very smooth.
> !image-2024-04-10-20-29-13-835.png!
> The final join output will experience se

[jira] [Updated] (FLINK-35076) Watermark alignment will cause data flow to experience serious shake

2024-04-10 Thread elon_X (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

elon_X updated FLINK-35076:
---
Attachment: image-2024-04-10-20-29-13-835.png

> Watermark alignment will cause data flow to experience serious shake
> 
>
> Key: FLINK-35076
> URL: https://issues.apache.org/jira/browse/FLINK-35076
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.16.1
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-04-10-20-15-05-731.png, 
> image-2024-04-10-20-23-13-872.png, image-2024-04-10-20-25-59-387.png, 
> image-2024-04-10-20-29-13-835.png
>
>
> In our company there is a requirement for multi-stream join operations, and 
> we are making modifications based on Flink watermark alignment; I then found 
> that the final join output experiences serious shake.
> I analyzed the reasons: an upstream topic has more than 300 partitions. The 
> number of partitions requested for this topic is too large, causing some 
> partitions to frequently experience intermittent writes with QPS=0. This 
> phenomenon is more serious between 2 am and 5 am. However, overall writing to 
> the topic is very smooth.
> !image-2024-04-10-20-25-59-387.png!
> The final join output will experience serious shake, as shown in the 
> following diagram:
> !image-2024-04-10-20-15-05-731.png!
> Root cause:
>  # The {{SourceOperator#emitLatestWatermark}} reports the 
> lastEmittedWatermark to the SourceCoordinator.
>  # If the partition write is zero during a certain period, the 
> lastEmittedWatermark sent by the subtask corresponding to that partition 
> remains unchanged.
>  # The SourceCoordinator aggregates the watermarks of all subtasks according 
> to the watermark group and takes the smallest watermark. This means that the 
> maxAllowedWatermark may remain unchanged for some time, even though the 
> overall upstream data flow is moving forward, until that minimum value is 
> updated. Only then will everything change, which will manifest as serious 
> shake in the output data stream.
> I think choosing the global minimum might not be a good option. Using min/max 
> could more likely encounter some edge cases. Perhaps choosing a median value 
> would be more appropriate? Or a more complex selection strategy?
> If replaced with a median value, it can ensure that the overall data flow is 
> very smooth:
> !image-2024-04-10-20-23-13-872.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35076) Watermark alignment will cause data flow to experience serious shake

2024-04-10 Thread elon_X (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

elon_X updated FLINK-35076:
---
Description: 
In our company there is a requirement for multi-stream join operations, and we 
are making modifications based on Flink watermark alignment; I then found that 
the final join output experiences serious shake.

I analyzed the reasons: an upstream topic has more than 300 partitions. The 
number of partitions requested for this topic is too large, causing some 
partitions to frequently experience intermittent writes with QPS=0. This 
phenomenon is more serious between 2 am and 5 am. However, overall writing to 
the topic is very smooth.

!image-2024-04-10-20-29-13-835.png!

The final join output will experience serious shake, as shown in the following 
diagram:

!image-2024-04-10-20-15-05-731.png!

Root cause:
 # The {{SourceOperator#emitLatestWatermark}} reports the lastEmittedWatermark 
to the SourceCoordinator.
 # If the partition write is zero during a certain period, the 
lastEmittedWatermark sent by the subtask corresponding to that partition 
remains unchanged.
 # The SourceCoordinator aggregates the watermarks of all subtasks according to 
the watermark group and takes the smallest watermark. This means that the 
maxAllowedWatermark may remain unchanged for some time, even though the overall 
upstream data flow is moving forward, until that minimum value is updated. Only 
then will everything change, which will manifest as serious shake in the output 
data stream.

I think choosing the global minimum might not be a good option. Using min/max 
could more likely encounter some edge cases. Perhaps choosing a median value 
would be more appropriate? Or a more complex selection strategy?

If replaced with a median value, it can ensure that the overall data flow is 
very smooth:

!image-2024-04-10-20-23-13-872.png!

 

  was:
In our company there is a requirement for multi-stream join operations, and we 
are making modifications based on Flink watermark alignment; I then found that 
the final join output experiences serious shake.

I analyzed the reasons: an upstream topic has more than 300 partitions. The 
number of partitions requested for this topic is too large, causing some 
partitions to frequently experience intermittent writes with QPS=0. This 
phenomenon is more serious between 2 am and 5 am. However, overall writing to 
the topic is very smooth.

!image-2024-04-10-20-25-59-387.png!

The final join output will experience serious shake, as shown in the following 
diagram:

!image-2024-04-10-20-15-05-731.png!

Root cause:
 # The {{SourceOperator#emitLatestWatermark}} reports the lastEmittedWatermark 
to the SourceCoordinator.
 # If the partition write is zero during a certain period, the 
lastEmittedWatermark sent by the subtask corresponding to that partition 
remains unchanged.
 # The SourceCoordinator aggregates the watermarks of all subtasks according to 
the watermark group and takes the smallest watermark. This means that the 
maxAllowedWatermark may remain unchanged for some time, even though the overall 
upstream data flow is moving forward, until that minimum value is updated. Only 
then will everything change, which will manifest as serious shake in the output 
data stream.

I think choosing the global minimum might not be a good option. Using min/max 
could more likely encounter some edge cases. Perhaps choosing a median value 
would be more appropriate? Or a more complex selection strategy?

If replaced with a median value, it can ensure that the overall data flow is 
very smooth:

!image-2024-04-10-20-23-13-872.png!

 


> Watermark alignment will cause data flow to experience serious shake
> 
>
> Key: FLINK-35076
> URL: https://issues.apache.org/jira/browse/FLINK-35076
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.16.1
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-04-10-20-15-05-731.png, 
> image-2024-04-10-20-23-13-872.png, image-2024-04-10-20-25-59-387.png, 
> image-2024-04-10-20-29-13-835.png
>
>
> In our company there is a requirement for multi-stream join operations, and 
> we are making modifications based on Flink watermark alignment; I then found 
> that the final join output experiences serious shake.
> I analyzed the reasons: an upstream topic has more than 300 partitions. The 
> number of partitions requested for this topic is too large, causing some 
> partitions to frequently experience intermittent writes with QPS=0. This 
> phenomenon is more serious between 2 am and 5 am. However, overall writing to 
> the topic is very smooth.
> !image-2024-04-10-20-29-13-835.png!
> The final join output will experience se

[jira] [Updated] (FLINK-35076) Watermark alignment will cause data flow to experience serious shake

2024-04-10 Thread elon_X (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

elon_X updated FLINK-35076:
---
Attachment: image-2024-04-10-20-25-59-387.png

> Watermark alignment will cause data flow to experience serious shake
> 
>
> Key: FLINK-35076
> URL: https://issues.apache.org/jira/browse/FLINK-35076
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.16.1
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-04-10-20-15-05-731.png, 
> image-2024-04-10-20-23-13-872.png, image-2024-04-10-20-25-59-387.png
>
>
> In our company there is a requirement for multi-stream join operations, and 
> we are making modifications based on Flink watermark alignment; I then found 
> that the final join output experiences serious shake.
> I analyzed the reasons: an upstream topic has more than 300 partitions. The 
> number of partitions requested for this topic is too large, causing some 
> partitions to frequently experience intermittent writes with QPS=0. This 
> phenomenon is more serious between 2 am and 5 am. However, overall writing to 
> the topic is very smooth.
>  
> The final join output will experience serious shake, as shown in the 
> following diagram:
> !image-2024-04-10-20-15-05-731.png!
> Root cause:
>  # The {{SourceOperator#emitLatestWatermark}} reports the 
> lastEmittedWatermark to the SourceCoordinator.
>  # If the partition write is zero during a certain period, the 
> lastEmittedWatermark sent by the subtask corresponding to that partition 
> remains unchanged.
>  # The SourceCoordinator aggregates the watermarks of all subtasks according 
> to the watermark group and takes the smallest watermark. This means that the 
> maxAllowedWatermark may remain unchanged for some time, even though the 
> overall upstream data flow is moving forward, until that minimum value is 
> updated. Only then will everything change, which will manifest as serious 
> shake in the output data stream.
> I think choosing the global minimum might not be a good option. Using min/max 
> could more likely encounter some edge cases. Perhaps choosing a median value 
> would be more appropriate? Or a more complex selection strategy?
> If replaced with a median value, it can ensure that the overall data flow is 
> very smooth:
> !image-2024-04-10-20-23-13-872.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35076) Watermark alignment will cause data flow to experience serious shake

2024-04-10 Thread elon_X (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

elon_X updated FLINK-35076:
---
Description: 
In our company there is a requirement for multi-stream join operations, and we 
are making modifications based on Flink watermark alignment; I then found that 
the final join output experiences serious shake.

I analyzed the reasons: an upstream topic has more than 300 partitions. The 
number of partitions requested for this topic is too large, causing some 
partitions to frequently experience intermittent writes with QPS=0. This 
phenomenon is more serious between 2 am and 5 am. However, overall writing to 
the topic is very smooth.

!image-2024-04-10-20-25-59-387.png!

The final join output will experience serious shake, as shown in the following 
diagram:

!image-2024-04-10-20-15-05-731.png!

Root cause:
 # The {{SourceOperator#emitLatestWatermark}} reports the lastEmittedWatermark 
to the SourceCoordinator.
 # If the partition write is zero during a certain period, the 
lastEmittedWatermark sent by the subtask corresponding to that partition 
remains unchanged.
 # The SourceCoordinator aggregates the watermarks of all subtasks according to 
the watermark group and takes the smallest watermark. This means that the 
maxAllowedWatermark may remain unchanged for some time, even though the overall 
upstream data flow is moving forward, until that minimum value is updated. Only 
then will everything change, which will manifest as serious shake in the output 
data stream.

I think choosing the global minimum might not be a good option. Using min/max 
could more likely encounter some edge cases. Perhaps choosing a median value 
would be more appropriate? Or a more complex selection strategy?

If replaced with a median value, it can ensure that the overall data flow is 
very smooth:

!image-2024-04-10-20-23-13-872.png!

 

  was:
In our company there is a requirement for multi-stream join operations, and we 
are making modifications based on Flink watermark alignment; I then found that 
the final join output experiences serious shake.

I analyzed the reasons: an upstream topic has more than 300 partitions. The 
number of partitions requested for this topic is too large, causing some 
partitions to frequently experience intermittent writes with QPS=0. This 
phenomenon is more serious between 2 am and 5 am. However, overall writing to 
the topic is very smooth.

 

The final join output will experience serious shake, as shown in the following 
diagram:

!image-2024-04-10-20-15-05-731.png!

Root cause:
 # The {{SourceOperator#emitLatestWatermark}} reports the lastEmittedWatermark 
to the SourceCoordinator.
 # If the partition write is zero during a certain period, the 
lastEmittedWatermark sent by the subtask corresponding to that partition 
remains unchanged.
 # The SourceCoordinator aggregates the watermarks of all subtasks according to 
the watermark group and takes the smallest watermark. This means that the 
maxAllowedWatermark may remain unchanged for some time, even though the overall 
upstream data flow is moving forward, until that minimum value is updated. Only 
then will everything change, which will manifest as serious shake in the output 
data stream.

I think choosing the global minimum might not be a good option. Using min/max 
could more likely encounter some edge cases. Perhaps choosing a median value 
would be more appropriate? Or a more complex selection strategy?

If replaced with a median value, it can ensure that the overall data flow is 
very smooth:

!image-2024-04-10-20-23-13-872.png!

 



[jira] [Updated] (FLINK-35076) Watermark alignment will cause data flow to experience serious shake

2024-04-10 Thread elon_X (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

elon_X updated FLINK-35076:
---
Attachment: (was: image-2024-04-10-20-13-14-752.png)



[jira] [Created] (FLINK-35076) Watermark alignment will cause data flow to experience serious shake

2024-04-10 Thread elon_X (Jira)
elon_X created FLINK-35076:
--

 Summary: Watermark alignment will cause data flow to experience 
serious shake
 Key: FLINK-35076
 URL: https://issues.apache.org/jira/browse/FLINK-35076
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.16.1
Reporter: elon_X
 Attachments: image-2024-04-10-20-13-14-752.png, 
image-2024-04-10-20-15-05-731.png, image-2024-04-10-20-23-13-872.png
