[jira] [Closed] (FLINK-35685) Some metrics in the MetricStore are duplicated when increasing or decreasing task parallelism
[ https://issues.apache.org/jira/browse/FLINK-35685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] elon_X closed FLINK-35685. -- Resolution: Fixed > Some metrics in the MetricStore are duplicated when increasing or decreasing > task parallelism > - > > Key: FLINK-35685 > URL: https://issues.apache.org/jira/browse/FLINK-35685 > Project: Flink > Issue Type: Bug > Components: Runtime / Metrics >Affects Versions: 1.19.0 >Reporter: elon_X >Priority: Major > Fix For: 1.20.0, 1.19.2 > > Attachments: image-2024-06-24-18-01-40-869.png > > > 1. This can cause confusion for users. > 2. As parallelism is continuously adjusted, the data in the MetricStore will > increase, occupying more memory. > !image-2024-06-24-18-01-40-869.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35685) Some metrics in the MetricStore are duplicated when increasing or decreasing task parallelism
elon_X created FLINK-35685: -- Summary: Some metrics in the MetricStore are duplicated when increasing or decreasing task parallelism Key: FLINK-35685 URL: https://issues.apache.org/jira/browse/FLINK-35685 Project: Flink Issue Type: Bug Components: Runtime / Metrics Affects Versions: 1.19.0 Reporter: elon_X Attachments: image-2024-06-24-18-01-40-869.png 1. This can cause confusion for users. 2. As parallelism is continuously adjusted, the data in the MetricStore will increase, occupying more memory. !image-2024-06-24-18-01-40-869.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35631) KafkaSource parameter partition.discovery.interval.ms with a default value of 5 minutes does not take effect
[ https://issues.apache.org/jira/browse/FLINK-35631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856110#comment-17856110 ] elon_X commented on FLINK-35631: [~robyoung] Ah, I have already fixed this issue. Once it is confirmed to be a bug, I will submit my code:( > KafkaSource parameter partition.discovery.interval.ms with a default value of > 5 minutes does not take effect > > > Key: FLINK-35631 > URL: https://issues.apache.org/jira/browse/FLINK-35631 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: kafka-3.1.0 >Reporter: elon_X >Priority: Major > > When I start a stream program to consume Kafka > (flink-connector-kafka-3.1-SNAPSHOT) the Flink task does not automatically > detect new partitions after Kafka adds partitions. > > *Reason* > In the {{{}KafkaSourceBuilder{}}}, this parameter is checked to see if it has > been overridden. Since I did not set this parameter, even though it is > {{{}CONTINUOUS_UNBOUNDED{}}}, it still sets > {{{}partition.discovery.interval.ms = -1{}}}. > In the {{{}KafkaSourceEnumerator{}}}, the value of > {{partition.discovery.interval.ms}} is then -1, instead of the default value > of 5 minutes, so automatic partition discovery does not work, and the default > value of 5 minutes for {{partition.discovery.interval.ms}} is meaningless. > > A possible solution is to set {{partition.discovery.interval.ms = -1}} only > if {{boundedness == Boundedness.BOUNDED}} is true. -- This message was sent by Atlassian Jira (v8.20.10#820010)
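[Editor's note] The defaulting rule proposed in the issue above ("set partition.discovery.interval.ms = -1 only if boundedness == Boundedness.BOUNDED") can be illustrated with a small, self-contained sketch. This is a hypothetical helper written for this discussion, not code from Flink's KafkaSourceBuilder; it only models the proposed decision logic:

```java
// Hypothetical sketch of the proposed defaulting rule, NOT actual Flink code:
// force partition.discovery.interval.ms to -1 (disabled) only for BOUNDED
// sources; otherwise fall back to the documented 5-minute default.
public class DiscoveryIntervalSketch {

    static final long DEFAULT_DISCOVERY_INTERVAL_MS = 5 * 60 * 1000L; // 5 minutes

    /**
     * @param bounded   true if boundedness == Boundedness.BOUNDED
     * @param userValue value explicitly set by the user, or null if not overridden
     */
    static long effectiveDiscoveryIntervalMs(boolean bounded, Long userValue) {
        if (userValue != null) {
            return userValue; // an explicit user setting always wins
        }
        return bounded ? -1L : DEFAULT_DISCOVERY_INTERVAL_MS;
    }

    public static void main(String[] args) {
        // Unbounded stream, no explicit setting: discovery stays on at the default.
        System.out.println(effectiveDiscoveryIntervalMs(false, null)); // 300000
        // Bounded job, no explicit setting: discovery disabled.
        System.out.println(effectiveDiscoveryIntervalMs(true, null));  // -1
    }
}
```

Under the current behaviour described in the issue, the unbounded-without-override case instead yields -1, which is exactly why the 5-minute default never takes effect.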
[jira] [Commented] (FLINK-35631) KafkaSource parameter partition.discovery.interval.ms with a default value of 5 minutes does not take effect
[ https://issues.apache.org/jira/browse/FLINK-35631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17855977#comment-17855977 ] elon_X commented on FLINK-35631: [~martijnvisser] Thank you for your response. I checked the wiki link you sent, and it shows that FLIP-288 has been completed in this issue: https://issues.apache.org/jira/browse/FLINK-31953. I reviewed the code in the issue, and it appears that dynamic partition discovery is enabled by default. So, is this a bug? The reason is as I mentioned above. > KafkaSource parameter partition.discovery.interval.ms with a default value of > 5 minutes does not take effect > > > Key: FLINK-35631 > URL: https://issues.apache.org/jira/browse/FLINK-35631 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: kafka-3.1.0 >Reporter: elon_X >Priority: Major > > When I start a stream program to consume Kafka > (flink-connector-kafka-3.1-SNAPSHOT) the Flink task does not automatically > detect new partitions after Kafka adds partitions. > > *Reason* > In the {{{}KafkaSourceBuilder{}}}, this parameter is checked to see if it has > been overridden. Since I did not set this parameter, even though it is > {{{}CONTINUOUS_UNBOUNDED{}}}, it still sets > {{{}partition.discovery.interval.ms = -1{}}}. > In the {{{}KafkaSourceEnumerator{}}}, the value of > {{partition.discovery.interval.ms}} is then -1, instead of the default value > of 5 minutes, so automatic partition discovery does not work, and the default > value of 5 minutes for {{partition.discovery.interval.ms}} is meaningless. > > A possible solution is to set {{partition.discovery.interval.ms = -1}} only > if {{boundedness == Boundedness.BOUNDED}} is true. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35632) The example provided in the kafkaSource documentation for topic regex subscription is incorrect
[ https://issues.apache.org/jira/browse/FLINK-35632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] elon_X updated FLINK-35632: --- Description: [https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kafka/#topic-partition-subscription] The example provided in the document has issues and will result in a compilation error. The correct example should be: {code:java} // code placeholder // KafkaSource KafkaSource.builder().setTopicPattern(Pattern.compile("topic.*"));{code} !image-2024-06-18-17-47-53-525.png! !image-2024-06-18-17-48-38-080.png! was: [https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kafka/#topic-partition-subscription] [https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/pulsar/#topic-partition-subscription] The example provided in the document has issues and will result in a compilation error. The correct example should be: {code:java} // code placeholder // KafkaSource KafkaSource.builder().setTopicPattern(Pattern.compile("topic.*")); // PulsarSource PulsarSource.builder().setTopicPattern(Pattern.compile("topic.*"));{code} !image-2024-06-18-17-47-53-525.png! !image-2024-06-18-18-41-53-593.png! !image-2024-06-18-17-48-38-080.png! 
Summary: The example provided in the kafkaSource documentation for topic regex subscription is incorrect (was: The example provided in the kafkaSource/pulsarSource documentation for topic regex subscription is incorrect) > The example provided in the kafkaSource documentation for topic regex > subscription is incorrect > > > Key: FLINK-35632 > URL: https://issues.apache.org/jira/browse/FLINK-35632 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.20.0 >Reporter: elon_X >Priority: Major > Labels: pull-request-available > Attachments: image-2024-06-18-17-47-53-525.png, > image-2024-06-18-17-48-38-080.png, image-2024-06-18-18-41-41-212.png, > image-2024-06-18-18-41-53-593.png > > > [https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kafka/#topic-partition-subscription] > > The example provided in the document has issues and will result in a > compilation error. The correct example should be: > {code:java} > // code placeholder > // KafkaSource > KafkaSource.builder().setTopicPattern(Pattern.compile("topic.*"));{code} > !image-2024-06-18-17-47-53-525.png! > !image-2024-06-18-17-48-38-080.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
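[Editor's note] The corrected example above compiles because KafkaSourceBuilder's setTopicPattern takes a java.util.regex.Pattern rather than a plain String, which is what the broken documentation snippet omitted. A minimal, Flink-free check of the regex itself (the class name and topic names are illustrative only):

```java
import java.util.regex.Pattern;

// Illustrates the Pattern passed to setTopicPattern(...) in the corrected
// example: Pattern.matcher(...).matches() anchors at both ends, so
// "topic.*" matches topics that START with "topic", not ones that merely
// contain it.
public class TopicPatternDemo {
    public static void main(String[] args) {
        Pattern topicPattern = Pattern.compile("topic.*");
        System.out.println(topicPattern.matcher("topic-orders").matches()); // true
        System.out.println(topicPattern.matcher("other-topic").matches()); // false
    }
}
```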
[jira] [Updated] (FLINK-35632) The example provided in the kafkaSource/pulsarSource documentation for topic regex subscription is incorrect
[ https://issues.apache.org/jira/browse/FLINK-35632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] elon_X updated FLINK-35632: --- Description: [https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kafka/#topic-partition-subscription] [https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/pulsar/#topic-partition-subscription] The example provided in the document has issues and will result in a compilation error. The correct example should be: {code:java} // code placeholder // KafkaSource KafkaSource.builder().setTopicPattern(Pattern.compile("topic.*")); // PulsarSource PulsarSource.builder().setTopicPattern(Pattern.compile("topic.*"));{code} !image-2024-06-18-17-47-53-525.png! !image-2024-06-18-18-41-53-593.png! !image-2024-06-18-17-48-38-080.png! was: [https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kafka/#topic-partition-subscription] [https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/pulsar/#topic-partition-subscription] The example provided in the document has issues and will result in a compilation error. The correct example should be: {code:java} // code placeholder KafkaSource.builder().setTopicPattern(Pattern.compile("topic.*")) {code} !image-2024-06-18-17-47-53-525.png! !image-2024-06-18-17-48-38-080.png! 
> The example provided in the kafkaSource/pulsarSource documentation for topic > regex subscription is incorrect > - > > Key: FLINK-35632 > URL: https://issues.apache.org/jira/browse/FLINK-35632 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.20.0 >Reporter: elon_X >Priority: Major > Attachments: image-2024-06-18-17-47-53-525.png, > image-2024-06-18-17-48-38-080.png, image-2024-06-18-18-41-41-212.png, > image-2024-06-18-18-41-53-593.png > > > [https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kafka/#topic-partition-subscription] > [https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/pulsar/#topic-partition-subscription] > > The example provided in the document has issues and will result in a > compilation error. The correct example should be: > {code:java} > // code placeholder > // KafkaSource > KafkaSource.builder().setTopicPattern(Pattern.compile("topic.*")); > // PulsarSource > PulsarSource.builder().setTopicPattern(Pattern.compile("topic.*"));{code} > !image-2024-06-18-17-47-53-525.png! > !image-2024-06-18-18-41-53-593.png! > !image-2024-06-18-17-48-38-080.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35632) The example provided in the kafkaSource/pulsarSource documentation for topic regex subscription is incorrect
[ https://issues.apache.org/jira/browse/FLINK-35632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] elon_X updated FLINK-35632: --- Attachment: image-2024-06-18-18-41-53-593.png > The example provided in the kafkaSource/pulsarSource documentation for topic > regex subscription is incorrect > - > > Key: FLINK-35632 > URL: https://issues.apache.org/jira/browse/FLINK-35632 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.20.0 >Reporter: elon_X >Priority: Major > Attachments: image-2024-06-18-17-47-53-525.png, > image-2024-06-18-17-48-38-080.png, image-2024-06-18-18-41-41-212.png, > image-2024-06-18-18-41-53-593.png > > > [https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kafka/#topic-partition-subscription] > [https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/pulsar/#topic-partition-subscription] > > > The example provided in the document has issues and will result in a > compilation error. The correct example should be: > {code:java} > // code placeholder > KafkaSource.builder().setTopicPattern(Pattern.compile("topic.*")) {code} > !image-2024-06-18-17-47-53-525.png! > !image-2024-06-18-17-48-38-080.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35632) The example provided in the kafkaSource/pulsarSource documentation for topic regex subscription is incorrect
[ https://issues.apache.org/jira/browse/FLINK-35632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] elon_X updated FLINK-35632: --- Summary: The example provided in the kafkaSource/pulsarSource documentation for topic regex subscription is incorrect (was: The example provided in the kafkaSource documentation for topic regex subscription is incorrect) > The example provided in the kafkaSource/pulsarSource documentation for topic > regex subscription is incorrect > - > > Key: FLINK-35632 > URL: https://issues.apache.org/jira/browse/FLINK-35632 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.20.0 >Reporter: elon_X >Priority: Major > Attachments: image-2024-06-18-17-47-53-525.png, > image-2024-06-18-17-48-38-080.png, image-2024-06-18-18-41-41-212.png, > image-2024-06-18-18-41-53-593.png > > > [https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kafka/#topic-partition-subscription] > [https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/pulsar/#topic-partition-subscription] > > > The example provided in the document has issues and will result in a > compilation error. The correct example should be: > {code:java} > // code placeholder > KafkaSource.builder().setTopicPattern(Pattern.compile("topic.*")) {code} > !image-2024-06-18-17-47-53-525.png! > !image-2024-06-18-17-48-38-080.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35632) The example provided in the kafkaSource/pulsarSource documentation for topic regex subscription is incorrect
[ https://issues.apache.org/jira/browse/FLINK-35632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] elon_X updated FLINK-35632: --- Attachment: image-2024-06-18-18-41-41-212.png > The example provided in the kafkaSource/pulsarSource documentation for topic > regex subscription is incorrect > - > > Key: FLINK-35632 > URL: https://issues.apache.org/jira/browse/FLINK-35632 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.20.0 >Reporter: elon_X >Priority: Major > Attachments: image-2024-06-18-17-47-53-525.png, > image-2024-06-18-17-48-38-080.png, image-2024-06-18-18-41-41-212.png, > image-2024-06-18-18-41-53-593.png > > > [https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kafka/#topic-partition-subscription] > [https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/pulsar/#topic-partition-subscription] > > > The example provided in the document has issues and will result in a > compilation error. The correct example should be: > {code:java} > // code placeholder > KafkaSource.builder().setTopicPattern(Pattern.compile("topic.*")) {code} > !image-2024-06-18-17-47-53-525.png! > !image-2024-06-18-17-48-38-080.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35632) The example provided in the kafkaSource documentation for topic regex subscription is incorrect
[ https://issues.apache.org/jira/browse/FLINK-35632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] elon_X updated FLINK-35632: --- Description: [https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kafka/#topic-partition-subscription] [https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/pulsar/#topic-partition-subscription] The example provided in the document has issues and will result in a compilation error. The correct example should be: {code:java} // code placeholder KafkaSource.builder().setTopicPattern(Pattern.compile("topic.*")) {code} !image-2024-06-18-17-47-53-525.png! !image-2024-06-18-17-48-38-080.png! was: [https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kafka/#topic-partition-subscription] The example provided in the document has issues and will result in a compilation error. The correct example should be: {code:java} // code placeholder KafkaSource.builder().setTopicPattern(Pattern.compile("topic.*")) {code} !image-2024-06-18-17-47-53-525.png! !image-2024-06-18-17-48-38-080.png! > The example provided in the kafkaSource documentation for topic regex > subscription is incorrect > > > Key: FLINK-35632 > URL: https://issues.apache.org/jira/browse/FLINK-35632 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.20.0 >Reporter: elon_X >Priority: Major > Attachments: image-2024-06-18-17-47-53-525.png, > image-2024-06-18-17-48-38-080.png, image-2024-06-18-18-41-41-212.png, > image-2024-06-18-18-41-53-593.png > > > [https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kafka/#topic-partition-subscription] > [https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/pulsar/#topic-partition-subscription] > > > The example provided in the document has issues and will result in a > compilation error. 
The correct example should be: > {code:java} > // code placeholder > KafkaSource.builder().setTopicPattern(Pattern.compile("topic.*")) {code} > !image-2024-06-18-17-47-53-525.png! > !image-2024-06-18-17-48-38-080.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35632) The example provided in the kafkaSource documentation for topic regex subscription is incorrect
elon_X created FLINK-35632: -- Summary: The example provided in the kafkaSource documentation for topic regex subscription is incorrect Key: FLINK-35632 URL: https://issues.apache.org/jira/browse/FLINK-35632 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.20.0 Reporter: elon_X Attachments: image-2024-06-18-17-47-53-525.png, image-2024-06-18-17-48-38-080.png [https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kafka/#topic-partition-subscription] The example provided in the document has issues and will result in a compilation error. The correct example should be: {code:java} // code placeholder KafkaSource.builder().setTopicPattern(Pattern.compile("topic.*")) {code} !image-2024-06-18-17-47-53-525.png! !image-2024-06-18-17-48-38-080.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35631) KafkaSource parameter partition.discovery.interval.ms with a default value of 5 minutes does not take effect
[ https://issues.apache.org/jira/browse/FLINK-35631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] elon_X updated FLINK-35631: --- Issue Type: Improvement (was: Bug) > KafkaSource parameter partition.discovery.interval.ms with a default value of > 5 minutes does not take effect > > > Key: FLINK-35631 > URL: https://issues.apache.org/jira/browse/FLINK-35631 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: kafka-3.1.0 >Reporter: elon_X >Priority: Major > > When I start a stream program to consume Kafka > (flink-connector-kafka-3.1-SNAPSHOT) the Flink task does not automatically > detect new partitions after Kafka adds partitions. > > *Reason* > In the {{{}KafkaSourceBuilder{}}}, this parameter is checked to see if it has > been overridden. Since I did not set this parameter, even though it is > {{{}CONTINUOUS_UNBOUNDED{}}}, it still sets > {{{}partition.discovery.interval.ms = -1{}}}. > In the {{{}KafkaSourceEnumerator{}}}, the value of > {{partition.discovery.interval.ms}} is then -1, instead of the default value > of 5 minutes, so automatic partition discovery does not work, and the default > value of 5 minutes for {{partition.discovery.interval.ms}} is meaningless. > > A possible solution is to set {{partition.discovery.interval.ms = -1}} only > if {{boundedness == Boundedness.BOUNDED}} is true. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35631) KafkaSource parameter partition.discovery.interval.ms with a default value of 5 minutes does not take effect
[ https://issues.apache.org/jira/browse/FLINK-35631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] elon_X updated FLINK-35631: --- Affects Version/s: kafka-3.1.0 (was: 1.17.2) > KafkaSource parameter partition.discovery.interval.ms with a default value of > 5 minutes does not take effect > > > Key: FLINK-35631 > URL: https://issues.apache.org/jira/browse/FLINK-35631 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: kafka-3.1.0 >Reporter: elon_X >Priority: Major > > When I start a stream program to consume Kafka > (flink-connector-kafka-3.1-SNAPSHOT) the Flink task does not automatically > detect new partitions after Kafka adds partitions. > > *Reason* > In the {{{}KafkaSourceBuilder{}}}, this parameter is checked to see if it has > been overridden. Since I did not set this parameter, even though it is > {{{}CONTINUOUS_UNBOUNDED{}}}, it still sets > {{{}partition.discovery.interval.ms = -1{}}}. > In the {{{}KafkaSourceEnumerator{}}}, the value of > {{partition.discovery.interval.ms}} is then -1, instead of the default value > of 5 minutes, so automatic partition discovery does not work, and the default > value of 5 minutes for {{partition.discovery.interval.ms}} is meaningless. > > A possible solution is to set {{partition.discovery.interval.ms = -1}} only > if {{boundedness == Boundedness.BOUNDED}} is true. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35631) KafkaSource parameter partition.discovery.interval.ms with a default value of 5 minutes does not take effect
elon_X created FLINK-35631: -- Summary: KafkaSource parameter partition.discovery.interval.ms with a default value of 5 minutes does not take effect Key: FLINK-35631 URL: https://issues.apache.org/jira/browse/FLINK-35631 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.17.2 Reporter: elon_X When I start a stream program to consume Kafka (flink-connector-kafka-3.1-SNAPSHOT) the Flink task does not automatically detect new partitions after Kafka adds partitions. *Reason* In the {{{}KafkaSourceBuilder{}}}, this parameter is checked to see if it has been overridden. Since I did not set this parameter, even though it is {{{}CONTINUOUS_UNBOUNDED{}}}, it still sets {{{}partition.discovery.interval.ms = -1{}}}. In the {{{}KafkaSourceEnumerator{}}}, the value of {{partition.discovery.interval.ms}} is then -1, instead of the default value of 5 minutes, so automatic partition discovery does not work, and the default value of 5 minutes for {{partition.discovery.interval.ms}} is meaningless. A possible solution is to set {{partition.discovery.interval.ms = -1}} only if {{boundedness == Boundedness.BOUNDED}} is true. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35157) Sources with watermark alignment get stuck once some subtasks finish
[ https://issues.apache.org/jira/browse/FLINK-35157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17855611#comment-17855611 ] elon_X commented on FLINK-35157: Hi [~fanrui] I have already backported this fix to 1.17, 1.18, and 1.19. Thank you. :D > Sources with watermark alignment get stuck once some subtasks finish > > > Key: FLINK-35157 > URL: https://issues.apache.org/jira/browse/FLINK-35157 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.17.2, 1.19.0, 1.18.1 >Reporter: Gyula Fora >Assignee: elon_X >Priority: Critical > Labels: pull-request-available > Fix For: 1.20.0 > > Attachments: image-2024-04-24-21-36-16-146.png > > > The current watermark alignment logic can easily get stuck if some subtasks > finish while others are still running. > The reason is that once a source subtask finishes, the subtask is not > excluded from alignment, effectively blocking the rest of the job from making > progress beyond last wm + alignment time for the finished sources. > This can be easily reproduced by the following simple pipeline: > {noformat} > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setParallelism(2); > DataStream<Long> s = env.fromSource(new NumberSequenceSource(0, 100), > > WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner((SerializableTimestampAssigner<Long>) > (aLong, l) -> aLong).withWatermarkAlignment("g1", Duration.ofMillis(10), > Duration.ofSeconds(2)), > "Sequence Source").filter((FilterFunction<Long>) aLong -> { > Thread.sleep(200); > return true; > } > ); > s.print(); > env.execute();{noformat} > The solution could be to send out a max watermark event once the sources > finish or to exclude them from the source coordinator -- This message was sent by Atlassian Jira (v8.20.10#820010)
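[Editor's note] The stuck-alignment behaviour described in the issue above can be modelled without Flink. The sketch below is a simplified, hypothetical reconstruction of the min-based group-watermark computation (method and class names are invented for illustration, not Flink's actual coordinator code); it shows why a finished subtask pins the aligned watermark, and how excluding finished subtasks lets it advance:

```java
// Hypothetical model of watermark alignment: the group's max allowed
// watermark is min(subtask watermarks) + allowed drift. If a finished
// subtask's last watermark stays in the minimum, running subtasks can never
// progress past it; excluding finished subtasks (equivalent to emitting a
// "max watermark" for them) unblocks the group.
public class AlignmentSketch {

    static long groupMaxAllowedWatermark(long[] watermarks, boolean[] finished,
                                         long maxDriftMs, boolean excludeFinished) {
        long min = Long.MAX_VALUE;
        for (int i = 0; i < watermarks.length; i++) {
            if (excludeFinished && finished[i]) {
                continue; // finished subtask no longer constrains alignment
            }
            min = Math.min(min, watermarks[i]);
        }
        return min == Long.MAX_VALUE ? Long.MAX_VALUE : min + maxDriftMs;
    }

    public static void main(String[] args) {
        long[] wms = {100L, 5_000L};   // subtask 0 finished at watermark 100
        boolean[] done = {true, false};
        // Buggy behaviour: the group is stuck at 100 + drift forever.
        System.out.println(groupMaxAllowedWatermark(wms, done, 10L, false)); // 110
        // Fixed behaviour: only the running subtask constrains the group.
        System.out.println(groupMaxAllowedWatermark(wms, done, 10L, true));  // 5010
    }
}
```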
[jira] (FLINK-35157) Sources with watermark alignment get stuck once some subtasks finish
[ https://issues.apache.org/jira/browse/FLINK-35157 ] elon_X deleted comment on FLINK-35157: was (Author: JIRAUSER303028): [~fanrui] Sure, I will backport this fix to 1.17, 1.18, and 1.19. Thank you :D > Sources with watermark alignment get stuck once some subtasks finish > > > Key: FLINK-35157 > URL: https://issues.apache.org/jira/browse/FLINK-35157 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.17.2, 1.19.0, 1.18.1 >Reporter: Gyula Fora >Assignee: elon_X >Priority: Critical > Labels: pull-request-available > Fix For: 1.20.0 > > Attachments: image-2024-04-24-21-36-16-146.png > > > The current watermark alignment logic can easily get stuck if some subtasks > finish while others are still running. > The reason is that once a source subtask finishes, the subtask is not > excluded from alignment, effectively blocking the rest of the job from making > progress beyond last wm + alignment time for the finished sources. > This can be easily reproduced by the following simple pipeline: > {noformat} > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setParallelism(2); > DataStream<Long> s = env.fromSource(new NumberSequenceSource(0, 100), > > WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner((SerializableTimestampAssigner<Long>) > (aLong, l) -> aLong).withWatermarkAlignment("g1", Duration.ofMillis(10), > Duration.ofSeconds(2)), > "Sequence Source").filter((FilterFunction<Long>) aLong -> { > Thread.sleep(200); > return true; > } > ); > s.print(); > env.execute();{noformat} > The solution could be to send out a max watermark event once the sources > finish or to exclude them from the source coordinator -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35157) Sources with watermark alignment get stuck once some subtasks finish
[ https://issues.apache.org/jira/browse/FLINK-35157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854914#comment-17854914 ] elon_X commented on FLINK-35157: [~fanrui] Sure, I will backport this fix to 1.17, 1.18, and 1.19. Thank you :D > Sources with watermark alignment get stuck once some subtasks finish > > > Key: FLINK-35157 > URL: https://issues.apache.org/jira/browse/FLINK-35157 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.17.2, 1.19.0, 1.18.1 >Reporter: Gyula Fora >Assignee: elon_X >Priority: Critical > Labels: pull-request-available > Fix For: 1.20.0 > > Attachments: image-2024-04-24-21-36-16-146.png > > > The current watermark alignment logic can easily get stuck if some subtasks > finish while others are still running. > The reason is that once a source subtask finishes, the subtask is not > excluded from alignment, effectively blocking the rest of the job from making > progress beyond last wm + alignment time for the finished sources. > This can be easily reproduced by the following simple pipeline: > {noformat} > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setParallelism(2); > DataStream<Long> s = env.fromSource(new NumberSequenceSource(0, 100), > > WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner((SerializableTimestampAssigner<Long>) > (aLong, l) -> aLong).withWatermarkAlignment("g1", Duration.ofMillis(10), > Duration.ofSeconds(2)), > "Sequence Source").filter((FilterFunction<Long>) aLong -> { > Thread.sleep(200); > return true; > } > ); > s.print(); > env.execute();{noformat} > The solution could be to send out a max watermark event once the sources > finish or to exclude them from the source coordinator -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-35506) disable kafka auto-commit and rely on flink’s checkpointing if both are enabled
[ https://issues.apache.org/jira/browse/FLINK-35506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] elon_X closed FLINK-35506. -- Resolution: Not A Problem > disable kafka auto-commit and rely on flink’s checkpointing if both are > enabled > --- > > Key: FLINK-35506 > URL: https://issues.apache.org/jira/browse/FLINK-35506 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: 1.16.1 >Reporter: elon_X >Priority: Major > Attachments: image-2024-06-03-23-39-28-270.png > > > When I use KafkaSource for consuming topics and set the Kafka parameter > {{{}enable.auto.commit=true{}}}, while also enabling checkpointing for the > task, I notice that both will commit offsets. Should Kafka's auto-commit be > disabled when enabling Flink checkpointing, similar to how it's done with > FlinkKafkaConsumer? > > *How to reproduce* > > {code:java} > // code placeholder > Properties kafkaParams = new Properties(); > kafkaParams.put("enable.auto.commit", "true"); > kafkaParams.put("auto.offset.reset", "latest"); > kafkaParams.put("fetch.min.bytes", "4096"); > kafkaParams.put("sasl.mechanism", "PLAIN"); > kafkaParams.put("security.protocol", "SASL_PLAINTEXT"); > kafkaParams.put("bootstrap.servers", bootStrap); > kafkaParams.put("group.id", expoGroupId); > kafkaParams.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule > required username=\"" + username + "\" password=\"" + password + "\";"); > KafkaSource<String> source = KafkaSource > .<String>builder() > .setBootstrapServers(bootStrap) > .setProperties(kafkaParams) > .setGroupId(expoGroupId) > .setTopics(Arrays.asList(expoTopic)) > .setValueOnlyDeserializer(new SimpleStringSchema()) > .setStartingOffsets(OffsetsInitializer.latest()) > .build(); > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source") > .filter(r -> true); > env.enableCheckpointing(3000 * 1000); > env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000 * 1000); > env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE); > env.getCheckpointConfig().setCheckpointTimeout(1000 * 300); > env.execute("kafka-consumer"); {code} > > > The Kafka client's > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator continuously > commits offsets. > !image-2024-06-03-23-39-28-270.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
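[Editor's note] Since the issue above was closed as "Not A Problem", the practical remedy is on the user side: when Flink checkpointing is enabled, do not pass enable.auto.commit=true, or strip/override it before building the source. A Flink-free sketch of that property handling (the helper and class names are hypothetical, written for illustration, not connector code):

```java
import java.util.Properties;

// Sketch of the user-side workaround: when checkpointing is enabled, force
// enable.auto.commit=false before handing the properties to the Kafka
// source, so only Flink's checkpoint callback commits offsets. This mirrors
// the behaviour the reporter expected from the old FlinkKafkaConsumer.
public class AutoCommitSketch {

    static Properties withAutoCommitDisabled(Properties userProps, boolean checkpointingEnabled) {
        Properties effective = new Properties();
        effective.putAll(userProps);          // keep all other user settings
        if (checkpointingEnabled) {
            effective.setProperty("enable.auto.commit", "false");
        }
        return effective;
    }

    public static void main(String[] args) {
        Properties p = new Properties();
        p.setProperty("enable.auto.commit", "true");
        System.out.println(withAutoCommitDisabled(p, true).getProperty("enable.auto.commit"));  // false
        System.out.println(withAutoCommitDisabled(p, false).getProperty("enable.auto.commit")); // true
    }
}
```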
[jira] [Updated] (FLINK-35506) disable kafka auto-commit and rely on flink’s checkpointing if both are enabled
[ https://issues.apache.org/jira/browse/FLINK-35506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] elon_X updated FLINK-35506: --- Description: When I use KafkaSource for consuming topics and set the Kafka parameter {{{}enable.auto.commit=true{}}}, while also enabling checkpointing for the task, I notice that both will commit offsets. Should Kafka's auto-commit be disabled when enabling Flink checkpointing, similar to how it's done with FlinkKafkaConsumer? *How to reproduce* {code:java} // code placeholder Properties kafkaParams = new Properties(); kafkaParams.put("enable.auto.commit", "true"); kafkaParams.put("auto.offset.reset", "latest"); kafkaParams.put("fetch.min.bytes", "4096"); kafkaParams.put("sasl.mechanism", "PLAIN"); kafkaParams.put("security.protocol", "SASL_PLAINTEXT"); kafkaParams.put("bootstrap.servers", bootStrap); kafkaParams.put("group.id", expoGroupId); kafkaParams.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";"); KafkaSource<String> source = KafkaSource .<String>builder() .setBootstrapServers(bootStrap) .setProperties(kafkaParams) .setGroupId(expoGroupId) .setTopics(Arrays.asList(expoTopic)) .setValueOnlyDeserializer(new SimpleStringSchema()) .setStartingOffsets(OffsetsInitializer.latest()) .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source") .filter(r -> true); env.enableCheckpointing(3000 * 1000); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000 * 1000); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE); env.getCheckpointConfig().setCheckpointTimeout(1000 * 300); env.execute("kafka-consumer"); {code} The Kafka client's org.apache.kafka.clients.consumer.internals.ConsumerCoordinator continuously commits offsets. !image-2024-06-03-23-39-28-270.png! 
was: When I use KafkaSource for consuming topics and set the Kafka parameter {{{}enable.auto.commit=true{}}}, while also enabling checkpointing for the task, I notice that both will commit offsets. Should Kafka's auto-commit be disabled when enabling Flink checkpointing, similar to how it's done with FlinkKafkaConsumer?
*How to reproduce*
{code:java}
Properties kafkaParams = new Properties();
kafkaParams.put("enable.auto.commit", "true");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("fetch.min.bytes", "4096");
kafkaParams.put("sasl.mechanism", "PLAIN");
kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
kafkaParams.put("bootstrap.servers", bootStrap);
kafkaParams.put("group.id", expoGroupId);
kafkaParams.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";");

KafkaSource source = KafkaSource.builder()
    .setBootstrapServers(bootStrap)
    .setProperties(kafkaParams)
    .setGroupId(expoGroupId)
    .setTopics(Arrays.asList(expoTopic))
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .setStartingOffsets(OffsetsInitializer.latest())
    .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
    .filter(r -> true);
env.enableCheckpointing(3000 * 1000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000 * 1000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(1000 * 300);
env.execute("kafka-consumer");
{code}
'org.apache.kafka.clients.consumer.internals.ConsumerCoordinator' continuously committing offsets. !image-2024-06-03-23-39-28-270.png!
> disable kafka auto-commit and rely on flink’s checkpointing if both are > enabled > --- > > Key: FLINK-35506 > URL: https://issues.apache.org/jira/browse/FLINK-35506 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: 1.16.1 >Reporter: elon_X >Priority: Major > Attachments: image-2024-06-03-23-39-28-270.png > > > When I use KafkaSource for consuming topics and set the Kafka parameter > {{{}enable.auto.commit=true{}}}, while also enabling checkpointing for the > task, I notice that both will commit offsets. Should Kafka's auto-commit be > disabled when enabling Flink checkpointing, similar to how it's done with > FlinkKafkaConsumer? > > *How to reproduce* > > {code:java} > // code placeholder > Properties kafkaParams = new Properties(); > kafkaParams.put("enable.aut
[jira] [Updated] (FLINK-35506) disable kafka auto-commit and rely on flink’s checkpointing if both are enabled
[ https://issues.apache.org/jira/browse/FLINK-35506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] elon_X updated FLINK-35506: --- Description: When I use KafkaSource to consume topics and set the Kafka parameter {{enable.auto.commit=true}} while also enabling checkpointing for the task, I notice that both will commit offsets. Should Kafka's auto-commit be disabled when Flink checkpointing is enabled, similar to how it's done with FlinkKafkaConsumer?
*How to reproduce*
{code:java}
Properties kafkaParams = new Properties();
kafkaParams.put("enable.auto.commit", "true");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("fetch.min.bytes", "4096");
kafkaParams.put("sasl.mechanism", "PLAIN");
kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
kafkaParams.put("bootstrap.servers", bootStrap);
kafkaParams.put("group.id", expoGroupId);
kafkaParams.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";");

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(bootStrap)
    .setProperties(kafkaParams)
    .setGroupId(expoGroupId)
    .setTopics(Arrays.asList(expoTopic))
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .setStartingOffsets(OffsetsInitializer.latest())
    .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
    .filter(r -> true);
env.enableCheckpointing(3000 * 1000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000 * 1000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(1000 * 300);
env.execute("kafka-consumer");
{code}
'org.apache.kafka.clients.consumer.internals.ConsumerCoordinator' keeps committing offsets continuously. !image-2024-06-03-23-39-28-270.png!
was:When I use KafkaSource for consuming topics and set the Kafka parameter {{{}enable.auto.commit=true{}}}, while also enabling checkpointing for the task, I notice that both will commit offsets. Should Kafka's auto-commit be disabled when enabling Flink checkpointing, similar to how it's done with FlinkKafkaConsumer? > disable kafka auto-commit and rely on flink’s checkpointing if both are > enabled > --- > > Key: FLINK-35506 > URL: https://issues.apache.org/jira/browse/FLINK-35506 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: 1.16.1 >Reporter: elon_X >Priority: Major > Attachments: image-2024-06-03-23-39-28-270.png > > > When I use KafkaSource for consuming topics and set the Kafka parameter > {{{}enable.auto.commit=true{}}}, while also enabling checkpointing for the > task, I notice that both will commit offsets. Should Kafka's auto-commit be > disabled when enabling Flink checkpointing, similar to how it's done with > FlinkKafkaConsumer? 
> 
> *How to reproduce*
> 
> {code:java}
> Properties kafkaParams = new Properties();
> kafkaParams.put("enable.auto.commit", "true");
> kafkaParams.put("auto.offset.reset", "latest");
> kafkaParams.put("fetch.min.bytes", "4096");
> kafkaParams.put("sasl.mechanism", "PLAIN");
> kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
> kafkaParams.put("bootstrap.servers", bootStrap);
> kafkaParams.put("group.id", expoGroupId);
> kafkaParams.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";");
> KafkaSource source = KafkaSource.builder()
>     .setBootstrapServers(bootStrap)
>     .setProperties(kafkaParams)
>     .setGroupId(expoGroupId)
>     .setTopics(Arrays.asList(expoTopic))
>     .setValueOnlyDeserializer(new SimpleStringSchema())
>     .setStartingOffsets(OffsetsInitializer.latest())
>     .build();
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
>     .filter(r -> true);
> env.enableCheckpointing(3000 * 1000);
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000 * 1000);
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
> env.getCheckpointConfig().setCheckpointTimeout(1000 * 300);
> env.execute("kafka-consumer"); {code}
> 
> 
> 'org.apache.kafka.clients.consumer.internals.ConsumerCoordinator' keeps committing offsets continuously.
> !image-2024-06-03-23-39-28-270.png!
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35506) disable kafka auto-commit and rely on flink’s checkpointing if both are enabled
[ https://issues.apache.org/jira/browse/FLINK-35506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] elon_X updated FLINK-35506: --- Attachment: image-2024-06-03-23-39-28-270.png > disable kafka auto-commit and rely on flink’s checkpointing if both are > enabled > --- > > Key: FLINK-35506 > URL: https://issues.apache.org/jira/browse/FLINK-35506 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Affects Versions: 1.16.1 >Reporter: elon_X >Priority: Major > Attachments: image-2024-06-03-23-39-28-270.png > > > When I use KafkaSource for consuming topics and set the Kafka parameter > {{{}enable.auto.commit=true{}}}, while also enabling checkpointing for the > task, I notice that both will commit offsets. Should Kafka's auto-commit be > disabled when enabling Flink checkpointing, similar to how it's done with > FlinkKafkaConsumer? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35506) disable kafka auto-commit and rely on flink’s checkpointing if both are enabled
elon_X created FLINK-35506: -- Summary: disable kafka auto-commit and rely on flink’s checkpointing if both are enabled Key: FLINK-35506 URL: https://issues.apache.org/jira/browse/FLINK-35506 Project: Flink Issue Type: Improvement Components: Connectors / Kafka Affects Versions: 1.16.1 Reporter: elon_X When I use KafkaSource for consuming topics and set the Kafka parameter {{{}enable.auto.commit=true{}}}, while also enabling checkpointing for the task, I notice that both will commit offsets. Should Kafka's auto-commit be disabled when enabling Flink checkpointing, similar to how it's done with FlinkKafkaConsumer? -- This message was sent by Atlassian Jira (v8.20.10#820010)
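The override the ticket asks for can be sketched without any Flink dependency. The `reconcile` helper below is hypothetical (it is not a real KafkaSourceBuilder method); it only illustrates the proposed behavior: when Flink checkpointing is enabled, force `enable.auto.commit=false` in the effective consumer properties so that only Flink's checkpoint mechanism commits offsets.

```java
import java.util.Properties;

public class AutoCommitOverride {

    // Hypothetical reconciliation step mirroring what a source builder could
    // do: if Flink checkpointing is on, disable the Kafka consumer's own
    // auto-commit so exactly one party commits offsets.
    static Properties reconcile(Properties userProps, boolean checkpointingEnabled) {
        Properties resolved = new Properties();
        resolved.putAll(userProps);
        if (checkpointingEnabled) {
            resolved.setProperty("enable.auto.commit", "false");
        }
        return resolved;
    }

    public static void main(String[] args) {
        Properties p = new Properties();
        p.setProperty("enable.auto.commit", "true");
        // With checkpointing enabled, the user's "true" is overridden.
        System.out.println(reconcile(p, true).getProperty("enable.auto.commit"));  // false
        // Without checkpointing, the user's setting is left alone.
        System.out.println(reconcile(p, false).getProperty("enable.auto.commit")); // true
    }
}
```

This is the same spirit as the legacy FlinkKafkaConsumer, which only honored auto-commit when checkpointing was disabled.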
[jira] [Commented] (FLINK-35157) Sources with watermark alignment get stuck once some subtasks finish
[ https://issues.apache.org/jira/browse/FLINK-35157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17840443#comment-17840443 ] elon_X commented on FLINK-35157: Hi [~fanrui], can you assign this issue to me? I can fix this problem, thank you so much! !image-2024-04-24-21-36-16-146.png!
> Sources with watermark alignment get stuck once some subtasks finish
>
> Key: FLINK-35157
> URL: https://issues.apache.org/jira/browse/FLINK-35157
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.17.2, 1.19.0, 1.18.1
> Reporter: Gyula Fora
> Priority: Critical
> Attachments: image-2024-04-24-21-36-16-146.png
>
> The current watermark alignment logic can easily get stuck if some subtasks finish while others are still running.
> The reason is that once a source subtask finishes, the subtask is not excluded from alignment, effectively blocking the rest of the job from making progress beyond last watermark + alignment time for the finished sources.
> This can be easily reproduced by the following simple pipeline:
> {noformat}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(2);
> DataStream<Long> s = env.fromSource(new NumberSequenceSource(0, 100),
>     WatermarkStrategy.<Long>forMonotonousTimestamps()
>         .withTimestampAssigner((SerializableTimestampAssigner<Long>) (aLong, l) -> aLong)
>         .withWatermarkAlignment("g1", Duration.ofMillis(10), Duration.ofSeconds(2)),
>     "Sequence Source").filter((FilterFunction<Long>) aLong -> {
>         Thread.sleep(200);
>         return true;
>     });
> s.print();
> env.execute();{noformat}
> The solution could be to send out a max watermark event once the sources finish or to exclude them from the source coordinator.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35157) Sources with watermark alignment get stuck once some subtasks finish
[ https://issues.apache.org/jira/browse/FLINK-35157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] elon_X updated FLINK-35157: --- Attachment: image-2024-04-24-21-36-16-146.png
> Sources with watermark alignment get stuck once some subtasks finish
>
> Key: FLINK-35157
> URL: https://issues.apache.org/jira/browse/FLINK-35157
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.17.2, 1.19.0, 1.18.1
> Reporter: Gyula Fora
> Priority: Critical
> Attachments: image-2024-04-24-21-36-16-146.png
>
> The current watermark alignment logic can easily get stuck if some subtasks finish while others are still running.
> The reason is that once a source subtask finishes, the subtask is not excluded from alignment, effectively blocking the rest of the job from making progress beyond last watermark + alignment time for the finished sources.
> This can be easily reproduced by the following simple pipeline:
> {noformat}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(2);
> DataStream<Long> s = env.fromSource(new NumberSequenceSource(0, 100),
>     WatermarkStrategy.<Long>forMonotonousTimestamps()
>         .withTimestampAssigner((SerializableTimestampAssigner<Long>) (aLong, l) -> aLong)
>         .withWatermarkAlignment("g1", Duration.ofMillis(10), Duration.ofSeconds(2)),
>     "Sequence Source").filter((FilterFunction<Long>) aLong -> {
>         Thread.sleep(200);
>         return true;
>     });
> s.print();
> env.execute();{noformat}
> The solution could be to send out a max watermark event once the sources finish or to exclude them from the source coordinator.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
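The blocking behavior described in the issue can be sketched with plain arithmetic. This is a simplified model, not Flink's actual SourceCoordinator code: the combined watermark of an alignment group is taken as the minimum across subtasks, and a subtask pauses once its last emitted watermark exceeds combined + maxAllowedWatermarkDrift. A finished subtask whose watermark never advances then pins the combined watermark, keeping the running subtasks paused forever.

```java
public class AlignmentSketch {

    // Simplified: the combined watermark of an alignment group is the
    // minimum watermark across all subtasks in the group.
    static long combinedWatermark(long[] subtaskWatermarks) {
        long min = Long.MAX_VALUE;
        for (long w : subtaskWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    // A subtask is paused once its last emitted watermark is beyond
    // combined + maxAllowedWatermarkDrift.
    static boolean shouldPause(long lastEmitted, long globalCombined, long maxDrift) {
        return lastEmitted > globalCombined + maxDrift;
    }

    public static void main(String[] args) {
        // Subtask 0 finished at watermark 100 and never advances again;
        // subtask 1 is still running and has reached 120.
        long combined = combinedWatermark(new long[] {100, 120});
        // With a 10 ms drift, subtask 1 may not emit past 110, so it stays
        // paused forever: the finished subtask pins the combined watermark.
        System.out.println(shouldPause(120, combined, 10)); // true
    }
}
```

Either proposed fix in the issue (emitting a max watermark for finished sources, or excluding them from the coordinator) amounts to removing the frozen value from `combinedWatermark`.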
[jira] [Commented] (FLINK-35088) watermark alignment maxAllowedWatermarkDrift and updateInterval param need check
[ https://issues.apache.org/jira/browse/FLINK-35088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17840041#comment-17840041 ] elon_X commented on FLINK-35088: [~fanrui] I would be very happy to fix this issue; please assign it to me, thank you very much :D
> watermark alignment maxAllowedWatermarkDrift and updateInterval param need check
>
> Key: FLINK-35088
> URL: https://issues.apache.org/jira/browse/FLINK-35088
> Project: Flink
> Issue Type: Improvement
> Components: API / Core, Runtime / Coordination
> Affects Versions: 1.16.1
> Reporter: elon_X
> Priority: Major
> Attachments: image-2024-04-11-20-12-29-951.png
>
> When I use watermark alignment,
> 1. I found that setting maxAllowedWatermarkDrift to a negative number initially led me to believe it could support delaying consumption of the source, so I tried it. Then the upstream data flow would hang indefinitely.
> Root cause:
> {code:java}
> long maxAllowedWatermark = globalCombinedWatermark.getTimestamp()
>     + watermarkAlignmentParams.getMaxAllowedWatermarkDrift(); {code}
> If maxAllowedWatermarkDrift is negative, then in SourceOperator maxAllowedWatermark < lastEmittedWatermark, so the SourceReader will be blocked indefinitely and cannot recover.
> I'm not sure if this is a supported feature of watermark alignment. If it's not, I think additional parameter validation should be implemented to throw an exception on the client side if the value is negative.
> 2. The updateInterval parameter also lacks validation. If I set it to 0, the task will throw an exception when starting the JobManager. The JDK class java.util.concurrent.ScheduledThreadPoolExecutor performs the validation and throws the exception.
> {code:java}
> java.lang.IllegalArgumentException: null
>     at java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:565) ~[?:1.8.0_351]
>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.<init>(SourceCoordinator.java:191) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:333) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:59) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:42) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:201) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:195) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:529) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:494) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:901) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:891) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:848) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:830) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>     at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:203) ~[flink-dist_2.12-1.16.1.ja
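The quoted IllegalArgumentException comes straight from the JDK: ScheduledThreadPoolExecutor.scheduleAtFixedRate rejects a non-positive period, which is why updateInterval = 0 fails while the coordinator is being constructed. A minimal stand-alone reproduction, no Flink required:

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class UpdateIntervalCheck {

    // Returns true when the JDK rejects the given period, mirroring what
    // happens inside the source coordinator when updateInterval is 0.
    static boolean rejectedByJdk(long periodMillis) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        try {
            executor.scheduleAtFixedRate(() -> { }, 0, periodMillis, TimeUnit.MILLISECONDS);
            return false;
        } catch (IllegalArgumentException e) {
            return true; // scheduleAtFixedRate requires period > 0
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectedByJdk(0));    // true: updateInterval = 0 fails
        System.out.println(rejectedByJdk(2000)); // false: a positive interval is fine
    }
}
```

Validating the parameter on the client side, as the reporter suggests, would surface the same error earlier with a clearer message instead of a `null`-message exception at JobManager startup.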
[jira] [Comment Edited] (FLINK-35178) Checkpoint CLAIM mode does not fully control snapshot ownership
[ https://issues.apache.org/jira/browse/FLINK-35178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839522#comment-17839522 ] elon_X edited comment on FLINK-35178 at 4/22/24 3:20 PM: - [~lijinzhong] Thank you for your response. I have been using the default value (true) for the "state.checkpoints.create-subdir" parameter. However, when I tested by setting this value to false, the result was the same, which might indicate I'm doing something wrong? Additionally, I've encountered another issue. Even though I set {{state.checkpoints.num-retained=3}}, the older job's checkpoint versions are not being discarded even if they are not referenced. Only the checkpoint specified by the {{-s}} option (chk-x) is discarded. As shown in the diagram below, I restored from chk-34, but only chk-34 was discarded, while chk-32 and chk-33 continue to exist indefinitely. !image-2024-04-22-15-16-02-381.png! was (Author: JIRAUSER303028): [~lijinzhong] Thank you for your response. I have been using the default value (true) for the "state.checkpoints.create-subdir" parameter. However, when I tested by setting this value to false, the result was the same, which might indicate I'm doing something wrong. Additionally, I've encountered another issue. Even though I set {{{}state.checkpoints.num-retained=3{}}}, the older job's checkpoint versions are not being discarded even if they are not referenced. Only the checkpoint specified by the {{-s}} option (chk-x) is discarded. As shown in the diagram below, I restored from chk-34, but only chk-34 was discarded, while chk-32 and chk-33 continue to exist indefinitely. !image-2024-04-22-15-16-02-381.png! 
> Checkpoint CLAIM mode does not fully control snapshot ownership > --- > > Key: FLINK-35178 > URL: https://issues.apache.org/jira/browse/FLINK-35178 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.18.0 >Reporter: elon_X >Priority: Major > Attachments: image-2024-04-20-14-51-21-062.png, > image-2024-04-22-15-16-02-381.png > > > When I enable incremental checkpointing, and the task fails or is canceled > for some reason, restarting the task from {{-s checkpoint_path}} with > {{restoreMode CLAIM}} allows the Flink job to recover from the last > checkpoint, it just discards the previous checkpoint. > Then I found that this leads to the following two cases: > 1. If the new checkpoint_x meta file does not reference files in the shared > directory under the previous jobID: > the shared and taskowned directories from the previous Job will be left as > empty directories, and these two directories will persist without being > deleted by Flink. !image-2024-04-20-14-51-21-062.png! > 2. If the new checkpoint_x meta file references files in the shared directory > under the previous jobID: > the chk-(x-1) from the previous job will be discarded, but there will still > be state data in the shared directory under that job, which might persist for > a relatively long time. Here arises the question: the previous job is no > longer running, and it's unclear whether users should delete the state data. > Deleting it could lead to errors when the task is restarted, as the meta > might reference files that can no longer be found; this could be confusing > for users. > > The potential solution might be to reuse the previous job's jobID when > restoring from {{{}-s checkpoint_path{}}}, or to add a new parameter that > allows users to specify the jobID they want to recover from; > > Please correct me if there's anything I've misunderstood. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35178) Checkpoint CLAIM mode does not fully control snapshot ownership
[ https://issues.apache.org/jira/browse/FLINK-35178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839522#comment-17839522 ] elon_X commented on FLINK-35178: [~lijinzhong] Thank you for your response. I have been using the default value (true) for the "state.checkpoints.create-subdir" parameter. However, when I tested by setting this value to false, the result was the same, which might indicate I'm doing something wrong. Additionally, I've encountered another issue. Even though I set {{{}state.checkpoints.num-retained=3{}}}, the older job's checkpoint versions are not being discarded even if they are not referenced. Only the checkpoint specified by the {{-s}} option (chk-x) is discarded. As shown in the diagram below, I restored from chk-34, but only chk-34 was discarded, while chk-32 and chk-33 continue to exist indefinitely. !image-2024-04-22-15-16-02-381.png! > Checkpoint CLAIM mode does not fully control snapshot ownership > --- > > Key: FLINK-35178 > URL: https://issues.apache.org/jira/browse/FLINK-35178 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.18.0 >Reporter: elon_X >Priority: Major > Attachments: image-2024-04-20-14-51-21-062.png, > image-2024-04-22-15-16-02-381.png > > > When I enable incremental checkpointing, and the task fails or is canceled > for some reason, restarting the task from {{-s checkpoint_path}} with > {{restoreMode CLAIM}} allows the Flink job to recover from the last > checkpoint, it just discards the previous checkpoint. > Then I found that this leads to the following two cases: > 1. If the new checkpoint_x meta file does not reference files in the shared > directory under the previous jobID: > the shared and taskowned directories from the previous Job will be left as > empty directories, and these two directories will persist without being > deleted by Flink. !image-2024-04-20-14-51-21-062.png! > 2. 
If the new checkpoint_x meta file references files in the shared directory > under the previous jobID: > the chk-(x-1) from the previous job will be discarded, but there will still > be state data in the shared directory under that job, which might persist for > a relatively long time. Here arises the question: the previous job is no > longer running, and it's unclear whether users should delete the state data. > Deleting it could lead to errors when the task is restarted, as the meta > might reference files that can no longer be found; this could be confusing > for users. > > The potential solution might be to reuse the previous job's jobID when > restoring from {{{}-s checkpoint_path{}}}, or to add a new parameter that > allows users to specify the jobID they want to recover from; > > Please correct me if there's anything I've misunderstood. -- This message was sent by Atlassian Jira (v8.20.10#820010)
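Why only chk-34 is discarded can be illustrated with a simplified model of the retained-checkpoints store. This is a sketch under an assumed behavior, not Flink's actual CompletedCheckpointStore: the restored job's store starts with only the claimed checkpoint, so older checkpoints from the previous job are never registered and therefore never subsumed by `state.checkpoints.num-retained`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class RetainedCheckpointsSketch {

    private final Deque<String> retained = new ArrayDeque<>();
    private final int numRetained;

    RetainedCheckpointsSketch(int numRetained) {
        this.numRetained = numRetained;
    }

    // Register a completed checkpoint; returns the checkpoint subsumed
    // (discarded) once the retention limit is exceeded, or null.
    String add(String checkpoint) {
        retained.addLast(checkpoint);
        return retained.size() > numRetained ? retained.removeFirst() : null;
    }

    public static void main(String[] args) {
        // A job restored with CLAIM from chk-34: only chk-34 enters the new
        // job's store. chk-32/chk-33 from the old job were never added, so
        // nothing ever subsumes them.
        RetainedCheckpointsSketch store = new RetainedCheckpointsSketch(3);
        store.add("chk-34");
        store.add("chk-35");
        store.add("chk-36");
        System.out.println(store.add("chk-37")); // chk-34 is the first to be discarded
    }
}
```

Under this model, the reporter's observation (chk-32 and chk-33 persisting indefinitely) follows directly: retention only governs checkpoints the running job knows about.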
[jira] [Updated] (FLINK-35178) Checkpoint CLAIM mode does not fully control snapshot ownership
[ https://issues.apache.org/jira/browse/FLINK-35178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] elon_X updated FLINK-35178: --- Attachment: image-2024-04-22-15-16-02-381.png > Checkpoint CLAIM mode does not fully control snapshot ownership > --- > > Key: FLINK-35178 > URL: https://issues.apache.org/jira/browse/FLINK-35178 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.18.0 >Reporter: elon_X >Priority: Major > Attachments: image-2024-04-20-14-51-21-062.png, > image-2024-04-22-15-16-02-381.png > > > When I enable incremental checkpointing, and the task fails or is canceled > for some reason, restarting the task from {{-s checkpoint_path}} with > {{restoreMode CLAIM}} allows the Flink job to recover from the last > checkpoint, it just discards the previous checkpoint. > Then I found that this leads to the following two cases: > 1. If the new checkpoint_x meta file does not reference files in the shared > directory under the previous jobID: > the shared and taskowned directories from the previous Job will be left as > empty directories, and these two directories will persist without being > deleted by Flink. !image-2024-04-20-14-51-21-062.png! > 2. If the new checkpoint_x meta file references files in the shared directory > under the previous jobID: > the chk-(x-1) from the previous job will be discarded, but there will still > be state data in the shared directory under that job, which might persist for > a relatively long time. Here arises the question: the previous job is no > longer running, and it's unclear whether users should delete the state data. > Deleting it could lead to errors when the task is restarted, as the meta > might reference files that can no longer be found; this could be confusing > for users. 
> > The potential solution might be to reuse the previous job's jobID when > restoring from {{{}-s checkpoint_path{}}}, or to add a new parameter that > allows users to specify the jobID they want to recover from; > > Please correct me if there's anything I've misunderstood. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35178) Checkpoint CLAIM mode does not fully control snapshot ownership
elon_X created FLINK-35178: -- Summary: Checkpoint CLAIM mode does not fully control snapshot ownership Key: FLINK-35178 URL: https://issues.apache.org/jira/browse/FLINK-35178 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.18.0 Reporter: elon_X Attachments: image-2024-04-20-14-51-21-062.png When I enable incremental checkpointing, and the task fails or is canceled for some reason, restarting the task from {{-s checkpoint_path}} with {{restoreMode CLAIM}} allows the Flink job to recover from the last checkpoint, it just discards the previous checkpoint. Then I found that this leads to the following two cases: 1. If the new checkpoint_x meta file does not reference files in the shared directory under the previous jobID: the shared and taskowned directories from the previous Job will be left as empty directories, and these two directories will persist without being deleted by Flink. !image-2024-04-20-14-51-21-062.png! 2. If the new checkpoint_x meta file references files in the shared directory under the previous jobID: the chk-(x-1) from the previous job will be discarded, but there will still be state data in the shared directory under that job, which might persist for a relatively long time. Here arises the question: the previous job is no longer running, and it's unclear whether users should delete the state data. Deleting it could lead to errors when the task is restarted, as the meta might reference files that can no longer be found; this could be confusing for users. The potential solution might be to reuse the previous job's jobID when restoring from {{{}-s checkpoint_path{}}}, or to add a new parameter that allows users to specify the jobID they want to recover from; Please correct me if there's anything I've misunderstood. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35160) Support for Thread Dump provides a convenient way to display issues of thread deadlocks in tasks
[ https://issues.apache.org/jira/browse/FLINK-35160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] elon_X updated FLINK-35160: --- Attachment: image-2024-04-20-14-43-36-939.png Description: After receiving feedback from the business side about performance issues in their tasks, we attempted to troubleshoot and discovered that their tasks had thread deadlocks. However, the Thread Dump entry on the Flink page only shows thread stacks. Since the users are not very familiar with Java stacks, they couldn't clearly identify that the deadlocks were due to issues in the business logic code and mistakenly thought they were problems with the Flink framework. !image-2024-04-18-20-57-52-440.png! !image-2024-04-18-20-58-09-872.png! The JVM's jstack command can clearly display thread deadlocks; unfortunately, the business team does not have permission to log into the machines. Here is the jstack log: !image-2024-04-20-14-43-36-939.png! Flame graphs are excellent for visualizing performance bottlenecks and hotspots in application profiling, but they are not designed to pinpoint the exact lines of code where thread deadlocks occur. !image-2024-04-18-21-01-22-881.png! Perhaps we could enhance the Thread Dump feature to display thread deadlocks, similar to what the {{jstack}} command provides. !image-2024-04-18-21-34-41-014.png! was: After receiving feedback from the business side about performance issues in their tasks, we attempted to troubleshoot and discovered that their tasks had issues with thread deadlocks. However, the Thread Dump entry on the Flink page only shows thread stacks. Since the users are not very familiar with Java stacks, they couldn't clearly identify that the deadlocks were due to issues in the business logic code and mistakenly thought they were problems with the Flink framework !image-2024-04-18-20-57-52-440.png! !image-2024-04-18-20-58-09-872.png! 
the JVM's jstack command can clearly display thread deadlocks, unfortunately, the business team does not have the permissions to log into the machines. hear is the jstack log !image-2024-04-18-21-00-04-532.png! FlameGraph are excellent for visualizing performance bottlenecks and hotspots in application profiling but are not designed to pinpoint the exact lines of code where thread deadlocks occur. !image-2024-04-18-21-01-22-881.png! Perhaps we could enhance the Thread Dump feature to display thread deadlocks, similar to what the {{jstack}} command provides. !image-2024-04-18-21-34-41-014.png!
> Support for Thread Dump provides a convenient way to display issues of thread deadlocks in tasks
>
> Key: FLINK-35160
> URL: https://issues.apache.org/jira/browse/FLINK-35160
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / REST
> Affects Versions: 1.16.0, 1.17.1, 1.19.0, 1.18.1
> Reporter: elon_X
> Priority: Major
> Attachments: image-2024-04-18-20-57-52-440.png, image-2024-04-18-20-58-09-872.png, image-2024-04-18-21-01-22-881.png, image-2024-04-18-21-34-41-014.png, image-2024-04-20-14-43-36-939.png
>
> After receiving feedback from the business side about performance issues in their tasks, we attempted to troubleshoot and discovered that their tasks had thread deadlocks. However, the Thread Dump entry on the Flink page only shows thread stacks. Since the users are not very familiar with Java stacks, they couldn't clearly identify that the deadlocks were due to issues in the business logic code and mistakenly thought they were problems with the Flink framework.
> !image-2024-04-18-20-57-52-440.png!
> !image-2024-04-18-20-58-09-872.png!
> The JVM's jstack command can clearly display thread deadlocks; unfortunately, the business team does not have permission to log into the machines.
> Here is the jstack log:
> !image-2024-04-20-14-43-36-939.png!
> FlameGraph are excellent for visualizing performance bottlenecks and hotspots > in application profiling but are not designed to pinpoint the exact lines of > code where thread deadlocks occur. > !image-2024-04-18-21-01-22-881.png! > Perhaps we could enhance the Thread Dump feature to display thread deadlocks, > similar to what the {{jstack}} command provides. > > !image-2024-04-18-21-34-41-014.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35160) Support for Thread Dump provides a convenient way to display issues of thread deadlocks in tasks
[ https://issues.apache.org/jira/browse/FLINK-35160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] elon_X updated FLINK-35160: --- Attachment: (was: image-2024-04-18-21-00-04-532.png) > Support for Thread Dump provides a convenient way to display issues of thread > deadlocks in tasks > > > Key: FLINK-35160 > URL: https://issues.apache.org/jira/browse/FLINK-35160 > Project: Flink > Issue Type: Improvement > Components: Runtime / REST >Affects Versions: 1.16.0, 1.17.1, 1.19.0, 1.18.1 >Reporter: elon_X >Priority: Major > Attachments: image-2024-04-18-20-57-52-440.png, > image-2024-04-18-20-58-09-872.png, image-2024-04-18-21-01-22-881.png, > image-2024-04-18-21-34-41-014.png, image-2024-04-20-14-43-36-939.png > > > After receiving feedback from the business side about performance issues in > their tasks, we attempted to troubleshoot and discovered that their tasks had > thread deadlocks. However, the Thread Dump entry on the Flink page only shows > thread stacks. Since the users are not very familiar with Java stacks, they > couldn't clearly identify that the deadlocks were due to issues in the > business logic code and mistakenly thought they were problems with the Flink > framework. > !image-2024-04-18-20-57-52-440.png! > !image-2024-04-18-20-58-09-872.png! > The JVM's jstack command can clearly display thread deadlocks; unfortunately, > the business team does not have permission to log into the machines. Here is > the jstack log: > !image-2024-04-18-21-00-04-532.png! > Flame graphs are excellent for visualizing performance bottlenecks and > hotspots in application profiling, but they are not designed to pinpoint the > exact lines of code where thread deadlocks occur. > !image-2024-04-18-21-01-22-881.png! > Perhaps we could enhance the Thread Dump feature to display thread deadlocks, > similar to what the {{jstack}} command provides. > > !image-2024-04-18-21-34-41-014.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35160) Support for Thread Dump provides a convenient way to display issues of thread deadlocks in tasks
elon_X created FLINK-35160: -- Summary: Support for Thread Dump provides a convenient way to display issues of thread deadlocks in tasks Key: FLINK-35160 URL: https://issues.apache.org/jira/browse/FLINK-35160 Project: Flink Issue Type: Improvement Components: Runtime / REST Affects Versions: 1.18.1, 1.19.0, 1.17.1, 1.16.0 Reporter: elon_X Attachments: image-2024-04-18-20-57-52-440.png, image-2024-04-18-20-58-09-872.png, image-2024-04-18-21-00-04-532.png, image-2024-04-18-21-01-22-881.png, image-2024-04-18-21-34-41-014.png After receiving feedback from the business side about performance issues in their tasks, we attempted to troubleshoot and discovered that their tasks had thread deadlocks. However, the Thread Dump entry on the Flink page only shows thread stacks. Since the users are not very familiar with Java stacks, they couldn't clearly identify that the deadlocks were due to issues in the business logic code and mistakenly thought they were problems with the Flink framework. !image-2024-04-18-20-57-52-440.png! !image-2024-04-18-20-58-09-872.png! The JVM's jstack command can clearly display thread deadlocks; unfortunately, the business team does not have permission to log into the machines. Here is the jstack log: !image-2024-04-18-21-00-04-532.png! Flame graphs are excellent for visualizing performance bottlenecks and hotspots in application profiling, but they are not designed to pinpoint the exact lines of code where thread deadlocks occur. !image-2024-04-18-21-01-22-881.png! Perhaps we could enhance the Thread Dump feature to display thread deadlocks, similar to what the {{jstack}} command provides. !image-2024-04-18-21-34-41-014.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
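The enhancement proposed above can be prototyped entirely with the JDK's own management API: `ThreadMXBean#findDeadlockedThreads` reports exactly the deadlocks that jstack prints. The sketch below is a standalone illustration (the class name and report format are invented here, not Flink's REST handler); it deliberately deadlocks two daemon threads and then detects them:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

public class DeadlockDetector {

    /** Builds a jstack-like report of deadlocked threads; empty string if none. */
    static String findDeadlocks() {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        long[] ids = bean.findDeadlockedThreads(); // null when no deadlock exists
        if (ids == null) {
            return "";
        }
        StringBuilder report =
                new StringBuilder("Found " + ids.length + " deadlocked thread(s):\n");
        for (ThreadInfo info : bean.getThreadInfo(ids, true, true)) {
            report.append(info); // includes stack frames and the locks being waited on
        }
        return report.toString();
    }

    /** Creates two daemon threads that deadlock on purpose, then runs detection. */
    static String induceAndDetect() {
        final Object lockA = new Object();
        final Object lockB = new Object();
        final CountDownLatch bothHeld = new CountDownLatch(2);
        startDaemon(() -> {
            synchronized (lockA) {
                bothHeld.countDown();
                awaitQuietly(bothHeld);
                synchronized (lockB) { } // blocks forever: lockB is held by the other thread
            }
        });
        startDaemon(() -> {
            synchronized (lockB) {
                bothHeld.countDown();
                awaitQuietly(bothHeld);
                synchronized (lockA) { } // blocks forever: lockA is held by the other thread
            }
        });
        sleepQuietly(500); // give both threads time to block on the second monitor
        return findDeadlocks();
    }

    private static void startDaemon(Runnable r) {
        Thread t = new Thread(r);
        t.setDaemon(true); // do not keep the JVM alive after main() returns
        t.start();
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try { latch.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }

    private static void sleepQuietly(long millis) {
        try { Thread.sleep(millis); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }

    public static void main(String[] args) {
        System.out.println(induceAndDetect());
    }
}
```

A REST-side implementation could append such a report to the existing thread-dump payload; note that `findDeadlockedThreads` covers both monitor deadlocks and `java.util.concurrent` ownable-synchronizer deadlocks.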
[jira] [Commented] (FLINK-35088) watermark alignment maxAllowedWatermarkDrift and updateInterval param need check
[ https://issues.apache.org/jira/browse/FLINK-35088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17838534#comment-17838534 ] elon_X commented on FLINK-35088: [~martijnvisser] [~masc] I'm sorry for the late reply. I have conducted a retest based on Flink version 1.18 and found that the problem still persists. Then, I checked the latest code on the main Flink branch and found that there is no validation for these two parameters. What are your thoughts on this? > watermark alignment maxAllowedWatermarkDrift and updateInterval param need > check > > > Key: FLINK-35088 > URL: https://issues.apache.org/jira/browse/FLINK-35088 > Project: Flink > Issue Type: Improvement > Components: API / Core, Runtime / Coordination >Affects Versions: 1.16.1 >Reporter: elon_X >Priority: Major > Attachments: image-2024-04-11-20-12-29-951.png > > > When I use watermark alignment, > 1.I found that setting maxAllowedWatermarkDrift to a negative number > initially led me to believe it could support delaying the consumption of the > source, so I tried it. Then, the upstream data flow would hang indefinitely. > Root cause: > {code:java} > long maxAllowedWatermark = globalCombinedWatermark.getTimestamp() > + watermarkAlignmentParams.getMaxAllowedWatermarkDrift(); {code} > If maxAllowedWatermarkDrift is negative, SourceOperator: maxAllowedWatermark > < lastEmittedWatermark, then the SourceReader will be blocked indefinitely > and cannot recover. > I'm not sure if this is a supported feature of watermark alignment. If it's > not, I think an additional parameter validation should be implemented to > throw an exception on the client side if the value is negative. > 2.The updateInterval parameter also lacks validation. If I set it to 0, the > task will throw an exception when starting the job manager. The JDK class > java.util.concurrent.ScheduledThreadPoolExecutor performs the validation and > throws the exception. 
> {code:java} > java.lang.IllegalArgumentException: null > at > java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:565) > ~[?:1.8.0_351] > at > org.apache.flink.runtime.source.coordinator.SourceCoordinator.(SourceCoordinator.java:191) > ~[flink-dist_2.12-1.16.1.jar:1.16.1] > at > org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92) > ~[flink-dist_2.12-1.16.1.jar:1.16.1] > at > org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:333) > ~[flink-dist_2.12-1.16.1.jar:1.16.1] > at > org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.(RecreateOnResetOperatorCoordinator.java:59) > ~[flink-dist_2.12-1.16.1.jar:1.16.1] > at > org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.(RecreateOnResetOperatorCoordinator.java:42) > ~[flink-dist_2.12-1.16.1.jar:1.16.1] > at > org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:201) > ~[flink-dist_2.12-1.16.1.jar:1.16.1] > at > org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:195) > ~[flink-dist_2.12-1.16.1.jar:1.16.1] > at > org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:529) > ~[flink-dist_2.12-1.16.1.jar:1.16.1] > at > org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:494) > ~[flink-dist_2.12-1.16.1.jar:1.16.1] > at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286) > ~[flink-dist_2.12-1.16.1.jar:1.16.1] > at > 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223) > ~[flink-dist_2.12-1.16.1.jar:1.16.1] > at > org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:901) > ~[flink-dist_2.12-1.16.1.jar:1.16.1] > at > org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:891) > ~[flink-dist_2.12-1.16.1.jar:1.16.1] > at > org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:848) > ~[flink-dist_2.12-1.16.1.jar:1.16.1] > at > org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGr
[jira] [Comment Edited] (FLINK-35076) Watermark alignment will cause data flow to experience serious shake
[ https://issues.apache.org/jira/browse/FLINK-35076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836238#comment-17836238 ] elon_X edited comment on FLINK-35076 at 4/11/24 3:23 PM: - [~kkrugler] Thank you for your reply. Setting the idle time does not give much control over the specific timing. For example, if it is set to 10 seconds, the minimum watermark will still not change during those 10 seconds unless the idle time is made as small as possible. I'm not sure whether this would solve the problem; further testing is needed. As for the suggestion to shuffle the stream, I didn't quite understand it. In the Flink API: DataStream xx = env.fromSource(Source source, WatermarkStrategy timestampsAndWatermarks, String sourceName) only DataStream supports rebalance; a Source can't be rebalanced. I'm not quite sure how to shuffle the data source before {{fromSource}}. was (Author: JIRAUSER303028): [~kkrugler] Thank you for your reply. Setting the idle time does not give much control over the specific timing. For example, if it is set to 10 seconds, the minimum watermark will still not change during those 10 seconds unless the idle time is made as small as possible. I'm not sure whether this would solve the problem; further testing is needed. As for the suggestion to shuffle the stream, I didn't quite understand it. In the Flink API: DataStream xx = env.fromSource(Source source, WatermarkStrategy timestampsAndWatermarks, String sourceName) only DataStream supports rebalance; a Source can't be rebalanced. 
> Watermark alignment will cause data flow to experience serious shake > > > Key: FLINK-35076 > URL: https://issues.apache.org/jira/browse/FLINK-35076 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.16.1 >Reporter: elon_X >Priority: Major > Attachments: image-2024-04-10-20-15-05-731.png, > image-2024-04-10-20-23-13-872.png, image-2024-04-10-20-25-59-387.png, > image-2024-04-10-20-29-13-835.png > > > In our company, there is a requirement for multi-stream join operations, so > we are making modifications based on Flink watermark alignment, and I found > that the final join output would experience serious shake. > I analyzed the reasons: an upstream topic has more than 300 partitions. The > number of partitions requested for this topic is too large, causing some > partitions to frequently experience intermittent writes with QPS=0. This > phenomenon is more serious between 2 am and 5 am. However, the overall topic > write rate is very smooth. > !image-2024-04-10-20-29-13-835.png! > The final join output will experience serious shake, as shown in the > following diagram: > !image-2024-04-10-20-15-05-731.png! > Root cause: > # The {{SourceOperator#emitLatestWatermark}} reports the > lastEmittedWatermark to the SourceCoordinator. > # If the partition write is zero during a certain period, the > lastEmittedWatermark sent by the subtask corresponding to that partition > remains unchanged. > # The SourceCoordinator aggregates the watermarks of all subtasks according > to the watermark group and takes the smallest watermark. This means that the > maxAllowedWatermark may remain unchanged for some time, even though the > overall upstream data flow is moving forward, until that minimum value is > updated. Only then will everything change, which manifests as serious shake > in the output data stream. > I think choosing the global minimum might not be a good option. Using > min/max is more likely to hit edge cases. Perhaps choosing a median value > would be more appropriate? Or a more complex selection strategy? > If replaced with a median value, it can ensure that the overall data flow is > very smooth: > !image-2024-04-10-20-23-13-872.png! > -- This message was sent by Atlassian Jira (v8.20.10#820010)
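The min-versus-median trade-off described in the issue can be illustrated with a stdlib-only sketch (the method names here are illustrative, not Flink's SourceCoordinator API): with one stalled subtask, the global minimum freezes while a median keeps advancing.

```java
import java.util.Arrays;

public class WatermarkAggregation {

    /** Current alignment behavior: the group watermark is the minimum over subtasks. */
    static long minWatermark(long[] subtaskWatermarks) {
        return Arrays.stream(subtaskWatermarks).min().getAsLong();
    }

    /** Alternative floated in the issue: a median ignores a few stalled subtasks. */
    static long medianWatermark(long[] subtaskWatermarks) {
        long[] sorted = subtaskWatermarks.clone();
        Arrays.sort(sorted);
        return sorted[sorted.length / 2]; // upper median for even-length arrays
    }

    public static void main(String[] args) {
        // One partition idle since t=0 while the others advance to ~100.
        long[] watermarks = {0L, 100L, 101L, 102L, 103L};
        System.out.println("min    = " + minWatermark(watermarks));    // stuck at the idle subtask
        System.out.println("median = " + medianWatermark(watermarks)); // keeps moving forward
    }
}
```

The flip side, which any median-based strategy would have to address, is that the stalled subtask is no longer bounded by alignment at all, so this is a sketch of the trade-off rather than a drop-in fix.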
[jira] [Commented] (FLINK-35076) Watermark alignment will cause data flow to experience serious shake
[ https://issues.apache.org/jira/browse/FLINK-35076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836238#comment-17836238 ] elon_X commented on FLINK-35076: [~kkrugler] Thank you for your reply. Setting the idle time does not give much control over the specific timing. For example, if it is set to 10 seconds, the minimum watermark will still not change during those 10 seconds unless the idle time is made as small as possible. I'm not sure whether this would solve the problem; further testing is needed. As for the suggestion to shuffle the stream, I didn't quite understand it. In the Flink API: DataStream xx = env.fromSource(Source source, WatermarkStrategy timestampsAndWatermarks, String sourceName) only DataStream supports rebalance; a Source can't be rebalanced. > Watermark alignment will cause data flow to experience serious shake > > > Key: FLINK-35076 > URL: https://issues.apache.org/jira/browse/FLINK-35076 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.16.1 >Reporter: elon_X >Priority: Major > Attachments: image-2024-04-10-20-15-05-731.png, > image-2024-04-10-20-23-13-872.png, image-2024-04-10-20-25-59-387.png, > image-2024-04-10-20-29-13-835.png > > > In our company, there is a requirement for multi-stream join operations, so > we are making modifications based on Flink watermark alignment, and I found > that the final join output would experience serious shake. > I analyzed the reasons: an upstream topic has more than 300 partitions. The > number of partitions requested for this topic is too large, causing some > partitions to frequently experience intermittent writes with QPS=0. This > phenomenon is more serious between 2 am and 5 am. However, the overall topic > write rate is very smooth. > !image-2024-04-10-20-29-13-835.png! > The final join output will experience serious shake, as shown in the > following diagram: > !image-2024-04-10-20-15-05-731.png! > Root cause: > # The {{SourceOperator#emitLatestWatermark}} reports the > lastEmittedWatermark to the SourceCoordinator. > # If the partition write is zero during a certain period, the > lastEmittedWatermark sent by the subtask corresponding to that partition > remains unchanged. > # The SourceCoordinator aggregates the watermarks of all subtasks according > to the watermark group and takes the smallest watermark. This means that the > maxAllowedWatermark may remain unchanged for some time, even though the > overall upstream data flow is moving forward, until that minimum value is > updated. Only then will everything change, which manifests as serious shake > in the output data stream. > I think choosing the global minimum might not be a good option. Using > min/max is more likely to hit edge cases. Perhaps choosing a median value > would be more appropriate? Or a more complex selection strategy? > If replaced with a median value, it can ensure that the overall data flow is > very smooth: > !image-2024-04-10-20-23-13-872.png! > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35088) watermark alignment maxAllowedWatermarkDrift and updateInterval param need check
elon_X created FLINK-35088: -- Summary: watermark alignment maxAllowedWatermarkDrift and updateInterval param need check Key: FLINK-35088 URL: https://issues.apache.org/jira/browse/FLINK-35088 Project: Flink Issue Type: Improvement Components: API / Core, Runtime / Coordination Affects Versions: 1.16.1 Reporter: elon_X Attachments: image-2024-04-11-20-12-29-951.png When I use watermark alignment: 1. I found that setting maxAllowedWatermarkDrift to a negative number initially led me to believe it could support delaying the consumption of the source, so I tried it. Then, the upstream data flow would hang indefinitely. Root cause: {code:java} long maxAllowedWatermark = globalCombinedWatermark.getTimestamp() + watermarkAlignmentParams.getMaxAllowedWatermarkDrift(); {code} If maxAllowedWatermarkDrift is negative, then in the SourceOperator maxAllowedWatermark < lastEmittedWatermark, so the SourceReader will be blocked indefinitely and cannot recover. I'm not sure if this is a supported feature of watermark alignment. If it's not, I think an additional parameter validation should be implemented to throw an exception on the client side if the value is negative. 2. The updateInterval parameter also lacks validation. If I set it to 0, the task will throw an exception when the JobManager starts. The JDK class java.util.concurrent.ScheduledThreadPoolExecutor performs the validation and throws the exception. 
{code:java} java.lang.IllegalArgumentException: null at java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:565) ~[?:1.8.0_351] at org.apache.flink.runtime.source.coordinator.SourceCoordinator.(SourceCoordinator.java:191) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:333) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.(RecreateOnResetOperatorCoordinator.java:59) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.(RecreateOnResetOperatorCoordinator.java:42) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:201) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:195) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:529) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:494) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223) 
~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:901) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:891) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:848) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:830) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:203) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:156) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:365) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:208) ~[flink-dist_2.12-1.16.1.jar:1.16.1] at org.
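The client-side validation proposed in the issue could look like the sketch below; the class and method names are hypothetical, and Flink's actual WatermarkAlignmentParams API may differ. Failing fast surfaces both misconfigurations at job submission instead of as a silently hung SourceReader or a JobManager-side IllegalArgumentException from ScheduledThreadPoolExecutor.

```java
import java.time.Duration;

public class WatermarkAlignmentParamsCheck {

    /**
     * Rejects the two configurations described in the issue: a negative
     * maxAllowedWatermarkDrift (which makes maxAllowedWatermark fall below
     * lastEmittedWatermark, permanently blocking the SourceReader) and a
     * non-positive updateInterval (which only fails later, deep inside
     * ScheduledThreadPoolExecutor#scheduleAtFixedRate on the JobManager).
     */
    static void validate(Duration maxAllowedWatermarkDrift, Duration updateInterval) {
        if (maxAllowedWatermarkDrift.isNegative()) {
            throw new IllegalArgumentException(
                    "maxAllowedWatermarkDrift must be non-negative, was " + maxAllowedWatermarkDrift);
        }
        if (updateInterval.isNegative() || updateInterval.isZero()) {
            throw new IllegalArgumentException(
                    "updateInterval must be positive, was " + updateInterval);
        }
    }

    public static void main(String[] args) {
        validate(Duration.ofSeconds(5), Duration.ofSeconds(1)); // valid: no exception
        try {
            validate(Duration.ofSeconds(-1), Duration.ofSeconds(1));
        } catch (IllegalArgumentException expected) {
            System.out.println("rejected on the client side: " + expected.getMessage());
        }
    }
}
```

Running such a check in the builder, before the job graph is submitted, would turn both failure modes into an immediate, clearly attributed error message.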
[jira] [Commented] (FLINK-35064) Flink sql connector pulsar/hive com.fasterxml.jackson.annotation.JsonFormat$Value conflict
[ https://issues.apache.org/jira/browse/FLINK-35064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17835964#comment-17835964 ] elon_X commented on FLINK-35064: [~chalixar] Thank you for your reply. I also used relocation to solve this problem, and I am very willing to upgrade for pulsar. My question is, do other connectors also need relocation, or is it just for handling flink-connector-pulsar? > Flink sql connector pulsar/hive > com.fasterxml.jackson.annotation.JsonFormat$Value conflict > -- > > Key: FLINK-35064 > URL: https://issues.apache.org/jira/browse/FLINK-35064 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive, Connectors / Pulsar >Affects Versions: 1.16.1 >Reporter: elon_X >Priority: Major > > When I compile and package {{flink-sql-connector-pulsar}} & > {{{}flink-sql-connector-hive{}}}, and then put these two jar files into the > Flink lib directory, I execute the following SQL statement through > {{{}bin/sql-client.sh{}}}: > > {code:java} > // code placeholder > CREATE TABLE > pulsar_table ( > content string, > proc_time AS PROCTIME () > ) > WITH > ( > 'connector' = 'pulsar', > 'topics' = 'persistent://xxx', > 'service-url' = 'pulsar://xxx', > 'source.subscription-name' = 'xxx', > 'source.start.message-id' = 'latest', > 'format' = 'csv', > 'pulsar.client.authPluginClassName' = > 'org.apache.pulsar.client.impl.auth.AuthenticationToken', > 'pulsar.client.authParams' = 'token:xxx' > ); > > select * from pulsar_table; {code} > The task error exception stack is as follows: > > {code:java} > Caused by: java.lang.NoSuchMethodError: > com.fasterxml.jackson.annotation.JsonFormat$Value.empty()Lcom/fasterxml/jackson/annotation/JsonFormat$Value; > at > org.apache.pulsar.shade.com.fasterxml.jackson.databind.cfg.MapperConfig.(MapperConfig.java:56) > ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at > org.apache.pulsar.shade.com.fasterxml.jackson.databind.ObjectMapper.(ObjectMapper.java:660) > 
~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at > org.apache.pulsar.shade.com.fasterxml.jackson.databind.ObjectMapper.(ObjectMapper.java:576) > ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at > org.apache.pulsar.common.util.ObjectMapperFactory.createObjectMapperInstance(ObjectMapperFactory.java:151) > ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at > org.apache.pulsar.common.util.ObjectMapperFactory.(ObjectMapperFactory.java:142) > ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at > org.apache.pulsar.client.impl.conf.ConfigurationDataUtils.create(ConfigurationDataUtils.java:35) > ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at > org.apache.pulsar.client.impl.conf.ConfigurationDataUtils.(ConfigurationDataUtils.java:43) > ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at > org.apache.pulsar.client.impl.ClientBuilderImpl.loadConf(ClientBuilderImpl.java:77) > ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at > org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient(PulsarClientFactory.java:105) > ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at > org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator.(PulsarSourceEnumerator.java:95) > ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at > org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator.(PulsarSourceEnumerator.java:76) > ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at > org.apache.flink.connector.pulsar.source.PulsarSource.createEnumerator(PulsarSource.java:144) > ~[flink-sql-connector-pulsar-4.0-SNAPSHOT.jar:4.0-SNAPSHOT]at > org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:213) > ~[flink-dist_2.12-1.16.1.jar:1.16.1] > {code} > > The exception shows a conflict with > {{{}com.fasterxml.jackson.annotation.JsonFormat$Value{}}}. 
I investigated and > found that {{flink-sql-connector-pulsar}} and {{flink-sql-connector-hive}} > depend on different versions of the Jackson BOM, leading to this conflict. > {code:xml} > <!-- flink-sql-connector-pulsar pom.xml --> > <dependency> >   <groupId>com.fasterxml.jackson</groupId> >   <artifactId>jackson-bom</artifactId> >   <type>pom</type> >   <scope>import</scope> >   <version>2.13.4.20221013</version> > </dependency> > <!-- flink-sql-connector-hive pom.xml --> > <dependency> >   <groupId>com.fasterxml.jackson</groupId> >   <artifactId>jackson-bom</artifactId> >   <type>pom</type> >   <scope>import</scope> >   <version>2.15.3</version> > </dependency> > {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
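The relocation approach mentioned in the comment can be sketched as a maven-shade-plugin configuration; the `shadedPattern` below is an assumed example for illustration, not necessarily the pattern the Flink connector builds actually use:

```xml
<!-- Sketch: shade and relocate Jackson inside one connector jar so its
     classes cannot clash with the copy bundled by the other connector. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <relocation>
            <pattern>com.fasterxml.jackson</pattern>
            <!-- Assumed prefix; pick one unique to the shading module. -->
            <shadedPattern>org.example.shaded.com.fasterxml.jackson</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```

After relocation, each connector resolves `JsonFormat$Value` from its own shaded namespace, so the two Jackson versions can coexist on the same classpath.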
[jira] [Updated] (FLINK-35076) Watermark alignment will cause data flow to experience serious shake
[ https://issues.apache.org/jira/browse/FLINK-35076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] elon_X updated FLINK-35076: --- Description: In our company, there is a requirement for multi-stream join operations, so we are making modifications based on Flink watermark alignment, and I found that the final join output would experience serious shake. I analyzed the reasons: an upstream topic has more than 300 partitions. The number of partitions requested for this topic is too large, causing some partitions to frequently experience intermittent writes with QPS=0. This phenomenon is more serious between 2 am and 5 am. However, the overall topic write rate is very smooth. !image-2024-04-10-20-29-13-835.png! The final join output will experience serious shake, as shown in the following diagram: !image-2024-04-10-20-15-05-731.png! Root cause: # The {{SourceOperator#emitLatestWatermark}} reports the lastEmittedWatermark to the SourceCoordinator. # If the partition write is zero during a certain period, the lastEmittedWatermark sent by the subtask corresponding to that partition remains unchanged. # The SourceCoordinator aggregates the watermarks of all subtasks according to the watermark group and takes the smallest watermark. This means that the maxAllowedWatermark may remain unchanged for some time, even though the overall upstream data flow is moving forward, until that minimum value is updated. Only then will everything change, which manifests as serious shake in the output data stream. I think choosing the global minimum might not be a good option. Using min/max is more likely to hit edge cases. Perhaps choosing a median value would be more appropriate? Or a more complex selection strategy? If replaced with a median value, it can ensure that the overall data flow is very smooth: !image-2024-04-10-20-23-13-872.png! 
was: In our company, there is a requirement for multi-stream join operations, so we are making modifications based on Flink watermark alignment, and I found that the final join output would experience serious shake. I analyzed the reasons: an upstream topic has more than 300 partitions. The number of partitions requested for this topic is too large, causing some partitions to frequently experience intermittent writes with QPS=0. This phenomenon is more serious between 2 am and 5 am. However, the overall topic write rate is very smooth. !image-2024-04-10-20-29-13-835.png! The final join output will experience serious shake, as shown in the following diagram: !image-2024-04-10-20-15-05-731.png! Root cause: # The {{SourceOperator#emitLatestWatermark}} reports the lastEmittedWatermark to the SourceCoordinator. # If the partition write is zero during a certain period, the lastEmittedWatermark sent by the subtask corresponding to that partition remains unchanged. # The SourceCoordinator aggregates the watermarks of all subtasks according to the watermark group and takes the smallest watermark. This means that the maxAllowedWatermark may remain unchanged for some time, even though the overall upstream data flow is moving forward, until that minimum value is updated. Only then will everything change, which manifests as serious shake in the output data stream. I think choosing the global minimum might not be a good option. Using min/max is more likely to hit edge cases. Perhaps choosing a median value would be more appropriate? Or a more complex selection strategy? If replaced with a median value, it can ensure that the overall data flow is very smooth: !image-2024-04-10-20-23-13-872.png! 
> Watermark alignment will cause data flow to experience serious shake > > > Key: FLINK-35076 > URL: https://issues.apache.org/jira/browse/FLINK-35076 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.16.1 >Reporter: elon_X >Priority: Major > Attachments: image-2024-04-10-20-15-05-731.png, > image-2024-04-10-20-23-13-872.png, image-2024-04-10-20-25-59-387.png, > image-2024-04-10-20-29-13-835.png > > > In our company, there is a requirement for multi-stream join operations, so > we are making modifications based on Flink watermark alignment, and I found > that the final join output would experience serious shake. > I analyzed the reasons: an upstream topic has more than 300 partitions. The > number of partitions requested for this topic is too large, causing some > partitions to frequently experience intermittent writes with QPS=0. This > phenomenon is more serious between 2 am and 5 am. However, the overall topic > write rate is very smooth. > !image-2024-04-10-20-29-13-835.png! > The final join output will experience se
[jira] [Updated] (FLINK-35076) Watermark alignment will cause data flow to experience serious shake
[ https://issues.apache.org/jira/browse/FLINK-35076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

elon_X updated FLINK-35076:
---------------------------
    Attachment: image-2024-04-10-20-29-13-835.png

> Watermark alignment will cause data flow to experience serious shake
> --------------------------------------------------------------------
>
>                 Key: FLINK-35076
>                 URL: https://issues.apache.org/jira/browse/FLINK-35076
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>    Affects Versions: 1.16.1
>            Reporter: elon_X
>            Priority: Major
>         Attachments: image-2024-04-10-20-15-05-731.png, image-2024-04-10-20-23-13-872.png, image-2024-04-10-20-25-59-387.png, image-2024-04-10-20-29-13-835.png
>
> In our company we have a multi-stream join use case built on top of Flink's watermark alignment, and I found that the final join output experiences severe shake (fluctuation).
> I analyzed the cause: one upstream topic has more than 300 partitions. With that many partitions, some of them frequently see intermittent writes with QPS = 0; this is worst between 2 am and 5 am. Writes to the topic as a whole, however, are very smooth.
> !image-2024-04-10-20-29-13-835.png!
> The final join output shakes severely, as shown in the following diagram:
> !image-2024-04-10-20-15-05-731.png!
> Root cause:
> # {{SourceOperator#emitLatestWatermark}} reports the lastEmittedWatermark to the SourceCoordinator.
> # If a partition receives no writes for a period, the lastEmittedWatermark sent by the subtask reading that partition stays unchanged.
> # The SourceCoordinator aggregates the watermarks of all subtasks in the watermark group and takes the smallest one. The maxAllowedWatermark can therefore stay unchanged for some time, even though the upstream data flow as a whole keeps moving forward, until that minimum is finally updated. Only then does everything advance at once, which shows up as severe shake in the output stream.
> I think taking the global minimum may not be a good choice; min/max aggregations are more likely to hit edge cases. Perhaps a median, or a more elaborate selection strategy, would be more appropriate.
> With a median, the overall data flow stays very smooth:
> !image-2024-04-10-20-23-13-872.png!

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
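[Editor's note] The min-versus-median trade-off described in the ticket can be sketched in a few lines. This is a hypothetical, standalone illustration, not Flink's actual SourceCoordinator code; the class and method names (WatermarkAggregation, minWatermark, medianWatermark) and the sample values are invented for the example, while SourceCoordinator and maxAllowedWatermark are the ticket's terms.

```java
import java.util.Arrays;

// Hypothetical sketch contrasting the two aggregation strategies discussed
// in FLINK-35076. Not Flink's actual SourceCoordinator implementation.
public class WatermarkAggregation {

    // Current behavior: the aligned maxAllowedWatermark follows the slowest
    // subtask, so a single idle partition stalls the whole watermark group.
    static long minWatermark(long[] subtaskWatermarks) {
        return Arrays.stream(subtaskWatermarks).min().orElse(Long.MIN_VALUE);
    }

    // Proposed alternative: the median ignores a small number of stalled
    // subtasks, so the aggregated watermark keeps advancing smoothly.
    static long medianWatermark(long[] subtaskWatermarks) {
        long[] sorted = subtaskWatermarks.clone();
        Arrays.sort(sorted);
        return sorted[sorted.length / 2];
    }

    public static void main(String[] args) {
        // Subtask 0 reads a partition with QPS = 0, so its reported watermark
        // stalls at 1000 while the other subtasks keep moving forward.
        long[] watermarks = {1_000L, 9_000L, 9_100L, 9_200L};
        System.out.println("min    = " + minWatermark(watermarks));    // 1000
        System.out.println("median = " + medianWatermark(watermarks)); // 9100
    }
}
```

Note the flip side: a median no longer bounds how far the slowest subtask can fall behind the allowed watermark, which weakens alignment's original guarantee; that is presumably why the ticket also floats "a more complex selection strategy".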
[jira] [Updated] (FLINK-35076) Watermark alignment will cause data flow to experience serious shake
[ https://issues.apache.org/jira/browse/FLINK-35076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

elon_X updated FLINK-35076:
---------------------------
    Description: 
In our company we have a multi-stream join use case built on top of Flink's watermark alignment, and I found that the final join output experiences severe shake (fluctuation).

I analyzed the cause: one upstream topic has more than 300 partitions. With that many partitions, some of them frequently see intermittent writes with QPS = 0; this is worst between 2 am and 5 am. Writes to the topic as a whole, however, are very smooth.

!image-2024-04-10-20-29-13-835.png!

The final join output shakes severely, as shown in the following diagram:

!image-2024-04-10-20-15-05-731.png!

Root cause:
# {{SourceOperator#emitLatestWatermark}} reports the lastEmittedWatermark to the SourceCoordinator.
# If a partition receives no writes for a period, the lastEmittedWatermark sent by the subtask reading that partition stays unchanged.
# The SourceCoordinator aggregates the watermarks of all subtasks in the watermark group and takes the smallest one. The maxAllowedWatermark can therefore stay unchanged for some time, even though the upstream data flow as a whole keeps moving forward, until that minimum is finally updated. Only then does everything advance at once, which shows up as severe shake in the output stream.

I think taking the global minimum may not be a good choice; min/max aggregations are more likely to hit edge cases. Perhaps a median, or a more elaborate selection strategy, would be more appropriate.

With a median, the overall data flow stays very smooth:

!image-2024-04-10-20-23-13-872.png!

  was:
In our company we have a multi-stream join use case built on top of Flink's watermark alignment, and I found that the final join output experiences severe shake (fluctuation).

I analyzed the cause: one upstream topic has more than 300 partitions. With that many partitions, some of them frequently see intermittent writes with QPS = 0; this is worst between 2 am and 5 am. Writes to the topic as a whole, however, are very smooth.

!image-2024-04-10-20-25-59-387.png!

The final join output shakes severely, as shown in the following diagram:

!image-2024-04-10-20-15-05-731.png!

Root cause:
# {{SourceOperator#emitLatestWatermark}} reports the lastEmittedWatermark to the SourceCoordinator.
# If a partition receives no writes for a period, the lastEmittedWatermark sent by the subtask reading that partition stays unchanged.
# The SourceCoordinator aggregates the watermarks of all subtasks in the watermark group and takes the smallest one. The maxAllowedWatermark can therefore stay unchanged for some time, even though the upstream data flow as a whole keeps moving forward, until that minimum is finally updated. Only then does everything advance at once, which shows up as severe shake in the output stream.

I think taking the global minimum may not be a good choice; min/max aggregations are more likely to hit edge cases. Perhaps a median, or a more elaborate selection strategy, would be more appropriate.

With a median, the overall data flow stays very smooth:

!image-2024-04-10-20-23-13-872.png!

> Watermark alignment will cause data flow to experience serious shake
> --------------------------------------------------------------------
>
>                 Key: FLINK-35076
>                 URL: https://issues.apache.org/jira/browse/FLINK-35076
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>    Affects Versions: 1.16.1
>            Reporter: elon_X
>            Priority: Major
>         Attachments: image-2024-04-10-20-15-05-731.png, image-2024-04-10-20-23-13-872.png, image-2024-04-10-20-25-59-387.png, image-2024-04-10-20-29-13-835.png
>
> In our company we have a multi-stream join use case built on top of Flink's watermark alignment, and I found that the final join output experiences severe shake (fluctuation).
> I analyzed the cause: one upstream topic has more than 300 partitions. With that many partitions, some of them frequently see intermittent writes with QPS = 0; this is worst between 2 am and 5 am. Writes to the topic as a whole, however, are very smooth.
> !image-2024-04-10-20-29-13-835.png!
> The final join output shakes severely, as shown in the following diagram:
> !image-2024-04-10-20-15-05-731.png!
> Root cause:
> # {{SourceOperator#emitLatestWatermark}} reports the lastEmittedWatermark to the SourceCoordinator.
> # If a partition receives no writes for a period, the lastEmittedWatermark sent by the subtask reading that partition stays unchanged.
> # The SourceCoordinator aggregates the watermarks of all subtasks in the watermark group and takes the smallest one. The maxAllowedWatermark can therefore stay unchanged for some time, even though the upstream data flow as a whole keeps moving forward, until that minimum is finally updated. Only then does everything advance at once, which shows up as severe shake in the output stream.
> I think taking the global minimum may not be a good choice; min/max aggregations are more likely to hit edge cases. Perhaps a median, or a more elaborate selection strategy, would be more appropriate.
> With a median, the overall data flow stays very smooth:
> !image-2024-04-10-20-23-13-872.png!

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (FLINK-35076) Watermark alignment will cause data flow to experience serious shake
[ https://issues.apache.org/jira/browse/FLINK-35076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

elon_X updated FLINK-35076:
---------------------------
    Attachment: image-2024-04-10-20-25-59-387.png

> Watermark alignment will cause data flow to experience serious shake
> --------------------------------------------------------------------
>
>                 Key: FLINK-35076
>                 URL: https://issues.apache.org/jira/browse/FLINK-35076
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>    Affects Versions: 1.16.1
>            Reporter: elon_X
>            Priority: Major
>         Attachments: image-2024-04-10-20-15-05-731.png, image-2024-04-10-20-23-13-872.png, image-2024-04-10-20-25-59-387.png
>
> In our company we have a multi-stream join use case built on top of Flink's watermark alignment, and I found that the final join output experiences severe shake (fluctuation).
> I analyzed the cause: one upstream topic has more than 300 partitions. With that many partitions, some of them frequently see intermittent writes with QPS = 0; this is worst between 2 am and 5 am. Writes to the topic as a whole, however, are very smooth.
>
> The final join output shakes severely, as shown in the following diagram:
> !image-2024-04-10-20-15-05-731.png!
> Root cause:
> # {{SourceOperator#emitLatestWatermark}} reports the lastEmittedWatermark to the SourceCoordinator.
> # If a partition receives no writes for a period, the lastEmittedWatermark sent by the subtask reading that partition stays unchanged.
> # The SourceCoordinator aggregates the watermarks of all subtasks in the watermark group and takes the smallest one. The maxAllowedWatermark can therefore stay unchanged for some time, even though the upstream data flow as a whole keeps moving forward, until that minimum is finally updated. Only then does everything advance at once, which shows up as severe shake in the output stream.
> I think taking the global minimum may not be a good choice; min/max aggregations are more likely to hit edge cases. Perhaps a median, or a more elaborate selection strategy, would be more appropriate.
> With a median, the overall data flow stays very smooth:
> !image-2024-04-10-20-23-13-872.png!

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (FLINK-35076) Watermark alignment will cause data flow to experience serious shake
[ https://issues.apache.org/jira/browse/FLINK-35076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

elon_X updated FLINK-35076:
---------------------------
    Description: 
In our company we have a multi-stream join use case built on top of Flink's watermark alignment, and I found that the final join output experiences severe shake (fluctuation).

I analyzed the cause: one upstream topic has more than 300 partitions. With that many partitions, some of them frequently see intermittent writes with QPS = 0; this is worst between 2 am and 5 am. Writes to the topic as a whole, however, are very smooth.

!image-2024-04-10-20-25-59-387.png!

The final join output shakes severely, as shown in the following diagram:

!image-2024-04-10-20-15-05-731.png!

Root cause:
# {{SourceOperator#emitLatestWatermark}} reports the lastEmittedWatermark to the SourceCoordinator.
# If a partition receives no writes for a period, the lastEmittedWatermark sent by the subtask reading that partition stays unchanged.
# The SourceCoordinator aggregates the watermarks of all subtasks in the watermark group and takes the smallest one. The maxAllowedWatermark can therefore stay unchanged for some time, even though the upstream data flow as a whole keeps moving forward, until that minimum is finally updated. Only then does everything advance at once, which shows up as severe shake in the output stream.

I think taking the global minimum may not be a good choice; min/max aggregations are more likely to hit edge cases. Perhaps a median, or a more elaborate selection strategy, would be more appropriate.

With a median, the overall data flow stays very smooth:

!image-2024-04-10-20-23-13-872.png!

  was:
In our company we have a multi-stream join use case built on top of Flink's watermark alignment, and I found that the final join output experiences severe shake (fluctuation).

I analyzed the cause: one upstream topic has more than 300 partitions. With that many partitions, some of them frequently see intermittent writes with QPS = 0; this is worst between 2 am and 5 am. Writes to the topic as a whole, however, are very smooth.

The final join output shakes severely, as shown in the following diagram:

!image-2024-04-10-20-15-05-731.png!

Root cause:
# {{SourceOperator#emitLatestWatermark}} reports the lastEmittedWatermark to the SourceCoordinator.
# If a partition receives no writes for a period, the lastEmittedWatermark sent by the subtask reading that partition stays unchanged.
# The SourceCoordinator aggregates the watermarks of all subtasks in the watermark group and takes the smallest one. The maxAllowedWatermark can therefore stay unchanged for some time, even though the upstream data flow as a whole keeps moving forward, until that minimum is finally updated. Only then does everything advance at once, which shows up as severe shake in the output stream.

I think taking the global minimum may not be a good choice; min/max aggregations are more likely to hit edge cases. Perhaps a median, or a more elaborate selection strategy, would be more appropriate.

With a median, the overall data flow stays very smooth:

!image-2024-04-10-20-23-13-872.png!

> Watermark alignment will cause data flow to experience serious shake
> --------------------------------------------------------------------
>
>                 Key: FLINK-35076
>                 URL: https://issues.apache.org/jira/browse/FLINK-35076
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>    Affects Versions: 1.16.1
>            Reporter: elon_X
>            Priority: Major
>         Attachments: image-2024-04-10-20-15-05-731.png, image-2024-04-10-20-23-13-872.png, image-2024-04-10-20-25-59-387.png
>
> In our company we have a multi-stream join use case built on top of Flink's watermark alignment, and I found that the final join output experiences severe shake (fluctuation).
> I analyzed the cause: one upstream topic has more than 300 partitions. With that many partitions, some of them frequently see intermittent writes with QPS = 0; this is worst between 2 am and 5 am. Writes to the topic as a whole, however, are very smooth.
> !image-2024-04-10-20-25-59-387.png!
> The final join output shakes severely, as shown in the following diagram:
> !image-2024-04-10-20-15-05-731.png!
> Root cause:
> # {{SourceOperator#emitLatestWatermark}} reports the lastEmittedWatermark to the SourceCoordinator.
> # If a partition receives no writes for a period, the lastEmittedWatermark sent by the subtask reading that partition stays unchanged.
> # The SourceCoordinator aggregates the watermarks of all subtasks in the watermark group and takes the smallest one. The maxAllowedWatermark can therefore stay unchanged for some time, even though the upstream data flow as a whole keeps moving forward, until that minimum is finally updated. Only then does everything advance at once, which shows up as severe shake in the output stream.
> I think taking the global minimum may not be a good choice; min/max aggregations are more likely to hit edge cases. Perhaps a median, or a more elaborate selection strategy, would be more appropriate.
> With a median, the overall data flow stays very smooth:
> !image-2024-04-10-20-23-13-872.png!

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (FLINK-35076) Watermark alignment will cause data flow to experience serious shake
[ https://issues.apache.org/jira/browse/FLINK-35076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

elon_X updated FLINK-35076:
---------------------------
    Attachment: (was: image-2024-04-10-20-13-14-752.png)

> Watermark alignment will cause data flow to experience serious shake
> --------------------------------------------------------------------
>
>                 Key: FLINK-35076
>                 URL: https://issues.apache.org/jira/browse/FLINK-35076
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>    Affects Versions: 1.16.1
>            Reporter: elon_X
>            Priority: Major
>         Attachments: image-2024-04-10-20-15-05-731.png, image-2024-04-10-20-23-13-872.png
>
> In our company we have a multi-stream join use case built on top of Flink's watermark alignment, and I found that the final join output experiences severe shake (fluctuation).
> I analyzed the cause: one upstream topic has more than 300 partitions. With that many partitions, some of them frequently see intermittent writes with QPS = 0; this is worst between 2 am and 5 am. Writes to the topic as a whole, however, are very smooth.
>
> The final join output shakes severely, as shown in the following diagram:
> !image-2024-04-10-20-15-05-731.png!
> Root cause:
> # {{SourceOperator#emitLatestWatermark}} reports the lastEmittedWatermark to the SourceCoordinator.
> # If a partition receives no writes for a period, the lastEmittedWatermark sent by the subtask reading that partition stays unchanged.
> # The SourceCoordinator aggregates the watermarks of all subtasks in the watermark group and takes the smallest one. The maxAllowedWatermark can therefore stay unchanged for some time, even though the upstream data flow as a whole keeps moving forward, until that minimum is finally updated. Only then does everything advance at once, which shows up as severe shake in the output stream.
> I think taking the global minimum may not be a good choice; min/max aggregations are more likely to hit edge cases. Perhaps a median, or a more elaborate selection strategy, would be more appropriate.
> With a median, the overall data flow stays very smooth:
> !image-2024-04-10-20-23-13-872.png!

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Created] (FLINK-35076) Watermark alignment will cause data flow to experience serious shake
elon_X created FLINK-35076:
-------------------------------

             Summary: Watermark alignment will cause data flow to experience serious shake
                 Key: FLINK-35076
                 URL: https://issues.apache.org/jira/browse/FLINK-35076
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / Coordination
    Affects Versions: 1.16.1
            Reporter: elon_X
         Attachments: image-2024-04-10-20-13-14-752.png, image-2024-04-10-20-15-05-731.png, image-2024-04-10-20-23-13-872.png

In our company we have a multi-stream join use case built on top of Flink's watermark alignment, and I found that the final join output experiences severe shake (fluctuation).

I analyzed the cause: one upstream topic has more than 300 partitions. With that many partitions, some of them frequently see intermittent writes with QPS = 0; this is worst between 2 am and 5 am. Writes to the topic as a whole, however, are very smooth.

The final join output shakes severely, as shown in the following diagram:

!image-2024-04-10-20-15-05-731.png!

Root cause:
# {{SourceOperator#emitLatestWatermark}} reports the lastEmittedWatermark to the SourceCoordinator.
# If a partition receives no writes for a period, the lastEmittedWatermark sent by the subtask reading that partition stays unchanged.
# The SourceCoordinator aggregates the watermarks of all subtasks in the watermark group and takes the smallest one. The maxAllowedWatermark can therefore stay unchanged for some time, even though the upstream data flow as a whole keeps moving forward, until that minimum is finally updated. Only then does everything advance at once, which shows up as severe shake in the output stream.

I think taking the global minimum may not be a good choice; min/max aggregations are more likely to hit edge cases. Perhaps a median, or a more elaborate selection strategy, would be more appropriate.

With a median, the overall data flow stays very smooth:

!image-2024-04-10-20-23-13-872.png!

--
This message was sent by Atlassian Jira
(v8.20.10#820010)