[jira] [Updated] (FLINK-35420) WordCountMapredITCase fails to compile in IntelliJ
[ https://issues.apache.org/jira/browse/FLINK-35420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Piotr Nowojski updated FLINK-35420:
-----------------------------------
    Fix Version/s: 1.20.0

> WordCountMapredITCase fails to compile in IntelliJ
> --------------------------------------------------
>
>                 Key: FLINK-35420
>                 URL: https://issues.apache.org/jira/browse/FLINK-35420
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hadoop Compatibility
>    Affects Versions: 1.20.0
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.20.0
>
> {noformat}
> flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/WordCountMapredITCase.scala:42:8
> value isFalse is not a member of ?0
> possible cause: maybe a semicolon is missing before `value isFalse'?
>     .isFalse()
> {noformat}
> Might be caused by: https://youtrack.jetbrains.com/issue/SCL-20679

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Assigned] (FLINK-35420) WordCountMapredITCase fails to compile in IntelliJ
[ https://issues.apache.org/jira/browse/FLINK-35420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Piotr Nowojski reassigned FLINK-35420:
--------------------------------------
    Assignee: Piotr Nowojski
[jira] [Closed] (FLINK-35420) WordCountMapredITCase fails to compile in IntelliJ
[ https://issues.apache.org/jira/browse/FLINK-35420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Piotr Nowojski closed FLINK-35420.
----------------------------------
    Resolution: Fixed

9ccfb65 into apache:master now
[jira] [Created] (FLINK-35420) WordCountMapredITCase fails to compile in IntelliJ
Piotr Nowojski created FLINK-35420: -- Summary: WordCountMapredITCase fails to compile in IntelliJ Key: FLINK-35420 URL: https://issues.apache.org/jira/browse/FLINK-35420 Project: Flink Issue Type: Bug Components: Connectors / Hadoop Compatibility Affects Versions: 1.20.0 Reporter: Piotr Nowojski {noformat} flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/WordCountMapredITCase.scala:42:8 value isFalse is not a member of ?0 possible cause: maybe a semicolon is missing before `value isFalse'? .isFalse() {noformat} Might be caused by: https://youtrack.jetbrains.com/issue/SCL-20679 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-32828) Partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint
[ https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Piotr Nowojski closed FLINK-32828.
----------------------------------
    Fix Version/s: 2.0.0
                   1.18.2
                   1.20.0
                   1.19.1
       Resolution: Fixed

Merged to master as 7fd6c6da26c
Merged to release-1.19 as a160f8db1f7
Merged to release-1.18 as 4848a50d735

> Partition aware watermark not handled correctly shortly after job start up
> from checkpoint or savepoint
> --------------------------------------------------------------------------
>
>                 Key: FLINK-32828
>                 URL: https://issues.apache.org/jira/browse/FLINK-32828
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, Connectors / Kafka
>    Affects Versions: 1.17.1, 1.19.0, 1.18.1
>        Environment: Affected environments:
>        * Local MiniCluster + Confluent Kafka run in Docker (see attached files)
>        * Flink job run in Kubernetes using Flink Operator 1.15 + a 3-node Kafka cluster run in a Kubernetes cluster
>            Reporter: Grzegorz Liter
>            Assignee: Piotr Nowojski
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 2.0.0, 1.18.2, 1.20.0, 1.19.1
>
>         Attachments: docker-compose.yml, test-job.java
>
> When using KafkaSource with partition-aware watermarks, watermarks are emitted even when only one partition has events, just after job startup from a savepoint/checkpoint. Once other partitions have seen some events, the watermark behaviour is correct and the watermark is emitted as the minimum watermark from all partitions.
>
> Steps to reproduce:
> # Set up a Kafka cluster with a topic that has 2 or more partitions (see attached docker-compose.yml):
> ## {{./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-2 --partitions 4}}
> # Create a job that (see attached `test-job.java`):
> ## uses a KafkaSource with `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L))`
> ## has parallelism lower than the number of partitions
> ## stores a checkpoint/savepoint
> # Start the job
> # Send events on a single partition only:
> ## {{./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-2 --property "parse.key=true" --property "key.separator=:"}}
>
> {{14:51:19,883 WARN  com.example.TestJob6$InputSink2 [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:18.849Z, watermark -292275055-05-16T16:47:04.192Z}}
> {{14:51:32,484 WARN  com.example.TestJob6$InputSink2 [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:31.475Z, watermark -292275055-05-16T16:47:04.192Z}}
> {{14:51:35,914 WARN  com.example.TestJob6$InputSink2 [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:34.909Z, watermark -292275055-05-16T16:47:04.192Z}}
> Expected: Watermark does not progress. Actual: Watermark does not progress.
> 5. Stop the job
> 6. Start the job from the last checkpoint/savepoint
> 7. Send events on a single partition only
> {{14:53:41,693 WARN  com.example.TestJob6$InputSink2 [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:40.662Z, watermark -292275055-05-16T16:47:04.192Z}}
> {{14:53:46,088 WARN  com.example.TestJob6$InputSink2 [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:45.078Z, watermark 2023-08-10T12:53:30.661Z}}
> {{14:53:49,520 WARN  com.example.TestJob6$InputSink2 [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:48.511Z, watermark 2023-08-10T12:53:35.077Z}}
> Expected: Watermark does not progress. {color:#ff0000}*Actual: Watermark has progressed*{color}
>
> {color:#172b4d}To add a bit more context:{color}
> {color:#172b4d}8.
Send events on other partitions and then send events only > on single partitions{color} > {{{color:#172b4d}14:54:55,112 WARN com.example.TestJob6$InputSink2 > [] - == Received: test-2/0: 2 -> a, timestamp > 2023-08-10T12:54:54.104Z, watermark 2023-08-10T12:53:38.510Z > 14:54:57,673 WARN com.example.TestJob6$InputSink2 [] - == > Received: test-2/1: 4 -> a, timestamp 2023-08-10T12:54:56.665Z, watermark > 2023-08-10T12:53:38.510Z > 14:54:57,673 WARN com.example.TestJob6$InputSink2 [] - == > Received: test-2/2: 5 -> a, timestamp 2023-08-10T12:54:57.554Z, watermark > 2023-08-10T12:53:38.510Z > 14:55:12,821 WARN com.example.TestJob6$InputSink2 [] - == > Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:11.814Z, watermark > 2023-08-10T12:54:44.103Z > 14:55:16,099 WARN com.example.TestJob6$InputSink2 [] - == > Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:15.091Z, watermark >
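The minimum-combining behavior described in the report ("the watermark is emitted as the minimum watermark from all partitions") can be sketched with a toy model. This is not Flink's actual {{WatermarkOutputMultiplexer}}; all class and method names below are made up for illustration. The sketch shows why the combined watermark should stay put while only one partition receives events, and why it wrongly advances if restored-but-idle splits are left out of the combination:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WatermarkCombineDemo {
    // Toy model of per-split watermark combination: the emitted watermark is
    // the minimum over all *registered* splits (hypothetical names, not the
    // real WatermarkOutputMultiplexer API).
    static final long MIN_WATERMARK = Long.MIN_VALUE;
    private final Map<String, Long> perSplit = new HashMap<>();

    void registerSplit(String splitId) {
        perSplit.put(splitId, MIN_WATERMARK);
    }

    void onEvent(String splitId, long timestamp) {
        // A split's watermark only ever moves forward.
        perSplit.merge(splitId, timestamp, Long::max);
    }

    long combined() {
        return perSplit.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(MIN_WATERMARK);
    }

    public static void main(String[] args) {
        // All four partitions registered up front: events on one partition
        // alone do not advance the combined watermark (the correct,
        // pre-restart behavior: "watermark does not progress").
        WatermarkCombineDemo eager = new WatermarkCombineDemo();
        for (String p : List.of("p0", "p1", "p2", "p3")) {
            eager.registerSplit(p);
        }
        eager.onEvent("p3", 1_000L);
        System.out.println(eager.combined() == MIN_WATERMARK); // true

        // If restored splits were only registered once they emit their first
        // record, the minimum would be taken over the active split alone and
        // the watermark would wrongly advance (the post-restore behavior).
        WatermarkCombineDemo lazy = new WatermarkCombineDemo();
        lazy.registerSplit("p3"); // only the split that received data
        lazy.onEvent("p3", 1_000L);
        System.out.println(lazy.combined()); // 1000
    }
}
```

The toy deliberately ignores the bounded out-of-orderness offset; it only models the min-combination step that the report relies on.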
[jira] [Commented] (FLINK-35351) Restore from unaligned checkpoints with a custom partitioner fails.
[ https://issues.apache.org/jira/browse/FLINK-35351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846869#comment-17846869 ] Piotr Nowojski commented on FLINK-35351: Thanks, after reading the PR indeed I understand this more :) > Restore from unaligned checkpoints with a custom partitioner fails. > --- > > Key: FLINK-35351 > URL: https://issues.apache.org/jira/browse/FLINK-35351 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Reporter: Dmitriy Linevich >Assignee: Dmitriy Linevich >Priority: Major > Labels: pull-request-available > > We encountered a problem when using a custom partitioner with unaligned > checkpoints. The bug reproduces under the following steps: > # Run a job with graph: Source[2]->Sink[3], the custom partitioner applied > after the Source task. > # Make a checkpoint. > # Restore from the checkpoint with a different source parallelism: > Source[1]->Sink[3]. > # An exception is thrown. > This issue does not occur when restoring with the same parallelism or when > changing the Sink parallelism. The exception only occurs when the parallelism > of the Source is changed while the Sink parallelism remains the same. > See the exception below and the test code at the end. > {code:java} > [db13789c52b80aad852c53a0afa26247] Task [Sink: sink (3/3)#0] WARN Sink: sink > (3/3)#0 > (be1d158c2e77fc9ed9e3e5d9a8431dc2_0a448493b4782967b150582570326227_2_0) > switched from RUNNING to FAILED with failure cause: > java.io.IOException: Can't get next record for channel > InputChannelInfo{gateIdx=0, inputChannelIdx=0} > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:106) > ~[classes/:?] > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > ~[classes/:?] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:600) > ~[classes/:?] 
> at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) > ~[classes/:?] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:930) > ~[classes/:?] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:879) > ~[classes/:?] > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:960) > ~[classes/:?] > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) > [classes/:?] > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:753) > [classes/:?] > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568) > [classes/:?] > at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121] > Caused by: java.io.IOException: Corrupt stream, found tag: -1 > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:222) > ~[classes/:?] > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:44) > ~[classes/:?] > at > org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53) > ~[classes/:?] > at > org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337) > ~[classes/:?] > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:128) > ~[classes/:?] > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:103) > ~[classes/:?] > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:93) > ~[classes/:?] 
> at > org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer$VirtualChannel.getNextRecord(DemultiplexingRecordDeserializer.java:79) > ~[classes/:?] > at > org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.getNextRecord(DemultiplexingRecordDeserializer.java:154) > ~[classes/:?] > at > org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.getNextRecord(DemultiplexingRecordDeserializer.java:54) > ~[classes/:?] > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:103) > ~[classes/:?] > ... 10 more {code} > We discovered that this issue occurs
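The "Corrupt stream, found tag: -1" failure in the stack trace above is the classic symptom of deserializing from the middle of a record. The sketch below is a toy illustration with a made-up record format (one tag byte plus an int payload); it is not Flink's actual StreamElementSerializer. It shows how replaying in-flight channel data from the wrong position, as can happen when upstream parallelism changes under a custom partitioner, turns a payload byte into a nonsense tag:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class CorruptStreamDemo {
    // Toy record format, loosely modeled on a tagged serializer:
    // [tag byte][int payload]; tag 0 = data record.
    static byte[] writeRecord(int value) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeByte(0);    // tag
        out.writeInt(value); // payload
        return bos.toByteArray();
    }

    static int readTag(byte[] buf, int offset) throws IOException {
        DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(buf, offset, buf.length - offset));
        return in.readByte();
    }

    public static void main(String[] args) throws IOException {
        // Reading from a record boundary yields the expected tag.
        byte[] ok = writeRecord(42);
        System.out.println(readTag(ok, 0)); // 0

        // Replaying the same channel data from a misaligned offset reads a
        // payload byte as the tag, producing garbage like "found tag: -1".
        byte[] bad = writeRecord(-1);
        System.out.println(readTag(bad, 1)); // -1
    }
}
```

The point of the sketch is only that in-flight buffers are raw serialized bytes: if recovery reassembles them under a channel mapping that no longer matches how they were written, deserialization fails with exactly this kind of bogus-tag error.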
[jira] [Updated] (FLINK-32828) Partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint
[ https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Piotr Nowojski updated FLINK-32828:
-----------------------------------
    Summary: Partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint  (was: Kafka partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint)
[jira] [Comment Edited] (FLINK-32828) Kafka partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint
[ https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846694#comment-17846694 ]

Piotr Nowojski edited comment on FLINK-32828 at 5/15/24 4:27 PM:
----------------------------------------------------------------

We have just stumbled upon the same issue and I can confirm that it is quite a critical bug that can lead to all kinds of incorrect results. The problem is that the initial splits recovered from state are registered in the {{WatermarkOutputMultiplexer}} only once the first record from a given split has been emitted. The resulting watermark was not properly combined from the initial splits, but only from the splits that had already emitted at least one record. I will publish a fix for that shortly.

edit: Also, the problem doesn't affect only Kafka, but all FLIP-27 sources (anything that uses {{SourceOperator}}).
[jira] [Commented] (FLINK-32828) Kafka partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint
[ https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846694#comment-17846694 ]

Piotr Nowojski commented on FLINK-32828:
----------------------------------------

We have just stumbled upon the same issue and I can confirm that it is quite a critical bug that can lead to all kinds of incorrect results. The problem is that the initial splits recovered from state are registered in the {{WatermarkOutputMultiplexer}} only once the first record from a given split has been emitted. The resulting watermark was not properly combined from the initial splits, but only from the splits that had already emitted at least one record. I will publish a fix for that shortly.
[jira] [Updated] (FLINK-32828) Kafka partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint
[ https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Piotr Nowojski updated FLINK-32828:
-----------------------------------
    Priority: Critical  (was: Major)
[jira] [Updated] (FLINK-32828) Kafka partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint
[ https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-32828: --- Affects Version/s: 1.18.1 1.19.0 > Kafka partition aware watermark not handled correctly shortly after job start > up from checkpoint or savepoint > - > > Key: FLINK-32828 > URL: https://issues.apache.org/jira/browse/FLINK-32828 > Project: Flink > Issue Type: Bug > Components: API / DataStream, Connectors / Kafka >Affects Versions: 1.17.1, 1.19.0, 1.18.1 > Environment: Affected environments: > * Local MiniCluster + Confluent Kafka run in docker > ** See attached files > * Flink job run in Kubernetes using Flink Operator 1.15 + 3 node Kafka > cluster run in Kubernetes cluster >Reporter: Grzegorz Liter >Assignee: Piotr Nowojski >Priority: Critical > Attachments: docker-compose.yml, test-job.java > > > When using KafkaSource with partition aware watermarks. Watermarks are being > emitted even when only one partition has some events just after job startup > from savepoint/checkpoint. After it has some events on other partitions the > watermark behaviour is correct and watermark is emited as a minimum watarmark > from all partition. > > Steps to reproduce: > # Setup a Kafka cluster with a topic that has 2 or more partitions. 
(see > attached docker-compose.yml) > # > ## {{./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic > test-2 --partitions 4}} > # Create a job that (see attached `test-job.java`): > ## uses a KafkaSource with > `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L))` > ## has parallelism lower than the number of partitions > ## stores a checkpoint/savepoint > # Start the job > # Send events only on a single partition > ## {{./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic > test-2 --property "parse.key=true" --property "key.separator=:"}} > > {{14:51:19,883 WARN com.example.TestJob6$InputSink2 [] - > == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:18.849Z, watermark > -292275055-05-16T16:47:04.192Z}} > {{14:51:32,484 WARN com.example.TestJob6$InputSink2 [] - > == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:31.475Z, watermark > -292275055-05-16T16:47:04.192Z}} > {{14:51:35,914 WARN com.example.TestJob6$InputSink2 [] - > == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:34.909Z, watermark > -292275055-05-16T16:47:04.192Z}} > Expected: Watermark does not progress. Actual: Watermark does not progress. > 5. Stop the job > 6. Start the job from the last checkpoint/savepoint > 7. Send events only on a single partition > {{14:53:41,693 WARN com.example.TestJob6$InputSink2 [] - > == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:40.662Z, watermark > -292275055-05-16T16:47:04.192Z}} > {{14:53:46,088 WARN com.example.TestJob6$InputSink2 [] - > == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:45.078Z, watermark > 2023-08-10T12:53:30.661Z}} > {{14:53:49,520 WARN com.example.TestJob6$InputSink2 [] - > == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:48.511Z, watermark > 2023-08-10T12:53:35.077Z}} > Expected: Watermark does not progress. {color:#ff}*Actual: Watermark has > progressed*{color} > > {color:#172b4d}To add a bit more context:{color} > {color:#172b4d}8. 
Send events on other partitions and then send events only > on single partitions{color} > {{{color:#172b4d}14:54:55,112 WARN com.example.TestJob6$InputSink2 > [] - == Received: test-2/0: 2 -> a, timestamp > 2023-08-10T12:54:54.104Z, watermark 2023-08-10T12:53:38.510Z > 14:54:57,673 WARN com.example.TestJob6$InputSink2 [] - == > Received: test-2/1: 4 -> a, timestamp 2023-08-10T12:54:56.665Z, watermark > 2023-08-10T12:53:38.510Z > 14:54:57,673 WARN com.example.TestJob6$InputSink2 [] - == > Received: test-2/2: 5 -> a, timestamp 2023-08-10T12:54:57.554Z, watermark > 2023-08-10T12:53:38.510Z > 14:55:12,821 WARN com.example.TestJob6$InputSink2 [] - == > Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:11.814Z, watermark > 2023-08-10T12:54:44.103Z > 14:55:16,099 WARN com.example.TestJob6$InputSink2 [] - == > Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:15.091Z, watermark > 2023-08-10T12:54:44.103Z > 14:55:19,122 WARN com.example.TestJob6$InputSink2 [] - == > Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:18.114Z, watermark > 2023-08-10T12:54:44.103Z{color}}} > {color:#172b4d}Expected: Watermark should progress a bit
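For reference, the combination semantics the reporter expects (the emitted watermark being the minimum across all partitions, with partitions that have not yet seen data pinning it at Long.MIN_VALUE) can be sketched in a few lines. This is an illustrative, self-contained model with hypothetical names, not Flink's actual KafkaSource/watermark code:

```java
// Sketch of per-partition bounded-out-of-orderness watermarks combined by minimum.
// Hypothetical class; illustrates the semantics only, not Flink's implementation.
import java.util.Arrays;

public class PartitionWatermarks {
    private final long[] partitionWatermarks;

    public PartitionWatermarks(int numPartitions) {
        partitionWatermarks = new long[numPartitions];
        // A partition with no data yet must hold the combined watermark back,
        // otherwise a single active partition can make the watermark jump ahead
        // (the buggy behaviour observed after restore).
        Arrays.fill(partitionWatermarks, Long.MIN_VALUE);
    }

    public void onEvent(int partition, long eventTimestamp, long outOfOrdernessMillis) {
        long wm = eventTimestamp - outOfOrdernessMillis - 1;
        partitionWatermarks[partition] = Math.max(partitionWatermarks[partition], wm);
    }

    /** The emitted watermark is the minimum over all partitions. */
    public long combinedWatermark() {
        long min = Long.MAX_VALUE;
        for (long wm : partitionWatermarks) {
            min = Math.min(min, wm);
        }
        return min;
    }

    public static void main(String[] args) {
        PartitionWatermarks w = new PartitionWatermarks(4);
        w.onEvent(3, 1000000L, 10000L);
        // Only partition 3 has data, so the combined watermark must not progress.
        System.out.println(w.combinedWatermark() == Long.MIN_VALUE);
    }
}
```

Under these semantics the watermark in step 7 above should have stayed at Long.MIN_VALUE until all partitions had events.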
[jira] [Assigned] (FLINK-32828) Kafka partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint
[ https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski reassigned FLINK-32828: -- Assignee: Piotr Nowojski > Kafka partition aware watermark not handled correctly shortly after job start > up from checkpoint or savepoint > - > > Key: FLINK-32828 > URL: https://issues.apache.org/jira/browse/FLINK-32828 > Project: Flink > Issue Type: Bug > Components: API / DataStream, Connectors / Kafka >Affects Versions: 1.17.1 > Environment: Affected environments: > * Local MiniCluster + Confluent Kafka run in docker > ** See attached files > * Flink job run in Kubernetes using Flink Operator 1.15 + 3 node Kafka > cluster run in Kubernetes cluster >Reporter: Grzegorz Liter >Assignee: Piotr Nowojski >Priority: Major > Attachments: docker-compose.yml, test-job.java > > > When using KafkaSource with partition aware watermarks, watermarks are being > emitted even when only one partition has some events just after job startup > from savepoint/checkpoint. After it has some events on other partitions the > watermark behaviour is correct and the watermark is emitted as the minimum watermark > from all partitions. > > Steps to reproduce: > # Setup a Kafka cluster with a topic that has 2 or more partitions. 
(see > attached docker-compose.yml) > # > ## {{./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic > test-2 --partitions 4}} > # Create a job that (see attached `test-job.java`): > ## uses a KafkaSource with > `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L))` > ## has parallelism lower than the number of partitions > ## stores a checkpoint/savepoint > # Start the job > # Send events only on a single partition > ## {{./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic > test-2 --property "parse.key=true" --property "key.separator=:"}} > > {{14:51:19,883 WARN com.example.TestJob6$InputSink2 [] - > == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:18.849Z, watermark > -292275055-05-16T16:47:04.192Z}} > {{14:51:32,484 WARN com.example.TestJob6$InputSink2 [] - > == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:31.475Z, watermark > -292275055-05-16T16:47:04.192Z}} > {{14:51:35,914 WARN com.example.TestJob6$InputSink2 [] - > == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:34.909Z, watermark > -292275055-05-16T16:47:04.192Z}} > Expected: Watermark does not progress. Actual: Watermark does not progress. > 5. Stop the job > 6. Start the job from the last checkpoint/savepoint > 7. Send events only on a single partition > {{14:53:41,693 WARN com.example.TestJob6$InputSink2 [] - > == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:40.662Z, watermark > -292275055-05-16T16:47:04.192Z}} > {{14:53:46,088 WARN com.example.TestJob6$InputSink2 [] - > == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:45.078Z, watermark > 2023-08-10T12:53:30.661Z}} > {{14:53:49,520 WARN com.example.TestJob6$InputSink2 [] - > == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:48.511Z, watermark > 2023-08-10T12:53:35.077Z}} > Expected: Watermark does not progress. {color:#ff}*Actual: Watermark has > progressed*{color} > > {color:#172b4d}To add a bit more context:{color} > {color:#172b4d}8. 
Send events on other partitions and then send events only > on single partitions{color} > {{{color:#172b4d}14:54:55,112 WARN com.example.TestJob6$InputSink2 > [] - == Received: test-2/0: 2 -> a, timestamp > 2023-08-10T12:54:54.104Z, watermark 2023-08-10T12:53:38.510Z > 14:54:57,673 WARN com.example.TestJob6$InputSink2 [] - == > Received: test-2/1: 4 -> a, timestamp 2023-08-10T12:54:56.665Z, watermark > 2023-08-10T12:53:38.510Z > 14:54:57,673 WARN com.example.TestJob6$InputSink2 [] - == > Received: test-2/2: 5 -> a, timestamp 2023-08-10T12:54:57.554Z, watermark > 2023-08-10T12:53:38.510Z > 14:55:12,821 WARN com.example.TestJob6$InputSink2 [] - == > Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:11.814Z, watermark > 2023-08-10T12:54:44.103Z > 14:55:16,099 WARN com.example.TestJob6$InputSink2 [] - == > Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:15.091Z, watermark > 2023-08-10T12:54:44.103Z > 14:55:19,122 WARN com.example.TestJob6$InputSink2 [] - == > Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:18.114Z, watermark > 2023-08-10T12:54:44.103Z{color}}} > {color:#172b4d}Expected: Watermark should progress a bit and then should not > progress when
[jira] [Commented] (FLINK-33109) Watermark alignment not applied after recovery from checkpoint
[ https://issues.apache.org/jira/browse/FLINK-33109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846597#comment-17846597 ] Piotr Nowojski commented on FLINK-33109: Hi! {quote} I guess this bug has been fixed in 1.17.2 and 1.18. {quote} Has this been confirmed? [~YordanPavlov], has this issue been fixed for you? > Watermark alignment not applied after recovery from checkpoint > -- > > Key: FLINK-33109 > URL: https://issues.apache.org/jira/browse/FLINK-33109 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.17.1 >Reporter: Yordan Pavlov >Priority: Major > Attachments: WatermarkTest-1.scala, > image-2023-09-18-15-40-06-868.png, image-2023-09-18-15-46-16-106.png > > > I am observing a problem where after recovery from a checkpoint the Kafka > source watermarks would start to diverge not honoring the watermark alignment > setting I have applied. > I have a Kafka source which reads a topic with 32 partitions. I am applying > the following watermark strategy: > {code:java} > new EventAwareWatermarkStrategy[KeyedKafkaSourceMessage]](msg => > msg.value.getTimestamp) > .withWatermarkAlignment("alignment-sources-group", > time.Duration.ofMillis(sourceWatermarkAlignmentBlocks)){code} > > This works great up until my job needs to recover from checkpoint. Once the > recovery takes place, no alignment is taking place any more. This can best be > illustrated by looking at the watermark metrics for various operators in the > image: > !image-2023-09-18-15-40-06-868.png! > > You can see how the watermarks disperse after the recovery. Trying to debug > the problem I noticed that before the failure there would be calls in > > {code:java} > SourceCoordinator::announceCombinedWatermark() > {code} > after the recovery, no calls get there, so no value for > {code:java} > watermarkAlignmentParams.getMaxAllowedWatermarkDrift(){code} > is ever read. 
I can manually fix the problem if I stop the job, clear all > state from Zookeeper and then manually start Flink providing the last > checkpoint with the > {code:java} > '--fromSavepoint'{code} > flag. This would cause the SourceCoordinator to be constructed properly and > the watermark drift to be checked. Once recovered manually, watermarks would again > converge to the allowed drift as seen in the metrics: > !image-2023-09-18-15-46-16-106.png! > > Let me know if I can be helpful by providing any more information. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
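The alignment decision that stops being evaluated after recovery can be modeled in a few lines: the coordinator computes the global minimum watermark across sources and pauses any reader that is ahead by more than the allowed drift. This is a hedged, self-contained sketch with hypothetical names; the real logic lives in SourceCoordinator::announceCombinedWatermark() and uses watermarkAlignmentParams.getMaxAllowedWatermarkDrift():

```java
// Illustrative model of the watermark-alignment check. Hypothetical class and
// method names; not the actual Flink SourceCoordinator code.
import java.util.Map;

public class AlignmentSketch {
    /** Returns true if a reader at {@code localWatermark} should pause emitting. */
    public static boolean shouldPause(Map<String, Long> localWatermarks,
                                      long localWatermark,
                                      long maxAllowedDriftMillis) {
        long globalMin = localWatermarks.values().stream()
                .mapToLong(Long::longValue).min().orElse(Long.MIN_VALUE);
        // Conceptually, the coordinator announces (globalMin + drift) as the maximum
        // allowed watermark; readers beyond it must stop until the others catch up.
        return localWatermark > globalMin + maxAllowedDriftMillis;
    }
}
```

If this check is never invoked after restore, as reported, nothing pauses the fast readers and the per-source watermarks diverge exactly as in the first metrics screenshot.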
[jira] [Comment Edited] (FLINK-35351) Restore from unaligned checkpoints with a custom partitioner fails.
[ https://issues.apache.org/jira/browse/FLINK-35351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846231#comment-17846231 ] Piotr Nowojski edited comment on FLINK-35351 at 5/14/24 9:11 AM: - Hi [~lda-dima], thanks for reporting this issue and what looks like correct analysis. However, I think the proposed solution goes in the wrong direction - we can not disallow rescaling, as that might prevent jobs from recovering in case of, for example, the loss of a TaskManager. Note that the problem probably happens due to the use of a "custom partitioner" here. AFAIR unaligned checkpoints are (most likely for the exact same reason that you discovered in this bug) enabled only for keyed exchanges. [Pointwise/broadcast connections with unaligned checkpoints are unsupported|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/#certain-data-distribution-patterns-are-not-checkpointed]. There is code somewhere in the codebase that checks whether unaligned checkpoints should be enabled or not for a given partitioning. I would guess that custom partitioners are mishandled there? Relevant code pointers: * {{org.apache.flink.runtime.checkpoint.CheckpointOptions.AlignmentType#FORCED_ALIGNED}} * {{org.apache.flink.streaming.runtime.io.RecordWriterOutput#supportsUnalignedCheckpoints}} * {{org.apache.flink.streaming.api.graph.StreamGraphGenerator#shouldDisableUnalignedCheckpointing}} was (Author: pnowojski): Hi [~lda-dima], thanks for reporting this issue and what looks like correct analysis. However I think the proposed solution goes in the wrong direction - we can not disallow rescaling, as that might prevent jobs from recovering in case of for example a loss of a TaskManager. Note that the problem probably happens due to the use of " custom partitioner" here. AFAIR unaligned checkpoints are (for most likely the exactly same reason that you discovered in this bug) enabled only for keyed exchanges. 
[Pointwise/broadcast connections with unaligned checkpoints are unsupported|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/#certain-data-distribution-patterns-are-not-checkpointed]. There is somewhere in the codebase code, that checks whether for given partitioning unaligned checkpoints should be enabled or not. I would guess that custom partitioners are mishandled there? > Restore from unaligned checkpoints with a custom partitioner fails. > --- > > Key: FLINK-35351 > URL: https://issues.apache.org/jira/browse/FLINK-35351 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Reporter: Dmitriy Linevich >Assignee: Dmitriy Linevich >Priority: Major > > We encountered a problem when using a custom partitioner with unaligned > checkpoints. The bug reproduces under the following steps: > # Run a job with graph: Source[2]->Sink[3], the custom partitioner applied > after the Source task. > # Make a checkpoint. > # Restore from the checkpoint with a different source parallelism: > Source[1]->Sink[3]. > # An exception is thrown. > This issue does not occur when restoring with the same parallelism or when > changing the Sink parallelism. The exception only occurs when the parallelism > of the Source is changed while the Sink parallelism remains the same. > See the exception below and the test code at the end. > {code:java} > [db13789c52b80aad852c53a0afa26247] Task [Sink: sink (3/3)#0] WARN Sink: sink > (3/3)#0 > (be1d158c2e77fc9ed9e3e5d9a8431dc2_0a448493b4782967b150582570326227_2_0) > switched from RUNNING to FAILED with failure cause: > java.io.IOException: Can't get next record for channel > InputChannelInfo{gateIdx=0, inputChannelIdx=0} > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:106) > ~[classes/:?] 
> at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > ~[classes/:?] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:600) > ~[classes/:?] > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) > ~[classes/:?] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:930) > ~[classes/:?] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:879) > ~[classes/:?] > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:960) > ~[classes/:?] > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) > [classes/:?] > at
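The shape of the check the code pointers in the comment above refer to can be sketched as follows. This is a hedged illustration of the idea only (a hypothetical enum, not Flink's StreamGraphGenerator#shouldDisableUnalignedCheckpointing): per the comment, unaligned checkpoints are believed to be safe only for keyed exchanges, whose in-flight records can be reassigned to the correct downstream subtask after a parallelism change, so custom partitioners should presumably force-align the barrier.

```java
// Illustrative sketch: which exchange types can safely use unaligned checkpoints.
// Hypothetical types; the real decision logic lives in Flink's stream graph generator.
public class ExchangeCheck {
    enum Partitioning { KEYED, FORWARD, BROADCAST, CUSTOM }

    static boolean supportsUnalignedCheckpoints(Partitioning p) {
        // Only keyed exchanges know how to redistribute buffered in-flight records
        // on rescale; for the others the barrier must be force-aligned
        // (cf. CheckpointOptions.AlignmentType#FORCED_ALIGNED).
        return p == Partitioning.KEYED;
    }
}
```

The reported corruption ("Corrupt stream, found tag: -1") is consistent with CUSTOM falling through this check and its in-flight buffers being replayed to the wrong channel after rescaling.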
[jira] [Commented] (FLINK-35351) Restore from unaligned checkpoints with a custom partitioner fails.
[ https://issues.apache.org/jira/browse/FLINK-35351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846231#comment-17846231 ] Piotr Nowojski commented on FLINK-35351: Hi [~lda-dima], thanks for reporting this issue and what looks like correct analysis. However, I think the proposed solution goes in the wrong direction - we can not disallow rescaling, as that might prevent jobs from recovering in case of, for example, the loss of a TaskManager. Note that the problem probably happens due to the use of a "custom partitioner" here. AFAIR unaligned checkpoints are (most likely for the exact same reason that you discovered in this bug) enabled only for keyed exchanges. [Pointwise/broadcast connections with unaligned checkpoints are unsupported|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/#certain-data-distribution-patterns-are-not-checkpointed]. There is code somewhere in the codebase that checks whether unaligned checkpoints should be enabled or not for a given partitioning. I would guess that custom partitioners are mishandled there? > Restore from unaligned checkpoints with a custom partitioner fails. > --- > > Key: FLINK-35351 > URL: https://issues.apache.org/jira/browse/FLINK-35351 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Reporter: Dmitriy Linevich >Assignee: Dmitriy Linevich >Priority: Major > > We encountered a problem when using a custom partitioner with unaligned > checkpoints. The bug reproduces under the following steps: > # Run a job with graph: Source[2]->Sink[3], the custom partitioner applied > after the Source task. > # Make a checkpoint. > # Restore from the checkpoint with a different source parallelism: > Source[1]->Sink[3]. > # An exception is thrown. > This issue does not occur when restoring with the same parallelism or when > changing the Sink parallelism. 
The exception only occurs when the parallelism > of the Source is changed while the Sink parallelism remains the same. > See the exception below and the test code at the end. > {code:java} > [db13789c52b80aad852c53a0afa26247] Task [Sink: sink (3/3)#0] WARN Sink: sink > (3/3)#0 > (be1d158c2e77fc9ed9e3e5d9a8431dc2_0a448493b4782967b150582570326227_2_0) > switched from RUNNING to FAILED with failure cause: > java.io.IOException: Can't get next record for channel > InputChannelInfo{gateIdx=0, inputChannelIdx=0} > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:106) > ~[classes/:?] > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > ~[classes/:?] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:600) > ~[classes/:?] > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) > ~[classes/:?] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:930) > ~[classes/:?] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:879) > ~[classes/:?] > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:960) > ~[classes/:?] > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) > [classes/:?] > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:753) > [classes/:?] > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568) > [classes/:?] > at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121] > Caused by: java.io.IOException: Corrupt stream, found tag: -1 > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:222) > ~[classes/:?] 
> at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:44) > ~[classes/:?] > at > org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53) > ~[classes/:?] > at > org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337) > ~[classes/:?] > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:128) > ~[classes/:?] > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:103) > ~[classes/:?] > at >
[jira] [Assigned] (FLINK-35351) Restore from unaligned checkpoints with a custom partitioner fails.
[ https://issues.apache.org/jira/browse/FLINK-35351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski reassigned FLINK-35351: -- Assignee: Dmitriy Linevich > Restore from unaligned checkpoints with a custom partitioner fails. > --- > > Key: FLINK-35351 > URL: https://issues.apache.org/jira/browse/FLINK-35351 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Reporter: Dmitriy Linevich >Assignee: Dmitriy Linevich >Priority: Major > > We encountered a problem when using a custom partitioner with unaligned > checkpoints. The bug reproduces under the following steps: > # Run a job with graph: Source[2]->Sink[3], the custom partitioner applied > after the Source task. > # Make a checkpoint. > # Restore from the checkpoint with a different source parallelism: > Source[1]->Sink[3]. > # An exception is thrown. > This issue does not occur when restoring with the same parallelism or when > changing the Sink parallelism. The exception only occurs when the parallelism > of the Source is changed while the Sink parallelism remains the same. > See the exception below and the test code at the end. > {code:java} > [db13789c52b80aad852c53a0afa26247] Task [Sink: sink (3/3)#0] WARN Sink: sink > (3/3)#0 > (be1d158c2e77fc9ed9e3e5d9a8431dc2_0a448493b4782967b150582570326227_2_0) > switched from RUNNING to FAILED with failure cause: > java.io.IOException: Can't get next record for channel > InputChannelInfo{gateIdx=0, inputChannelIdx=0} > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:106) > ~[classes/:?] > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > ~[classes/:?] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:600) > ~[classes/:?] 
> at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) > ~[classes/:?] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:930) > ~[classes/:?] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:879) > ~[classes/:?] > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:960) > ~[classes/:?] > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) > [classes/:?] > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:753) > [classes/:?] > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568) > [classes/:?] > at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121] > Caused by: java.io.IOException: Corrupt stream, found tag: -1 > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:222) > ~[classes/:?] > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:44) > ~[classes/:?] > at > org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53) > ~[classes/:?] > at > org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337) > ~[classes/:?] > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:128) > ~[classes/:?] > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:103) > ~[classes/:?] > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:93) > ~[classes/:?] 
> at > org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer$VirtualChannel.getNextRecord(DemultiplexingRecordDeserializer.java:79) > ~[classes/:?] > at > org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.getNextRecord(DemultiplexingRecordDeserializer.java:154) > ~[classes/:?] > at > org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.getNextRecord(DemultiplexingRecordDeserializer.java:54) > ~[classes/:?] > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:103) > ~[classes/:?] > ... 10 more {code} > We discovered that this issue occurs due to an optimization in the >
[jira] [Commented] (FLINK-33856) Add metrics to monitor the interaction performance between task and external storage system in the process of checkpoint making
[ https://issues.apache.org/jira/browse/FLINK-33856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840327#comment-17840327 ] Piotr Nowojski commented on FLINK-33856: Hey [~hejufang001]! Sorry for the usual open source delay :( The FLIP looks good to me. I would only suggest adding a config option to disable/enable those child spans. It would also be nice to have some aggregated values like {{sum}} or {{max}} in the parent span, similar to how recovery spans have those currently, but that might be an independent effort. I've created a FLIP-448 page for this effort: https://cwiki.apache.org/confluence/display/FLINK/FLIP-448%3A+Add+sub-task+spans+to+TraceReporter+for+checkpointing . The next step would be to create a dev mailing list thread. nit: I think if you press the button [Publish to web|https://images.template.net/wp-content/uploads/2022/10/How-to-Share-Google-Docs-with-Others-Publish-on-Web.jpg], the wiki widget will be able to generate a preview. > Add metrics to monitor the interaction performance between task and external > storage system in the process of checkpoint making > --- > > Key: FLINK-33856 > URL: https://issues.apache.org/jira/browse/FLINK-33856 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing >Affects Versions: 1.18.0 >Reporter: Jufang He >Assignee: Jufang He >Priority: Major > Labels: pull-request-available > > When Flink makes a checkpoint, the interaction performance with the external > file system has a great impact on the overall time spent. Therefore, it > is easy to observe the bottleneck point by adding performance indicators when > the task interacts with the external file storage system. These include: the > rate of file writes, the latency to write the file, and the latency to close the > file. 
> Adding the above metrics on the Flink side has the following advantages: it is > convenient for comparing the E2E time of different tasks, and there is no need > to distinguish the type of external storage system, since the measurement can be > unified in the FsCheckpointStreamFactory. -- This message was sent by Atlassian Jira (v8.20.10#820010)
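The "unify it in one place" idea from the description can be sketched as a measuring wrapper around the checkpoint output stream: a single decorator records bytes written (for a write-rate metric) and write/close latencies, regardless of which storage system backs the stream. Hypothetical class; not the actual FsCheckpointStreamFactory change:

```java
// Sketch of a metrics-collecting OutputStream decorator. Hypothetical; the real
// implementation would report to Flink's metric groups / trace spans instead of fields.
import java.io.IOException;
import java.io.OutputStream;

public class MeasuredStream extends OutputStream {
    private final OutputStream delegate;
    long bytesWritten;  // feeds a "rate of file writes" metric
    long writeNanos;    // feeds a "write latency" histogram
    long closeNanos;    // feeds a "close latency" histogram

    public MeasuredStream(OutputStream delegate) {
        this.delegate = delegate;
    }

    @Override
    public void write(int b) throws IOException {
        long start = System.nanoTime();
        delegate.write(b);
        writeNanos += System.nanoTime() - start;
        bytesWritten++;
    }

    @Override
    public void close() throws IOException {
        // Close latency matters separately: for object stores it often includes
        // the upload/commit of the whole file.
        long start = System.nanoTime();
        delegate.close();
        closeNanos = System.nanoTime() - start;
    }
}
```

Because the wrapper only sees `java.io.OutputStream`, it indeed needs no knowledge of the external storage type, which is the advantage the description claims.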
[jira] [Updated] (FLINK-33856) Add sub-task spans to TraceReporter for checkpointing
[ https://issues.apache.org/jira/browse/FLINK-33856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-33856: --- Summary: Add sub-task spans to TraceReporter for checkpointing (was: Add metrics to monitor the interaction performance between task and external storage system in the process of checkpoint making) > Add sub-task spans to TraceReporter for checkpointing > - > > Key: FLINK-33856 > URL: https://issues.apache.org/jira/browse/FLINK-33856 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing >Affects Versions: 1.18.0 >Reporter: Jufang He >Assignee: Jufang He >Priority: Major > Labels: pull-request-available > > When Flink makes a checkpoint, the interaction performance with the external > file system has a great impact on the overall time spent. Therefore, it > is easy to observe the bottleneck point by adding performance indicators when > the task interacts with the external file storage system. These include: the > rate of file writes, the latency to write the file, and the latency to close the > file. > Adding the above metrics on the Flink side has the following advantages: it is > convenient for comparing the E2E time of different tasks, and there is no need > to distinguish the type of external storage system, since the measurement can be > unified in the FsCheckpointStreamFactory. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-34913) ConcurrentModificationException SubTaskInitializationMetricsBuilder.addDurationMetric
[ https://issues.apache.org/jira/browse/FLINK-34913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski closed FLINK-34913. -- Fix Version/s: 1.20.0 (was: 2.0.0) Resolution: Fixed > ConcurrentModificationException > SubTaskInitializationMetricsBuilder.addDurationMetric > - > > Key: FLINK-34913 > URL: https://issues.apache.org/jira/browse/FLINK-34913 > Project: Flink > Issue Type: Bug > Components: Runtime / Metrics, Runtime / State Backends >Affects Versions: 1.19.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > The following failures can occur during job's recovery when using clip & > ingest. This code path is currently not available, so the bug can not happen > for users. > {noformat} > java.util.ConcurrentModificationException > at java.base/java.util.HashMap.compute(HashMap.java:1230) > at > org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49) > at > org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.runAndReportDuration(RocksDBIncrementalRestoreOperation.java:988) > at > org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.lambda$createAsyncCompactionTask$2(RocksDBIncrementalRestoreOperation.java:291) > {noformat} > {noformat} > java.util.ConcurrentModificationException > at java.base/java.util.HashMap.compute(HashMap.java:1230) > at > org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:794) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$3(StreamTask.java:744) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > at > 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:744) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:704) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > at java.base/java.lang.Thread.run(Thread.java:829) > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
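The two stack traces above show two threads (an async RocksDB compaction task and the StreamTask restore thread) calling `HashMap.compute()` on the same metrics map; `java.util.HashMap` is not thread-safe, so concurrent `compute()` calls can throw ConcurrentModificationException or silently corrupt the map. A sketch of the usual fix, assuming the builder simply aggregates named durations (hypothetical class, not the actual SubTaskInitializationMetricsBuilder, whose fix may differ):

```java
// Sketch: aggregate duration metrics safely under concurrent reporters.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class DurationMetrics {
    // ConcurrentHashMap.compute() is atomic per key, so concurrent callers are safe;
    // alternatively the callers could synchronize on the builder.
    private final Map<String, Long> durations = new ConcurrentHashMap<>();

    public void addDurationMetric(String name, long millis) {
        durations.compute(name, (k, v) -> v == null ? millis : v + millis);
    }

    public long get(String name) {
        return durations.getOrDefault(name, 0L);
    }
}
```

Note that `ConcurrentHashMap.compute()` documents per-key atomicity, whereas `HashMap.compute()` makes no concurrency guarantees at all, which matches the observed failure.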
[jira] [Assigned] (FLINK-20217) More fine-grained timer processing
[ https://issues.apache.org/jira/browse/FLINK-20217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski reassigned FLINK-20217: -- Assignee: Piotr Nowojski > More fine-grained timer processing > -- > > Key: FLINK-20217 > URL: https://issues.apache.org/jira/browse/FLINK-20217 > Project: Flink > Issue Type: Improvement > Components: API / DataStream, Runtime / Task >Affects Versions: 1.10.2, 1.11.2, 1.12.0 >Reporter: Nico Kruber >Assignee: Piotr Nowojski >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor > > Timers are currently processed in one big block under the checkpoint lock > (under {{InternalTimerServiceImpl#advanceWatermark}}. This can be problematic > in a number of scenarios while doing checkpointing which would lead to > checkpoints timing out (and even unaligned checkpoints would not help). > If you have a huge number of timers to process when advancing the watermark > and the task is also back-pressured, the situation may actually be worse > since you would block on the checkpoint lock and also wait for > buffers/credits from the receiver. > I propose to make this loop more fine-grained so that it is interruptible by > checkpoints, but maybe there is also some other way to improve here. > This issue has been for example observed here: > https://lists.apache.org/thread/f6ffk9912fg5j1rfkxbzrh0qmp4w6qry -- This message was sent by Atlassian Jira (v8.20.10#820010)
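The "more fine-grained" proposal in the ticket above can be sketched as processing timers in bounded batches, returning control to the caller between batches so a checkpoint barrier can be let through. Hypothetical structure, not InternalTimerServiceImpl:

```java
// Sketch: fire timers in interruptible chunks instead of one big loop.
import java.util.PriorityQueue;
import java.util.function.LongConsumer;

public class ChunkedTimerService {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();

    public void registerTimer(long timestamp) {
        timers.add(timestamp);
    }

    /**
     * Fires timers up to {@code watermark}, at most {@code maxBatch} per call.
     * Returns true if eligible timers remain, i.e. the caller should yield
     * (e.g. take a checkpoint) and then call this method again.
     */
    public boolean advanceWatermark(long watermark, int maxBatch, LongConsumer onTimer) {
        int fired = 0;
        while (!timers.isEmpty() && timers.peek() <= watermark && fired < maxBatch) {
            onTimer.accept(timers.poll());
            fired++;
        }
        return !timers.isEmpty() && timers.peek() <= watermark;
    }
}
```

With this shape, a huge backlog of timers no longer blocks the checkpoint lock for the whole drain; the trade-off is that watermark advancement becomes observable mid-way, which is part of why the real change is non-trivial.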
[jira] [Commented] (FLINK-34704) Process checkpoint barrier in AsyncWaitOperator when the element queue is full
[ https://issues.apache.org/jira/browse/FLINK-34704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836113#comment-17836113 ] Piotr Nowojski commented on FLINK-34704: Sounds good to me (y) Indeed we could later provide some kind of overdraft buffer capacity to be used just for checkpointing. I think that this might relate to the things I want to propose in FLIP-443 as it will give the AWOP some way of knowing that it should use the overdraft buffer. Let's discuss this later and keep the ticket open :) > Process checkpoint barrier in AsyncWaitOperator when the element queue is full > -- > > Key: FLINK-34704 > URL: https://issues.apache.org/jira/browse/FLINK-34704 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: Zakelly Lan >Priority: Minor > > As discussed in > https://lists.apache.org/thread/4f7ywn29kdv4302j2rq3fkxc6pc8myr2 . Maybe it > is better to provide such a new `yield` that can process mail with low > priority in the mailbox executor. More discussion needed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-34704) Process checkpoint barrier in AsyncWaitOperator when the element queue is full
[ https://issues.apache.org/jira/browse/FLINK-34704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836083#comment-17836083 ] Piotr Nowojski edited comment on FLINK-34704 at 4/11/24 9:57 AM: - Yes, I've read the referenced dev mailing list thread and I still don't agree with the conclusion there that AWOP should allow for a checkpoint to happen while it's {{.yield()}}'ing :) {quote} the checkpoint still happen when some user code is running. {quote} If you are talking about async user code from AWOP, that's not a problem and never has been. When I'm talking about problems with state inconsistency of upstream chained operators, that's exactly what I mean. The problem is for an UPSTREAM operator that is CHAINED with AWOP: network input -> OperatorA -> AWOP -> network output OperatorA's state can become corrupted/inconsistent if you allow a checkpoint to happen while {{AWOP}} invokes {{.yield()}}. That's the whole point of the yield to downstream (https://issues.apache.org/jira/browse/FLINK-13063). And as a side note, it's not only a problem of {{AWOP}} but of any operator that wants to {{.yield()}}. Operators can only yield to downstream, which also means checkpoints can not be executed, and it also affects firing timers. I have a WIP FLIP-443 that will introduce a neat trick to allow operators to return the execution back to {{StreamTask}} so that a checkpoint can happen, but AFAIU it has no application for the {{AWOP}}. When {{AWOP}} wants to {{.yield()}}, it means it has no more space in the buffer to store any more records. So we can not checkpoint {{AWOP}} until some buffer space becomes available. So {{AWOP}} needs to yield to downstream ({{.yield()}}) until buffer space becomes available, return from the {{#processElement()}} call, and then it needs to rely on a fix for FLINK-35051 for the stream task to prioritise performing a checkpoint over executing more records/mails. 
was (Author: pnowojski): {quote} the checkpoint still happen when some user code is running. {quote} If you are talking about async user code from AWOP, that's not a problem and never has been. When I'm talking about problems with state inconsistency of upstream chained operators, that's exactly what I mean. Problem is for UPSTREAM operator that is CHAINED with AWOP: network input -> OperatorA -> AWOP -> network output OperatorA's state can become corrupted/inconsistent if you allow for checkpoint to happen while {{AWOP}} invokes {{.yield()}}. That's the whole point of the yield to downstream (https://issues.apache.org/jira/browse/FLINK-13063). And side note, it's not only a problem of {{AWOP}} but any operator that wants to {{.yield()}}. Operators can only yield to downstream, which also means checkpoints can not be executed, and it affects also firing timers. I have a WIP FLIP-443 that will introduce a neat trick to allow operators to return the execution back to {{StreamTask}} so that checkpoint can happen, but AFAIU it has no application for the {{AWOP}}. When {{AWOP}} wants to {{.yield()}}, it means it has no more space in the buffer to store any more records. So we can not checkpoint {{AWOP}} until some buffer space will become available. So {{AWOP}} needs to yield to downstream ({{.yield()}}) until buffer space becomes available, return from the {{#processElement()}} call, and then it needs to relay on a fix for FLINK-35051 for stream task to prioritise performing a checkpoint over executing more records/mails. > Process checkpoint barrier in AsyncWaitOperator when the element queue is full > -- > > Key: FLINK-34704 > URL: https://issues.apache.org/jira/browse/FLINK-34704 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: Zakelly Lan >Priority: Minor > > As discussed in > https://lists.apache.org/thread/4f7ywn29kdv4302j2rq3fkxc6pc8myr2 . 
Maybe it > is better to provide such a new `yield` that can process mail with low > priority in the mailbox executor. More discussion needed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34704) Process checkpoint barrier in AsyncWaitOperator when the element queue is full
[ https://issues.apache.org/jira/browse/FLINK-34704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836083#comment-17836083 ] Piotr Nowojski commented on FLINK-34704: {quote} the checkpoint still happen when some user code is running. {quote} If you are talking about async user code from AWOP, that's not a problem and never has been. When I'm talking about problems with state inconsistency of upstream chained operators, that's exactly what I mean. The problem is for an UPSTREAM operator that is CHAINED with AWOP: network input -> OperatorA -> AWOP -> network output OperatorA's state can become corrupted/inconsistent if you allow a checkpoint to happen while {{AWOP}} invokes {{.yield()}}. That's the whole point of the yield to downstream (https://issues.apache.org/jira/browse/FLINK-13063). And as a side note, it's not only a problem of {{AWOP}} but of any operator that wants to {{.yield()}}. Operators can only yield to downstream, which also means checkpoints can not be executed, and it also affects firing timers. I have a WIP FLIP-443 that will introduce a neat trick to allow operators to return the execution back to {{StreamTask}} so that a checkpoint can happen, but AFAIU it has no application for the {{AWOP}}. When {{AWOP}} wants to {{.yield()}}, it means it has no more space in the buffer to store any more records. So we can not checkpoint {{AWOP}} until some buffer space becomes available. So {{AWOP}} needs to yield to downstream ({{.yield()}}) until buffer space becomes available, return from the {{#processElement()}} call, and then it needs to rely on a fix for FLINK-35051 for the stream task to prioritise performing a checkpoint over executing more records/mails. 
> Process checkpoint barrier in AsyncWaitOperator when the element queue is full > -- > > Key: FLINK-34704 > URL: https://issues.apache.org/jira/browse/FLINK-34704 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: Zakelly Lan >Priority: Minor > > As discussed in > https://lists.apache.org/thread/4f7ywn29kdv4302j2rq3fkxc6pc8myr2 . Maybe it > is better to provide such a new `yield` that can process mail with low > priority in the mailbox executor. More discussion needed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34704) Process checkpoint barrier in AsyncWaitOperator when the element queue is full
[ https://issues.apache.org/jira/browse/FLINK-34704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836037#comment-17836037 ] Piotr Nowojski commented on FLINK-34704: {code:java} So it is designed to be safe to checkpoint during waiting async results (during yield). This operator is the only one that allow for checkpoint in middle of waiting. {code} Yes and no. It can be checkpointed with in-flight requests, but it can not be checkpointed while it is calling {{.yield()}} anywhere in its own code, due to the state corruption/inconsistency issue I described above. So the solution is not to let the subtask checkpoint while {{AsyncWaitOperator}} is calling {{.yield()}}; instead, {{StreamTask}} should prioritize taking a checkpoint over processing {{AWOP}}'s mails that are already enqueued. > Process checkpoint barrier in AsyncWaitOperator when the element queue is full > -- > > Key: FLINK-34704 > URL: https://issues.apache.org/jira/browse/FLINK-34704 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: Zakelly Lan >Priority: Minor > > As discussed in > https://lists.apache.org/thread/4f7ywn29kdv4302j2rq3fkxc6pc8myr2 . Maybe it > is better to provide such a new `yield` that can process mail with low > priority in the mailbox executor. More discussion needed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-34704) Process checkpoint barrier in AsyncWaitOperator when the element queue is full
[ https://issues.apache.org/jira/browse/FLINK-34704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835796#comment-17835796 ] Piotr Nowojski edited comment on FLINK-34704 at 4/10/24 3:46 PM: - Adding new {{yield}} method wouldn't help, as we can not perform a checkpoint while operator is yielding, since that can lead to inconsistent state for upstream chained operators. For example when upstream operator does this: {code} OperatorXYZ::processElement(e, output) { stateA = e.foo; output.collect(e); stateB = e.bar; } {code} Downstream chained operator inside {{output.collect(e)}} call, can not allow for checkpoint to be executed, since this would checkpoint the {{OperatorXYZ}} in an inconsistent state. I've discovered this issue independently (but for other operators) and described the problem in FLINK-35051. I think a better solution would be to make {{StreamTask}} prioritise: * processing priority messages from network stack over processing mails * execute out of order "urgent" mails, like time outing aligned checkpoint barriers to unaligned ones or triggerCheckpoint RPCs for source tasks I would propose to close this ticket for just {{AsyncWaitOperator}} in favour of FLINK-35051. was (Author: pnowojski): Adding new {{yield}} method wouldn't help, as we can not perform a checkpoint while operator is yielding, since that can lead to inconsistent state for upstream operators. For example when upstream operator does this: {code} OperatorXYZ::processElement(e, output) { stateA = e.foo; output.collect(e); stateB = e.bar; } {code} Chained operator inside `output.collect(e)` call, can not allow for checkpoint to be executed, since this would checkpoint the {{OperatorXYZ}} in an inconsistent state. I've discovered this issue independently (but for other operators) and described the problem in FLINK-35051. 
I think a better solution would be to make {{StreamTask}} prioritise: * processing priority messages from network stack over processing mails * execute out of order "urgent" mails, like time outing aligned checkpoint barriers to unaligned ones or triggerCheckpoint RPCs for source tasks I would propose to close this ticket for just {{AsyncWaitOperator}} in favour of FLINK-35051. > Process checkpoint barrier in AsyncWaitOperator when the element queue is full > -- > > Key: FLINK-34704 > URL: https://issues.apache.org/jira/browse/FLINK-34704 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: Zakelly Lan >Priority: Minor > > As discussed in > https://lists.apache.org/thread/4f7ywn29kdv4302j2rq3fkxc6pc8myr2 . Maybe it > is better to provide such a new `yield` that can process mail with low > priority in the mailbox executor. More discussion needed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34704) Process checkpoint barrier in AsyncWaitOperator when the element queue is full
[ https://issues.apache.org/jira/browse/FLINK-34704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835796#comment-17835796 ] Piotr Nowojski commented on FLINK-34704: Adding a new {{yield}} method wouldn't help, as we can not perform a checkpoint while the operator is yielding, since that can lead to inconsistent state for upstream operators. For example, when an upstream operator does this: {code} OperatorXYZ::processElement(e, output) { stateA = e.foo; output.collect(e); stateB = e.bar; } {code} The chained operator, inside the `output.collect(e)` call, can not allow a checkpoint to be executed, since this would checkpoint the {{OperatorXYZ}} in an inconsistent state. I've discovered this issue independently (but for other operators) and described the problem in FLINK-35051. I think a better solution would be to make {{StreamTask}} prioritise: * processing priority messages from the network stack over processing mails * executing out of order "urgent" mails, like timing out aligned checkpoint barriers into unaligned ones, or triggerCheckpoint RPCs for source tasks I would propose to close this ticket for just {{AsyncWaitOperator}} in favour of FLINK-35051. > Process checkpoint barrier in AsyncWaitOperator when the element queue is full > -- > > Key: FLINK-34704 > URL: https://issues.apache.org/jira/browse/FLINK-34704 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: Zakelly Lan >Priority: Minor > > As discussed in > https://lists.apache.org/thread/4f7ywn29kdv4302j2rq3fkxc6pc8myr2 . Maybe it > is better to provide such a new `yield` that can process mail with low > priority in the mailbox executor. More discussion needed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
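The inconsistency described in the {{OperatorXYZ}} pseudocode above can be made concrete with a runnable, standalone illustration. All names here are made up for the example; the "checkpoint" is simulated as a snapshot taken in the middle of the downstream `collect()` call:

```java
// Hypothetical illustration of the problem from the comment above: if a
// checkpoint is allowed to run while the downstream chained operator yields
// inside output.collect(), the upstream operator is snapshotted with stateA
// already updated but stateB still holding the old value.
public class ChainedYieldInconsistency {
    static String stateA = "old";
    static String stateB = "old";
    static String snapshot; // what a mid-collect checkpoint would capture

    // Downstream collect() that (incorrectly) lets a checkpoint run mid-call.
    static void collectAllowingCheckpoint(String e) {
        snapshot = "A=" + stateA + ",B=" + stateB; // "checkpoint" fires here
    }

    // The upstream operator's processElement, as in the pseudocode above.
    static void processElement(String e) {
        stateA = e;
        collectAllowingCheckpoint(e);
        stateB = e;
    }

    public static void main(String[] args) {
        processElement("new");
        // The snapshot mixes new and old state: A was updated, B was not.
        System.out.println(snapshot); // A=new,B=old
    }
}
```

After `processElement` returns, both fields hold the new value, but the snapshot taken mid-call captured `A=new,B=old`, which is a state the operator never legitimately had.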
[jira] [Closed] (FLINK-35065) Add numFiredTimers and numFiredTimersPerSecond metrics
[ https://issues.apache.org/jira/browse/FLINK-35065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski closed FLINK-35065. -- Resolution: Fixed Merged to master as 98bd2659f5b > Add numFiredTimers and numFiredTimersPerSecond metrics > -- > > Key: FLINK-35065 > URL: https://issues.apache.org/jira/browse/FLINK-35065 > Project: Flink > Issue Type: Improvement > Components: Runtime / Metrics, Runtime / Task >Affects Versions: 1.19.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > Currently there is no way of knowing how many timers are being fired by > Flink, so it's impossible to distinguish, even using code profiling, whether an > operator is firing only a couple of heavy timers per second using ~100% of > the CPU time, or firing thousands of timers per second. > We could add the following metrics to address this issue: > * numFiredTimers - total number of fired timers per operator > * numFiredTimersPerSecond - per second rate of firing timers per operator -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35065) Add numFiredTimers and numFiredTimersPerSecond metrics
Piotr Nowojski created FLINK-35065: -- Summary: Add numFiredTimers and numFiredTimersPerSecond metrics Key: FLINK-35065 URL: https://issues.apache.org/jira/browse/FLINK-35065 Project: Flink Issue Type: Improvement Components: Runtime / Metrics, Runtime / Task Affects Versions: 1.19.0 Reporter: Piotr Nowojski Assignee: Piotr Nowojski Fix For: 1.20.0 Currently there is no way of knowing how many timers are being fired by Flink, so it's impossible to distinguish, even using code profiling, whether an operator is firing only a couple of heavy timers per second using ~100% of the CPU time, or firing thousands of timers per second. We could add the following metrics to address this issue: * numFiredTimers - total number of fired timers per operator * numFiredTimersPerSecond - per second rate of firing timers per operator -- This message was sent by Atlassian Jira (v8.20.10#820010)
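The two proposed metrics can be sketched as follows. The metric names come from the ticket, but the class, the explicit nanosecond clock, and the windowing are made up for this standalone example; they are not Flink's actual `MetricGroup`/`Meter` API:

```java
// Hypothetical standalone sketch of the two proposed metrics: a monotone
// counter (numFiredTimers) and a per-second rate over an elapsed window
// (numFiredTimersPerSecond). The clock is passed in for determinism.
public class TimerMetrics {
    private long numFiredTimers;        // total number of fired timers
    private long firedInWindow;         // fired since the window started
    private final long windowStartNanos;

    TimerMetrics(long nowNanos) {
        this.windowStartNanos = nowNanos;
    }

    // Called once per fired timer by the (hypothetical) timer service.
    void onTimerFired() {
        numFiredTimers++;
        firedInWindow++;
    }

    long getNumFiredTimers() {
        return numFiredTimers;
    }

    // Per-second rate of firing timers over the elapsed window.
    double getNumFiredTimersPerSecond(long nowNanos) {
        double seconds = (nowNanos - windowStartNanos) / 1e9;
        return seconds <= 0 ? 0.0 : firedInWindow / seconds;
    }
}
```

Flink's real metrics would register these on the operator's metric group; this sketch only shows the bookkeeping that distinguishes "a few heavy timers" from "thousands of cheap timers per second".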
[jira] [Updated] (FLINK-35051) Weird priorities when processing unaligned checkpoints
[ https://issues.apache.org/jira/browse/FLINK-35051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-35051: --- Affects Version/s: 1.16.3 > Weird priorities when processing unaligned checkpoints > -- > > Key: FLINK-35051 > URL: https://issues.apache.org/jira/browse/FLINK-35051 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing, Runtime / Network, Runtime / > Task >Affects Versions: 1.16.3, 1.17.2, 1.19.0, 1.18.1 >Reporter: Piotr Nowojski >Priority: Major > > While looking through the code I noticed that `StreamTask` is processing > unaligned checkpoints in a strange order/priority. The end result is that > unaligned checkpoint `Start Delay` / triggering checkpoints in `StreamTask` > can be unnecessarily delayed by other mailbox actions in the system, like for > example: > * processing time timers > * `AsyncWaitOperator` results > * ... > An incoming UC barrier is treated as a priority event by the network stack (it > will be polled from the input before anything else). This is what we want, > but polling elements from the network stack has lower priority than processing > enqueued mailbox actions. > Secondly, if an AC barrier times out into a UC barrier, that's done via a mailbox action, but > this mailbox action is also not prioritised in any way, so other mailbox > actions could be unnecessarily executed first. > On top of that there is a clash of three separate concepts here: > # Mailbox priority. yieldToDownstream - so in a sense the reverse of what we > would like to have for triggering a checkpoint, but that only kicks in for #yield() > calls, where it's actually correct that an operator in the middle of execution > can not yield to a checkpoint - it should only yield to downstream. > # Control mails in the mailbox executor - cancellation is done via that, and it > bypasses the whole mailbox queue. > # Priority events in the network stack. > It's unfortunate that 1. vs 3. has a naming clash, as the priority name is used > for both things, and the highest-priority network event containing a UC barrier, > when executed via the mailbox, actually has the lowest mailbox priority. > The control mails mechanism is a kind of priority mail executed out of order, > but it doesn't generalise well for use in checkpointing. > This whole thing should be re-worked at some point. Ideally what we would > like to have is that: > * the mail to convert AC barriers to UC > * polling the UC barrier from the network input > * the checkpoint trigger via RPC for source tasks > should be processed first, with the exception of yieldToDownstream, where the > current mailbox priorities should be adhered to. -- This message was sent by Atlassian Jira (v8.20.10#820010)
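The desired ordering in the ticket above can be illustrated with a minimal two-lane mailbox sketch. This is an illustration only, under the assumption of a simplified single-threaded drain; the class and method names are made up and are not Flink's `MailboxProcessor`:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of the proposed behaviour: an "urgent" lane (e.g. the
// mail that times out an aligned barrier into an unaligned one, or a
// triggerCheckpoint RPC for a source task) is always drained before ordinary
// mails such as processing-time timers or AsyncWaitOperator result mails.
public class TwoLaneMailbox {
    private final Deque<Runnable> urgent = new ArrayDeque<>();
    private final Deque<Runnable> ordinary = new ArrayDeque<>();

    void putUrgent(Runnable mail) { urgent.add(mail); }
    void put(Runnable mail) { ordinary.add(mail); }

    // Drains all mails, urgent ones first regardless of enqueue order.
    void drain() {
        while (!urgent.isEmpty()) { urgent.poll().run(); }
        while (!ordinary.isEmpty()) { ordinary.poll().run(); }
    }
}
```

Even if a timer mail was enqueued before the barrier-timeout mail, the timeout runs first; the yieldToDownstream exception from the ticket would be handled by not draining the urgent lane while an operator is yielding, which this sketch deliberately leaves out.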
[jira] [Updated] (FLINK-35051) Weird priorities when processing unaligned checkpoints
[ https://issues.apache.org/jira/browse/FLINK-35051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-35051: --- Description: While looking through the code I noticed that `StreamTask` is processing unaligned checkpoints in strange order/priority. The end result is that unaligned checkpoint `Start Delay` / triggering checkpoints in `StreamTask` can be unnecessary delayed by other mailbox actions in the system, like for example: * processing time timers * `AsyncWaitOperator` results * ... Incoming UC barrier is treated as a priority event by the network stack (it will be polled from the input before anything else). This is what we want, but polling elements from network stack has lower priority then processing enqueued mailbox actions. Secondly, if AC barrier timeout to UC, that's done via a mailbox action, but this mailbox action is also not prioritised in any way, so other mailbox actions could be unnecessarily executed first. On top of that there is a clash of two separate concepts here: # Mailbox priority. yieldToDownstream - so in a sense reverse to what we would like to have for triggering checkpoint, but that only kicks in #yield() calls, where it's actually correct, that operator in a middle of execution can not yield to checkpoint - it should only yield to downstream. # Control mails in mailbox executor - cancellation is done via that, it bypasses whole mailbox queue. # Priority events in the network stack. It's unfortunate that 1. vs 3. has a naming clash, as priority name is used in both things, and highest network priority event containing UC barrier, when executed via mailbox has actually the lowest mailbox priority. Control mails mechanism is a kind of priority mails executed out of order, but doesn't generalise well for use in checkpointing. This whole thing should be re-worked at some point. 
Ideally what we would like have is that: * mail to convert AC barriers to UC * polling UC barrier from the network input * checkpoint trigger via RPC for source tasks should be processed first, with an exception of yieldToDownstream, where current mailbox priorities should be adhered. was: While looking through the code I noticed that `StreamTask` is processing unaligned checkpoints in strange order/priority. The end result is that unaligned checkpoint `Start Delay` time can be increased, and triggering checkpoints in `StreamTask` can be unnecessary delayed by other mailbox actions in the system, like for example: * processing time timers * `AsyncWaitOperator` results * ... Incoming UC barrier is treated as a priority event by the network stack (it will be polled from the input before anything else). This is what we want, but polling elements from network stack has lower priority then processing enqueued mailbox actions. Secondly, if AC barrier timeout to UC, that's done via a mailbox action, but this mailbox action is also not prioritised in any way, so other mailbox actions could be unnecessarily executed first. On top of that there is a clash of two separate concepts here: # Mailbox priority. yieldToDownstream - so in a sense reverse to what we would like to have for triggering checkpoint, but that only kicks in #yield() calls, where it's actually correct, that operator in a middle of execution can not yield to checkpoint - it should only yield to downstream. # Control mails in mailbox executor - cancellation is done via that, it bypasses whole mailbox queue. # Priority events in the network stack. It's unfortunate that 1. vs 3. has a naming clash, as priority name is used in both things, and highest network priority event containing UC barrier, when executed via mailbox has actually the lowest mailbox priority. Control mails mechanism is a kind of priority mails executed out of order, but doesn't generalise well for use in checkpointing. 
This whole thing should be re-worked at some point. Ideally what we would like have is that: * mail to convert AC barriers to UC * polling UC barrier from the network input * checkpoint trigger via RPC for source tasks should be processed first, with an exception of yieldToDownstream, where current mailbox priorities should be adhered. > Weird priorities when processing unaligned checkpoints > -- > > Key: FLINK-35051 > URL: https://issues.apache.org/jira/browse/FLINK-35051 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing, Runtime / Network, Runtime / > Task >Affects Versions: 1.17.2, 1.19.0, 1.18.1 >Reporter: Piotr Nowojski >Priority: Major > > While looking through the code I noticed that `StreamTask` is processing > unaligned checkpoints in strange order/priority. The end result is that > unaligned checkpoint `Start Delay` / triggering
[jira] [Updated] (FLINK-35051) Weird priorities when processing unaligned checkpoints
[ https://issues.apache.org/jira/browse/FLINK-35051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-35051: --- Description: While looking through the code I noticed that `StreamTask` is processing unaligned checkpoints in strange order/priority. The end result is that unaligned checkpoint `Start Delay` time can be increased, and triggering checkpoints in `StreamTask` can be unnecessary delayed by other mailbox actions in the system, like for example: * processing time timers * `AsyncWaitOperator` results * ... Incoming UC barrier is treated as a priority event by the network stack (it will be polled from the input before anything else). This is what we want, but polling elements from network stack has lower priority then processing enqueued mailbox actions. Secondly, if AC barrier timeout to UC, that's done via a mailbox action, but this mailbox action is also not prioritised in any way, so other mailbox actions could be unnecessarily executed first. On top of that there is a clash of two separate concepts here: # Mailbox priority. yieldToDownstream - so in a sense reverse to what we would like to have for triggering checkpoint, but that only kicks in #yield() calls, where it's actually correct, that operator in a middle of execution can not yield to checkpoint - it should only yield to downstream. # Control mails in mailbox executor - cancellation is done via that, it bypasses whole mailbox queue. # Priority events in the network stack. It's unfortunate that 1. vs 3. has a naming clash, as priority name is used in both things, and highest network priority event containing UC barrier, when executed via mailbox has actually the lowest mailbox priority. Control mails mechanism is a kind of priority mails executed out of order, but doesn't generalise well for use in checkpointing. This whole thing should be re-worked at some point. 
Ideally what we would like have is that: * mail to convert AC barriers to UC * polling UC barrier from the network input * checkpoint trigger via RPC for source tasks should be processed first, with an exception of yieldToDownstream, where current mailbox priorities should be adhered. was: While looking through the code I noticed that `StreamTask` is processing unaligned checkpoints in strange order/priority. The end result is that unaligned checkpoint `Start Delay` time can be increased, and triggering checkpoints in `StreamTask` can be unnecessary delayed by other mailbox actions in the system, like for example: * processing time timers * `AsyncWaitOperator` results * ... Incoming UC barrier is treated as a priority event by the network stack (it will be polled from the input before anything else). This is what we want, but polling elements from network stack has lower priority then processing enqueued mailbox actions. Secondly, if AC barrier timeout to UC, that's done via a mailbox action, but this mailbox action is also not prioritised in any way, so other mailbox actions could be unnecessarily executed first. On top of that there is a clash of two separate concepts here: # Mailbox priority. yieldToDownstream - so in a sense reverse to what we would like to have for triggering checkpoint, but that only kicks in #yield() calls, where it's actually correct, that operator in a middle of execution can not yield to checkpoint - it should only yield to downstream. # Control mails in mailbox executor - cancellation is done via that, it bypasses whole mailbox queue. # Priority events in the network stack. It's unfortunate that 1. vs 3. has a naming clash, as priority name is used in both things, and highest network priority event containing UC barrier, when executed via mailbox has actually the lowest mailbox priority. Control mails mechanism is a kind of priority mails executed out of order, but doesn't generalise well for use in checkpointing. 
This whole thing should be re-worked at some point. Ideally what we would like have is that: * mail to convert AC barriers to UC * polling UC barrier from the network input * checkpoint trigger via RPC for source tasks should be processed first, with an exception of yieldToDownstream, where current mailbox priorities should be adhered. > Weird priorities when processing unaligned checkpoints > -- > > Key: FLINK-35051 > URL: https://issues.apache.org/jira/browse/FLINK-35051 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing, Runtime / Network, Runtime / > Task >Affects Versions: 1.17.2, 1.19.0, 1.18.1 >Reporter: Piotr Nowojski >Priority: Major > > While looking through the code I noticed that `StreamTask` is processing > unaligned checkpoints in strange order/priority. The end result is that > unaligned checkpoint `Start
[jira] [Created] (FLINK-35051) Weird priorities when processing unaligned checkpoints
Piotr Nowojski created FLINK-35051: -- Summary: Weird priorities when processing unaligned checkpoints Key: FLINK-35051 URL: https://issues.apache.org/jira/browse/FLINK-35051 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing, Runtime / Network, Runtime / Task Affects Versions: 1.18.1, 1.19.0, 1.17.2 Reporter: Piotr Nowojski While looking through the code I noticed that `StreamTask` is processing unaligned checkpoints in a strange order/priority. The end result is that the unaligned checkpoint `Start Delay` time can be increased, and triggering checkpoints in `StreamTask` can be unnecessarily delayed by other mailbox actions in the system, like for example: * processing time timers * `AsyncWaitOperator` results * ... An incoming UC barrier is treated as a priority event by the network stack (it will be polled from the input before anything else). This is what we want, but polling elements from the network stack has lower priority than processing enqueued mailbox actions. Secondly, if an AC barrier times out into a UC barrier, that's done via a mailbox action, but this mailbox action is also not prioritised in any way, so other mailbox actions could be unnecessarily executed first. On top of that there is a clash of three separate concepts here: # Mailbox priority. yieldToDownstream - so in a sense the reverse of what we would like to have for triggering a checkpoint, but that only kicks in for #yield() calls, where it's actually correct that an operator in the middle of execution can not yield to a checkpoint - it should only yield to downstream. # Control mails in the mailbox executor - cancellation is done via that, and it bypasses the whole mailbox queue. # Priority events in the network stack. It's unfortunate that 1. vs 3. has a naming clash, as the priority name is used for both things, and the highest-priority network event containing a UC barrier, when executed via the mailbox, actually has the lowest mailbox priority. 
The control mails mechanism is a kind of priority mail executed out of order, but it doesn't generalise well for use in checkpointing. This whole thing should be re-worked at some point. Ideally what we would like to have is that: * the mail to convert AC barriers to UC * polling the UC barrier from the network input * the checkpoint trigger via RPC for source tasks should be processed first, with the exception of yieldToDownstream, where the current mailbox priorities should be adhered to. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32957) Add current timer trigger lag to metrics
[ https://issues.apache.org/jira/browse/FLINK-32957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17833184#comment-17833184 ] Piotr Nowojski commented on FLINK-32957: {{mailboxLatencyMs}} shows basically the same thing AFAIK. It is a sampled measurement of how long things wait in the mailbox queue before being executed, and timers are fired via the mailbox. > Add current timer trigger lag to metrics > > > Key: FLINK-32957 > URL: https://issues.apache.org/jira/browse/FLINK-32957 > Project: Flink > Issue Type: Improvement > Components: Runtime / Metrics, Runtime / State Backends >Reporter: Rui Xia >Priority: Minor > > Timer trigger lag denotes the gap between the actual trigger timestamp and > the expected trigger timestamp (the registered timestamp in `TimeService`). This > metric can help users find out whether there is a backlog of timers. > The backlog of timers may affect downstream data processing. Users customize > the trigger logic, which may interact with downstream data processing. For > example, a trigger logic can inject some records into downstream operators. The > backlog of timers blocks the record injection. > On the other side, the backlog of timers makes jobs unstable. Timers are used > by window operators, which leverage a timer to remove the window state of a > triggered window. The backlog of timers blocks data removal, and the state > size may grow unexpectedly large. The large state size affects the > performance of the state-backend. In a cloud-native environment, a k8s pod is prone > to reaching the local disk limit due to large state files (RocksDB SST). > Currently, it is hard for users to observe the backlog of timers. As far as I > know, a heap dump is the only way to learn the backlog of timers. Thus, users > cannot notice the backlog of timers in time. FLINK-32954 > (https://issues.apache.org/jira/browse/FLINK-32954) exposes the number of heap > timers, but is not suitable for the RocksDB timer due to performance loss. 
> Compared with FLINK-32954, timer trigger lag is much more lightweight for > RocksDB timers. > * Reason 1: Timer trigger lag does not affect timer registration. > * Reason 2: The effect on timer triggering is limited. Timer registration is > a hot code-path, while timer triggering is much colder. In general, the > trigger interval is tens of seconds, and the timer trigger code-path is > invoked only every tens of seconds. Thus, adding the timer trigger lag > calculation has little performance overhead. -- This message was sent by Atlassian Jira (v8.20.10#820010)
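The metric proposed above can be sketched as a simple gauge updated on the timer-firing code path. This is a minimal illustration, not Flink API: the class and method names (`TimerTriggerLagGauge`, `onTimerFired`) are hypothetical.

```java
// Hypothetical sketch of the proposed "timer trigger lag" metric: the gap
// between the timestamp a timer was registered for and the time it actually
// fired. Names are illustrative, not part of any Flink API.
class TimerTriggerLagGauge {
    private volatile long lastLagMs;

    // Invoked on the timer-firing code path. As the issue notes, firing is
    // much colder than registration, so one subtraction per firing adds
    // negligible overhead even with RocksDB timers.
    void onTimerFired(long registeredTimestampMs, long actualFireTimeMs) {
        // Clamp to zero: a timer firing exactly on time (or a clock skew
        // making it look early) should not report a negative lag.
        lastLagMs = Math.max(0L, actualFireTimeMs - registeredTimestampMs);
    }

    // Reported as a gauge: a persistently large value indicates a backlog of
    // timers without having to count them (in contrast to FLINK-32954).
    long getValue() {
        return lastLagMs;
    }
}
```

A value that stays near zero means timers fire on schedule; a growing value signals the backlog the reporter describes, without touching the hot registration path.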
[jira] [Comment Edited] (FLINK-26515) RetryingExecutorTest. testDiscardOnTimeout failed on azure
[ https://issues.apache.org/jira/browse/FLINK-26515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830957#comment-17830957 ] Piotr Nowojski edited comment on FLINK-26515 at 3/26/24 2:31 PM: - occurrence of the same issue in a private build with the same stack traces as https://issues.apache.org/jira/browse/FLINK-26515?focusedCommentId=17760292=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17760292 was (Author: pnowojski): occurrence of the same issue in a private build > RetryingExecutorTest. testDiscardOnTimeout failed on azure > -- > > Key: FLINK-26515 > URL: https://issues.apache.org/jira/browse/FLINK-26515 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.14.3, 1.17.0, 1.16.1, 1.18.0, 1.19.0 >Reporter: Yun Gao >Priority: Blocker > Labels: auto-deprioritized-major, pull-request-available, > test-stability > > {code:java} > Mar 06 01:20:29 [ERROR] Tests run: 7, Failures: 1, Errors: 0, Skipped: 0, > Time elapsed: 1.941 s <<< FAILURE! - in > org.apache.flink.changelog.fs.RetryingExecutorTest > Mar 06 01:20:29 [ERROR] testTimeout Time elapsed: 1.934 s <<< FAILURE! 
> Mar 06 01:20:29 java.lang.AssertionError: expected:<500.0> but > was:<1922.869766> > Mar 06 01:20:29 at org.junit.Assert.fail(Assert.java:89) > Mar 06 01:20:29 at org.junit.Assert.failNotEquals(Assert.java:835) > Mar 06 01:20:29 at org.junit.Assert.assertEquals(Assert.java:555) > Mar 06 01:20:29 at org.junit.Assert.assertEquals(Assert.java:685) > Mar 06 01:20:29 at > org.apache.flink.changelog.fs.RetryingExecutorTest.testTimeout(RetryingExecutorTest.java:145) > Mar 06 01:20:29 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > Mar 06 01:20:29 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > Mar 06 01:20:29 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > Mar 06 01:20:29 at java.lang.reflect.Method.invoke(Method.java:498) > Mar 06 01:20:29 at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > Mar 06 01:20:29 at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > Mar 06 01:20:29 at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > Mar 06 01:20:29 at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > Mar 06 01:20:29 at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > Mar 06 01:20:29 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > Mar 06 01:20:29 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > Mar 06 01:20:29 at > 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner.run(ParentRunner.java:413) > Mar 06 01:20:29 at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > Mar 06 01:20:29 at org.junit.runner.JUnitCore.run(JUnitCore.java:115) > Mar 06 01:20:29 at > org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:43) > Mar 06 01:20:29 at > java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) > Mar 06 01:20:29 at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > Mar 06 01:20:29 at > java.util.Iterator.forEachRemaining(Iterator.java:116) > Mar 06 01:20:29 at > java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801) > Mar 06 01:20:29 at > java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > Mar 06 01:20:29 at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=32569=logs=f450c1a5-64b1-5955-e215-49cb1ad5ec88=cc452273-9efa-565d-9db8-ef62a38a0c10=22554 --
[jira] [Updated] (FLINK-26515) RetryingExecutorTest. testDiscardOnTimeout failed on azure
[ https://issues.apache.org/jira/browse/FLINK-26515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-26515: --- Priority: Blocker (was: Critical) > RetryingExecutorTest. testDiscardOnTimeout failed on azure > -- > > Key: FLINK-26515 > URL: https://issues.apache.org/jira/browse/FLINK-26515 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.14.3, 1.17.0, 1.16.1, 1.18.0, 1.19.0 >Reporter: Yun Gao >Priority: Blocker > Labels: auto-deprioritized-major, pull-request-available, > test-stability > > {code:java} > Mar 06 01:20:29 [ERROR] Tests run: 7, Failures: 1, Errors: 0, Skipped: 0, > Time elapsed: 1.941 s <<< FAILURE! - in > org.apache.flink.changelog.fs.RetryingExecutorTest > Mar 06 01:20:29 [ERROR] testTimeout Time elapsed: 1.934 s <<< FAILURE! > Mar 06 01:20:29 java.lang.AssertionError: expected:<500.0> but > was:<1922.869766> > Mar 06 01:20:29 at org.junit.Assert.fail(Assert.java:89) > Mar 06 01:20:29 at org.junit.Assert.failNotEquals(Assert.java:835) > Mar 06 01:20:29 at org.junit.Assert.assertEquals(Assert.java:555) > Mar 06 01:20:29 at org.junit.Assert.assertEquals(Assert.java:685) > Mar 06 01:20:29 at > org.apache.flink.changelog.fs.RetryingExecutorTest.testTimeout(RetryingExecutorTest.java:145) > Mar 06 01:20:29 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > Mar 06 01:20:29 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > Mar 06 01:20:29 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > Mar 06 01:20:29 at java.lang.reflect.Method.invoke(Method.java:498) > Mar 06 01:20:29 at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > Mar 06 01:20:29 at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > Mar 06 01:20:29 at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > 
Mar 06 01:20:29 at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > Mar 06 01:20:29 at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > Mar 06 01:20:29 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > Mar 06 01:20:29 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner.run(ParentRunner.java:413) > Mar 06 01:20:29 at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > Mar 06 01:20:29 at org.junit.runner.JUnitCore.run(JUnitCore.java:115) > Mar 06 01:20:29 at > org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:43) > Mar 06 01:20:29 at > java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) > Mar 06 01:20:29 at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > Mar 06 01:20:29 at > java.util.Iterator.forEachRemaining(Iterator.java:116) > Mar 06 01:20:29 at > java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801) > Mar 06 01:20:29 at > java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > Mar 06 01:20:29 at > 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=32569=logs=f450c1a5-64b1-5955-e215-49cb1ad5ec88=cc452273-9efa-565d-9db8-ef62a38a0c10=22554 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-26515) RetryingExecutorTest. testDiscardOnTimeout failed on azure
[ https://issues.apache.org/jira/browse/FLINK-26515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830957#comment-17830957 ] Piotr Nowojski commented on FLINK-26515: occurrence of the same issue in a private build > RetryingExecutorTest. testDiscardOnTimeout failed on azure > -- > > Key: FLINK-26515 > URL: https://issues.apache.org/jira/browse/FLINK-26515 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.14.3, 1.17.0, 1.16.1, 1.18.0, 1.19.0 >Reporter: Yun Gao >Priority: Critical > Labels: auto-deprioritized-major, pull-request-available, > test-stability > > {code:java} > Mar 06 01:20:29 [ERROR] Tests run: 7, Failures: 1, Errors: 0, Skipped: 0, > Time elapsed: 1.941 s <<< FAILURE! - in > org.apache.flink.changelog.fs.RetryingExecutorTest > Mar 06 01:20:29 [ERROR] testTimeout Time elapsed: 1.934 s <<< FAILURE! > Mar 06 01:20:29 java.lang.AssertionError: expected:<500.0> but > was:<1922.869766> > Mar 06 01:20:29 at org.junit.Assert.fail(Assert.java:89) > Mar 06 01:20:29 at org.junit.Assert.failNotEquals(Assert.java:835) > Mar 06 01:20:29 at org.junit.Assert.assertEquals(Assert.java:555) > Mar 06 01:20:29 at org.junit.Assert.assertEquals(Assert.java:685) > Mar 06 01:20:29 at > org.apache.flink.changelog.fs.RetryingExecutorTest.testTimeout(RetryingExecutorTest.java:145) > Mar 06 01:20:29 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > Mar 06 01:20:29 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > Mar 06 01:20:29 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > Mar 06 01:20:29 at java.lang.reflect.Method.invoke(Method.java:498) > Mar 06 01:20:29 at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > Mar 06 01:20:29 at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > Mar 06 01:20:29 at > 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > Mar 06 01:20:29 at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > Mar 06 01:20:29 at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > Mar 06 01:20:29 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > Mar 06 01:20:29 at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > Mar 06 01:20:29 at > org.junit.runners.ParentRunner.run(ParentRunner.java:413) > Mar 06 01:20:29 at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > Mar 06 01:20:29 at org.junit.runner.JUnitCore.run(JUnitCore.java:115) > Mar 06 01:20:29 at > org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:43) > Mar 06 01:20:29 at > java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) > Mar 06 01:20:29 at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > Mar 06 01:20:29 at > java.util.Iterator.forEachRemaining(Iterator.java:116) > Mar 06 01:20:29 at > java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801) > Mar 06 01:20:29 at > 
java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) > Mar 06 01:20:29 at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) > {code} > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=32569=logs=f450c1a5-64b1-5955-e215-49cb1ad5ec88=cc452273-9efa-565d-9db8-ef62a38a0c10=22554 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34913) ConcurrentModificationException SubTaskInitializationMetricsBuilder.addDurationMetric
[ https://issues.apache.org/jira/browse/FLINK-34913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-34913: --- Fix Version/s: 2.0.0 (was: 1.19.1) > ConcurrentModificationException > SubTaskInitializationMetricsBuilder.addDurationMetric > - > > Key: FLINK-34913 > URL: https://issues.apache.org/jira/browse/FLINK-34913 > Project: Flink > Issue Type: Bug > Components: Runtime / Metrics, Runtime / State Backends >Affects Versions: 1.19.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0 > > > The following failures can occur during job's recovery when using clip & > ingest. This code path is currently not available, so the bug can not happen > for users. > {noformat} > java.util.ConcurrentModificationException > at java.base/java.util.HashMap.compute(HashMap.java:1230) > at > org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49) > at > org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.runAndReportDuration(RocksDBIncrementalRestoreOperation.java:988) > at > org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.lambda$createAsyncCompactionTask$2(RocksDBIncrementalRestoreOperation.java:291) > {noformat} > {noformat} > java.util.ConcurrentModificationException > at java.base/java.util.HashMap.compute(HashMap.java:1230) > at > org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:794) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$3(StreamTask.java:744) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > at > 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:744) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:704) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > at java.base/java.lang.Thread.run(Thread.java:829) > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
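The two stack traces in this issue show `HashMap.compute` being reached from two different threads (the restore thread and an async RocksDB compaction task), and plain `HashMap` is not thread-safe, which is what produces the `ConcurrentModificationException`. The sketch below illustrates the general fix pattern only; `DurationMetrics` and `addDurationMetric` here are hypothetical stand-ins, not the actual `SubTaskInitializationMetricsBuilder` code or the fix merged for this issue.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a duration-metric accumulator that stays correct when
// two threads (e.g. a restore thread and an async compaction task, as in the
// stack traces above) report durations concurrently. A plain HashMap.compute
// in the same situation can throw ConcurrentModificationException or silently
// corrupt the map.
class DurationMetrics {
    private final Map<String, Long> durations = new ConcurrentHashMap<>();

    // ConcurrentHashMap.merge is atomic per key, so concurrent reporters
    // cannot interleave a read-modify-write and lose updates.
    void addDurationMetric(String name, long millis) {
        durations.merge(name, millis, Long::sum);
    }

    long get(String name) {
        return durations.getOrDefault(name, 0L);
    }
}
```

Alternatively, the same effect can be had by synchronizing the builder's mutation methods; the key point is that an unsynchronized `HashMap` must not be mutated from more than one thread.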
[jira] [Commented] (FLINK-34913) ConcurrentModificationException SubTaskInitializationMetricsBuilder.addDurationMetric
[ https://issues.apache.org/jira/browse/FLINK-34913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830419#comment-17830419 ] Piotr Nowojski commented on FLINK-34913: merged commit 1a5ca7f into apache:master > ConcurrentModificationException > SubTaskInitializationMetricsBuilder.addDurationMetric > - > > Key: FLINK-34913 > URL: https://issues.apache.org/jira/browse/FLINK-34913 > Project: Flink > Issue Type: Bug > Components: Runtime / Metrics, Runtime / State Backends >Affects Versions: 1.19.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available > Fix For: 1.19.1 > > > The following failures can occur during job's recovery when using clip & > ingest. This code path is currently not available, so the bug can not happen > for users. > {noformat} > java.util.ConcurrentModificationException > at java.base/java.util.HashMap.compute(HashMap.java:1230) > at > org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49) > at > org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.runAndReportDuration(RocksDBIncrementalRestoreOperation.java:988) > at > org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.lambda$createAsyncCompactionTask$2(RocksDBIncrementalRestoreOperation.java:291) > {noformat} > {noformat} > java.util.ConcurrentModificationException > at java.base/java.util.HashMap.compute(HashMap.java:1230) > at > org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:794) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$3(StreamTask.java:744) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > at > 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:744) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:704) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > at java.base/java.lang.Thread.run(Thread.java:829) > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-34913) ConcurrentModificationException SubTaskInitializationMetricsBuilder.addDurationMetric
[ https://issues.apache.org/jira/browse/FLINK-34913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830419#comment-17830419 ] Piotr Nowojski edited comment on FLINK-34913 at 3/25/24 10:01 AM: -- merged commit 1a5ca7f into apache:master The change was not backported to 1.19 as the code is disabled in that branch. was (Author: pnowojski): merged commit 1a5ca7f into apache:master > ConcurrentModificationException > SubTaskInitializationMetricsBuilder.addDurationMetric > - > > Key: FLINK-34913 > URL: https://issues.apache.org/jira/browse/FLINK-34913 > Project: Flink > Issue Type: Bug > Components: Runtime / Metrics, Runtime / State Backends >Affects Versions: 1.19.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0 > > > The following failures can occur during job's recovery when using clip & > ingest. This code path is currently not available, so the bug can not happen > for users. > {noformat} > java.util.ConcurrentModificationException > at java.base/java.util.HashMap.compute(HashMap.java:1230) > at > org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49) > at > org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.runAndReportDuration(RocksDBIncrementalRestoreOperation.java:988) > at > org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.lambda$createAsyncCompactionTask$2(RocksDBIncrementalRestoreOperation.java:291) > {noformat} > {noformat} > java.util.ConcurrentModificationException > at java.base/java.util.HashMap.compute(HashMap.java:1230) > at > org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:794) > at > 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$3(StreamTask.java:744) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:744) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:704) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > at java.base/java.lang.Thread.run(Thread.java:829) > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34913) ConcurrentModificationException SubTaskInitializationMetricsBuilder.addDurationMetric
[ https://issues.apache.org/jira/browse/FLINK-34913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-34913: --- Description: The following failures can occur during job's recovery when using clip & ingest. This code path is currently not available, so the bug can not happen for users. {noformat} java.util.ConcurrentModificationException at java.base/java.util.HashMap.compute(HashMap.java:1230) at org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49) at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.runAndReportDuration(RocksDBIncrementalRestoreOperation.java:988) at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.lambda$createAsyncCompactionTask$2(RocksDBIncrementalRestoreOperation.java:291) {noformat} {noformat} java.util.ConcurrentModificationException at java.base/java.util.HashMap.compute(HashMap.java:1230) at org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:794) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$3(StreamTask.java:744) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:744) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:704) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) at java.base/java.lang.Thread.run(Thread.java:829) 
{noformat} was: The following failures can occur during job's recovery when using clip & ingest {noformat} java.util.ConcurrentModificationException at java.base/java.util.HashMap.compute(HashMap.java:1230) at org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49) at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.runAndReportDuration(RocksDBIncrementalRestoreOperation.java:988) at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.lambda$createAsyncCompactionTask$2(RocksDBIncrementalRestoreOperation.java:291) {noformat} {noformat} java.util.ConcurrentModificationException at java.base/java.util.HashMap.compute(HashMap.java:1230) at org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:794) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$3(StreamTask.java:744) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:744) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:704) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) at java.base/java.lang.Thread.run(Thread.java:829) {noformat} > ConcurrentModificationException > SubTaskInitializationMetricsBuilder.addDurationMetric > - > > Key: FLINK-34913 > URL: https://issues.apache.org/jira/browse/FLINK-34913 > Project: Flink > Issue Type: Bug > Components: Runtime / 
Metrics, Runtime / State Backends >Affects Versions: 1.19.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Fix For: 1.19.1 > > > The following failures can occur during job's recovery when using clip & > ingest. This code path is currently not available, so the bug can not happen > for users. > {noformat} > java.util.ConcurrentModificationException > at java.base/java.util.HashMap.compute(HashMap.java:1230) > at >
[jira] [Assigned] (FLINK-34913) ConcurrentModificationException SubTaskInitializationMetricsBuilder.addDurationMetric
[ https://issues.apache.org/jira/browse/FLINK-34913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski reassigned FLINK-34913: -- Assignee: Piotr Nowojski > ConcurrentModificationException > SubTaskInitializationMetricsBuilder.addDurationMetric > - > > Key: FLINK-34913 > URL: https://issues.apache.org/jira/browse/FLINK-34913 > Project: Flink > Issue Type: Bug > Components: Runtime / Metrics, Runtime / State Backends >Affects Versions: 1.19.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Fix For: 1.19.1 > > > The following failures can occur during job's recovery when using clip & > ingest > {noformat} > java.util.ConcurrentModificationException > at java.base/java.util.HashMap.compute(HashMap.java:1230) > at > org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49) > at > org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.runAndReportDuration(RocksDBIncrementalRestoreOperation.java:988) > at > org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.lambda$createAsyncCompactionTask$2(RocksDBIncrementalRestoreOperation.java:291) > at > java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1736) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > at java.base/java.lang.Thread.run(Thread.java:829) > {noformat} > {noformat} > java.util.ConcurrentModificationException > at java.base/java.util.HashMap.compute(HashMap.java:1230) > at > org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:794) > at > 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$3(StreamTask.java:744) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:744) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:704) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > at java.base/java.lang.Thread.run(Thread.java:829) > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34913) ConcurrentModificationException SubTaskInitializationMetricsBuilder.addDurationMetric
[ https://issues.apache.org/jira/browse/FLINK-34913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-34913: --- Description: The following failures can occur during job's recovery when using clip & ingest {noformat} java.util.ConcurrentModificationException at java.base/java.util.HashMap.compute(HashMap.java:1230) at org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49) at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.runAndReportDuration(RocksDBIncrementalRestoreOperation.java:988) at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.lambda$createAsyncCompactionTask$2(RocksDBIncrementalRestoreOperation.java:291) {noformat} {noformat} java.util.ConcurrentModificationException at java.base/java.util.HashMap.compute(HashMap.java:1230) at org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:794) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$3(StreamTask.java:744) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:744) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:704) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) at java.base/java.lang.Thread.run(Thread.java:829) {noformat} was: The following failures can occur during job's recovery when using clip 
> ConcurrentModificationException > SubTaskInitializationMetricsBuilder.addDurationMetric > - > > Key: FLINK-34913 > URL: https://issues.apache.org/jira/browse/FLINK-34913 > Project: Flink > Issue Type: Bug > Components: Runtime / Metrics, Runtime / State Backends >Affects Versions: 1.19.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Fix For: 1.19.1 > > > The following failures can occur during job's recovery when using clip & ingest
{noformat} java.util.ConcurrentModificationException at java.base/java.util.HashMap.compute(HashMap.java:1230) at org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49) at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.runAndReportDuration(RocksDBIncrementalRestoreOperation.java:988) at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.lambda$createAsyncCompactionTask$2(RocksDBIncrementalRestoreOperation.java:291) at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1736) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) {noformat} {noformat} java.util.ConcurrentModificationException at java.base/java.util.HashMap.compute(HashMap.java:1230) at org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:794) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$3(StreamTask.java:744) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:744) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:704) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) at java.base/java.lang.Thread.run(Thread.java:829) {noformat}
[jira] [Created] (FLINK-34913) ConcurrentModificationException SubTaskInitializationMetricsBuilder.addDurationMetric
Piotr Nowojski created FLINK-34913: -- Summary: ConcurrentModificationException SubTaskInitializationMetricsBuilder.addDurationMetric Key: FLINK-34913 URL: https://issues.apache.org/jira/browse/FLINK-34913 Project: Flink Issue Type: Bug Components: Runtime / Metrics, Runtime / State Backends Affects Versions: 1.19.0 Reporter: Piotr Nowojski Fix For: 1.19.1 The following failures can occur during job's recovery when using clip & ingest {noformat} java.util.ConcurrentModificationException at java.base/java.util.HashMap.compute(HashMap.java:1230) at org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49) at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.runAndReportDuration(RocksDBIncrementalRestoreOperation.java:988) at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.lambda$createAsyncCompactionTask$2(RocksDBIncrementalRestoreOperation.java:291) at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1736) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) {noformat} {noformat} java.util.ConcurrentModificationException at java.base/java.util.HashMap.compute(HashMap.java:1230) at org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:794) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$3(StreamTask.java:744) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:744) 
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:704) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) at java.base/java.lang.Thread.run(Thread.java:829) {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
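The two stack traces above show two threads (the async RocksDB restore pool and the task's restore thread) calling {{HashMap.compute}} on the same map. A minimal sketch of the likely shape of a fix, assuming the builder can switch to a concurrent map; the class and method names below mirror the Flink ones but are illustrative, not the actual patch:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for SubTaskInitializationMetricsBuilder: a plain
// HashMap can throw ConcurrentModificationException (or silently lose
// updates) when two restore threads report durations at once;
// ConcurrentHashMap.merge() performs the read-modify-write atomically.
public class DurationMetrics {
    private final Map<String, Long> durations = new ConcurrentHashMap<>();

    public void addDurationMetric(String name, long durationMillis) {
        // atomic equivalent of compute(name, (k, v) -> v == null ? d : v + d)
        durations.merge(name, durationMillis, Long::sum);
    }

    public long get(String name) {
        return durations.getOrDefault(name, 0L);
    }

    public static void main(String[] args) throws InterruptedException {
        DurationMetrics metrics = new DurationMetrics();
        Runnable reporter = () -> {
            for (int i = 0; i < 10_000; i++) {
                metrics.addDurationMetric("RestoreDurationMs", 1);
            }
        };
        Thread t1 = new Thread(reporter);
        Thread t2 = new Thread(reporter);
        t1.start(); t2.start();
        t1.join(); t2.join();
        // with the concurrent map no update is lost and no CME is thrown
        System.out.println(metrics.get("RestoreDurationMs")); // 20000
    }
}
```

Synchronizing {{addDurationMetric}} would work as well; the actual fix merged for this ticket may differ.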
[jira] [Updated] (FLINK-34696) GSRecoverableWriterCommitter is generating excessive data blobs
[ https://issues.apache.org/jira/browse/FLINK-34696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-34696: --- Issue Type: Improvement (was: Bug) > GSRecoverableWriterCommitter is generating excessive data blobs > --- > > Key: FLINK-34696 > URL: https://issues.apache.org/jira/browse/FLINK-34696 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Reporter: Simon-Shlomo Poil >Priority: Major > > The `composeBlobs` method in > `org.apache.flink.fs.gs.writer.GSRecoverableWriterCommitter` is designed to > merge multiple small blobs into a single large blob using Google Cloud > Storage's compose method. This process is iterative, combining the result > from the previous iteration with 31 new blobs until all blobs are merged. > Upon completion of the composition, the method proceeds to remove the > temporary blobs. > *Issue:* > This methodology results in significant, unnecessary data storage consumption > during the blob composition process, incurring considerable costs due to > Google Cloud Storage pricing models. > *Example to Illustrate the Problem:* > - Initial state: 64 blobs, each 1 GB in size (totaling 64 GB). > - After 1st step: 32 blobs are merged into a single blob, increasing total > storage to 96 GB (64 original + 32 GB new). > - After 2nd step: The newly created 32 GB blob is merged with 31 more blobs, > raising the total to 159 GB. > - After 3rd step: The final blob is merged, culminating in a total of 223 GB > to combine the original 64 GB of data. This results in an overhead of 159 GB. > *Impact:* > This inefficiency has a profound impact, especially at scale, where terabytes > of data can incur overheads in the petabyte range, leading to unexpectedly > high costs. Additionally, we have observed an increase in storage exceptions > thrown by the Google Storage library, potentially linked to this issue. 
> *Suggested Solution:* > To mitigate this problem, we propose modifying the `composeBlobs` method to > immediately delete source blobs once they have been successfully combined. > This change could significantly reduce data duplication and associated costs. > However, the implications for data recovery and integrity need careful > consideration to ensure that this optimization does not compromise the > ability to recover data in case of a failure during the composition process. > *Steps to Reproduce:* > 1. Initiate the blob composition process in an environment with a significant > number of blobs (e.g., 64 blobs of 1 GB each). > 2. Observe the temporary increase in data storage as blobs are iteratively > combined. > 3. Note the final amount of data storage used compared to the initial total > size of the blobs. > *Expected Behavior:* > The blob composition process should minimize unnecessary data storage use, > efficiently managing resources to combine blobs without generating excessive > temporary data overhead. > *Actual Behavior:* > The current implementation results in significant temporary increases in data > storage, leading to high costs and potential system instability due to > frequent storage exceptions. > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
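The storage arithmetic in the example above can be checked with a short simulation: GCS compose merges at most 32 objects per call, and the current committer deletes temporaries only after the final compose, so every intermediate composite is stored on top of the originals. Sizes are in units of one original blob (1 GB in the example); the method name is illustrative:

```java
public class ComposeOverhead {
    /**
     * Cumulative storage ever allocated when iteratively composing n
     * unit-size blobs, 32 at a time, deleting temporaries only at the end
     * (the behaviour described in this ticket).
     */
    static long cumulative(int n) {
        long stored = n;   // the original blobs stay around until the end
        int remaining = n;
        // first round merges up to 32 source blobs into a new composite
        int take = Math.min(32, remaining);
        long composed = take;
        remaining -= take;
        stored += composed;
        // each following round writes a brand-new, ever-larger composite
        // from the previous composite plus up to 31 more source blobs
        while (remaining > 0) {
            take = Math.min(31, remaining);
            composed += take;
            remaining -= take;
            stored += composed;
        }
        return stored;
    }

    public static void main(String[] args) {
        // 64 + 32 + 63 + 64 = 223, matching the example in the report
        System.out.println(cumulative(64)); // 223
    }
}
```

With the suggested eager deletion of already-combined source blobs, peak storage would instead stay close to 2x the input size.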
[jira] [Closed] (FLINK-33816) SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain failed due async checkpoint triggering not being completed
[ https://issues.apache.org/jira/browse/FLINK-33816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski closed FLINK-33816. -- Fix Version/s: 2.0.0 Assignee: jiabao.sun Resolution: Fixed merged commit 5aebb04 into apache:master > SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain failed due > async checkpoint triggering not being completed > - > > Key: FLINK-33816 > URL: https://issues.apache.org/jira/browse/FLINK-33816 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Checkpointing, Runtime / Coordination >Affects Versions: 1.19.0 >Reporter: Matthias Pohl >Assignee: jiabao.sun >Priority: Major > Labels: github-actions, pull-request-available, test-stability > Fix For: 2.0.0 > > Attachments: screenshot-1.png > > > [https://github.com/XComp/flink/actions/runs/7182604625/job/19559947894#step:12:9430] > {code:java} > rror: 14:39:01 14:39:01.930 [ERROR] Tests run: 16, Failures: 1, Errors: 0, > Skipped: 0, Time elapsed: 1.878 s <<< FAILURE! - in > org.apache.flink.streaming.runtime.tasks.SourceStreamTaskTest > 9426Error: 14:39:01 14:39:01.930 [ERROR] > org.apache.flink.streaming.runtime.tasks.SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain > Time elapsed: 0.034 s <<< FAILURE! 
> 9427Dec 12 14:39:01 org.opentest4j.AssertionFailedError: > 9428Dec 12 14:39:01 > 9429Dec 12 14:39:01 Expecting value to be true but was false > 9430Dec 12 14:39:01 at > java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:62) > 9431Dec 12 14:39:01 at > java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:502) > 9432Dec 12 14:39:01 at > org.apache.flink.streaming.runtime.tasks.SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain(SourceStreamTaskTest.java:710) > 9433Dec 12 14:39:01 at > java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) > 9434Dec 12 14:39:01 at > java.base/java.lang.reflect.Method.invoke(Method.java:580) > [...] {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
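The failed assertion above ({{Expecting value to be true but was false}}) is consistent with the test reading a checkpoint flag before the asynchronous trigger had completed. A hedged sketch of the usual remedy, polling the condition with a timeout instead of asserting immediately; the helper below is illustrative and not the actual Flink test utility:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Illustrative sketch: tests asserting on asynchronously-triggered state
// should wait for the condition with a deadline rather than check it once.
public class WaitUntil {
    static boolean waitUntil(BooleanSupplier condition, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline) {
                return false; // timed out; condition still false
            }
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return condition.getAsBoolean();
            }
        }
        return true;
    }

    public static void main(String[] args) {
        AtomicBoolean checkpointTriggered = new AtomicBoolean(false);
        // simulate the async checkpoint trigger completing a bit later
        new Thread(() -> {
            try { Thread.sleep(50); } catch (InterruptedException ignored) { }
            checkpointTriggered.set(true);
        }).start();
        System.out.println(waitUntil(checkpointTriggered::get, 5_000)); // true
    }
}
```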
[jira] [Comment Edited] (FLINK-33816) SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain failed due async checkpoint triggering not being completed
[ https://issues.apache.org/jira/browse/FLINK-33816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17827503#comment-17827503 ] Piotr Nowojski edited comment on FLINK-33816 at 3/15/24 1:33 PM: - Thanks for the fix! merged commit 5aebb04 into apache:master was (Author: pnowojski): merged commit 5aebb04 into apache:master > SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain failed due > async checkpoint triggering not being completed > - > > Key: FLINK-33816 > URL: https://issues.apache.org/jira/browse/FLINK-33816 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Checkpointing, Runtime / Coordination >Affects Versions: 1.19.0 >Reporter: Matthias Pohl >Assignee: jiabao.sun >Priority: Major > Labels: github-actions, pull-request-available, test-stability > Fix For: 2.0.0 > > Attachments: screenshot-1.png > > > [https://github.com/XComp/flink/actions/runs/7182604625/job/19559947894#step:12:9430] > {code:java} > rror: 14:39:01 14:39:01.930 [ERROR] Tests run: 16, Failures: 1, Errors: 0, > Skipped: 0, Time elapsed: 1.878 s <<< FAILURE! - in > org.apache.flink.streaming.runtime.tasks.SourceStreamTaskTest > 9426Error: 14:39:01 14:39:01.930 [ERROR] > org.apache.flink.streaming.runtime.tasks.SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain > Time elapsed: 0.034 s <<< FAILURE! 
> 9427Dec 12 14:39:01 org.opentest4j.AssertionFailedError: > 9428Dec 12 14:39:01 > 9429Dec 12 14:39:01 Expecting value to be true but was false > 9430Dec 12 14:39:01 at > java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:62) > 9431Dec 12 14:39:01 at > java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:502) > 9432Dec 12 14:39:01 at > org.apache.flink.streaming.runtime.tasks.SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain(SourceStreamTaskTest.java:710) > 9433Dec 12 14:39:01 at > java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) > 9434Dec 12 14:39:01 at > java.base/java.lang.reflect.Method.invoke(Method.java:580) > [...] {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-26990) BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime failed
[ https://issues.apache.org/jira/browse/FLINK-26990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17826779#comment-17826779 ] Piotr Nowojski commented on FLINK-26990: I've just encountered the same issue in some internal build (thus can not provide an url, but the error message is exactly the same) > BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime > failed > --- > > Key: FLINK-26990 > URL: https://issues.apache.org/jira/browse/FLINK-26990 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.16.2 >Reporter: Matthias Pohl >Priority: Minor > Labels: auto-deprioritized-major, test-stability > > [This > build|https://dev.azure.com/mapohl/flink/_build/results?buildId=914=logs=f3dc9b18-b77a-55c1-591e-264c46fe44d1=2d3cd81e-1c37-5c31-0ee4-f5d5cdb9324d=25899] > failed due to unexpected behavior in > {{BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime}}: > {code} > Apr 01 11:42:06 [ERROR] > org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime > Time elapsed: 0.11 s <<< FAILURE! > Apr 01 11:42:06 java.lang.AssertionError: > Apr 01 11:42:06 > Apr 01 11:42:06 Expected size: 6 but was: 4 in: > Apr 01 11:42:06 [Record @ (undef) : > +I(c1,0,1969-12-31T23:59:55,1970-01-01T00:00:05), > Apr 01 11:42:06 Record @ (undef) : > +I(c1,0,1970-01-01T00:00,1970-01-01T00:00:10), > Apr 01 11:42:06 Record @ (undef) : > +I(c1,1,1970-01-01T00:00:05,1970-01-01T00:00:15), > Apr 01 11:42:06 Record @ (undef) : > +I(c1,2,1970-01-01T00:00:10,1970-01-01T00:00:20)] > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-26990) BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime failed
[ https://issues.apache.org/jira/browse/FLINK-26990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-26990: --- Priority: Major (was: Minor) > BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime > failed > --- > > Key: FLINK-26990 > URL: https://issues.apache.org/jira/browse/FLINK-26990 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.16.2, 1.18.1 >Reporter: Matthias Pohl >Priority: Major > Labels: auto-deprioritized-major, test-stability > > [This > build|https://dev.azure.com/mapohl/flink/_build/results?buildId=914=logs=f3dc9b18-b77a-55c1-591e-264c46fe44d1=2d3cd81e-1c37-5c31-0ee4-f5d5cdb9324d=25899] > failed due to unexpected behavior in > {{BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime}}: > {code} > Apr 01 11:42:06 [ERROR] > org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime > Time elapsed: 0.11 s <<< FAILURE! > Apr 01 11:42:06 java.lang.AssertionError: > Apr 01 11:42:06 > Apr 01 11:42:06 Expected size: 6 but was: 4 in: > Apr 01 11:42:06 [Record @ (undef) : > +I(c1,0,1969-12-31T23:59:55,1970-01-01T00:00:05), > Apr 01 11:42:06 Record @ (undef) : > +I(c1,0,1970-01-01T00:00,1970-01-01T00:00:10), > Apr 01 11:42:06 Record @ (undef) : > +I(c1,1,1970-01-01T00:00:05,1970-01-01T00:00:15), > Apr 01 11:42:06 Record @ (undef) : > +I(c1,2,1970-01-01T00:00:10,1970-01-01T00:00:20)] > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-26990) BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime failed
[ https://issues.apache.org/jira/browse/FLINK-26990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-26990: --- Affects Version/s: 1.18.1 > BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime > failed > --- > > Key: FLINK-26990 > URL: https://issues.apache.org/jira/browse/FLINK-26990 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.16.2, 1.18.1 >Reporter: Matthias Pohl >Priority: Minor > Labels: auto-deprioritized-major, test-stability > > [This > build|https://dev.azure.com/mapohl/flink/_build/results?buildId=914=logs=f3dc9b18-b77a-55c1-591e-264c46fe44d1=2d3cd81e-1c37-5c31-0ee4-f5d5cdb9324d=25899] > failed due to unexpected behavior in > {{BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime}}: > {code} > Apr 01 11:42:06 [ERROR] > org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime > Time elapsed: 0.11 s <<< FAILURE! > Apr 01 11:42:06 java.lang.AssertionError: > Apr 01 11:42:06 > Apr 01 11:42:06 Expected size: 6 but was: 4 in: > Apr 01 11:42:06 [Record @ (undef) : > +I(c1,0,1969-12-31T23:59:55,1970-01-01T00:00:05), > Apr 01 11:42:06 Record @ (undef) : > +I(c1,0,1970-01-01T00:00,1970-01-01T00:00:10), > Apr 01 11:42:06 Record @ (undef) : > +I(c1,1,1970-01-01T00:00:05,1970-01-01T00:00:15), > Apr 01 11:42:06 Record @ (undef) : > +I(c1,2,1970-01-01T00:00:10,1970-01-01T00:00:20)] > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-29114) TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with result mismatch
[ https://issues.apache.org/jira/browse/FLINK-29114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-29114: --- Priority: Blocker (was: Major) > TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with > result mismatch > -- > > Key: FLINK-29114 > URL: https://issues.apache.org/jira/browse/FLINK-29114 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner, Tests >Affects Versions: 1.15.0, 1.19.0, 1.20.0 >Reporter: Sergey Nuyanzin >Assignee: Jane Chan >Priority: Blocker > Labels: auto-deprioritized-major, pull-request-available, > test-stability > Fix For: 1.20.0 > > Attachments: FLINK-29114.log, image-2024-02-27-15-23-49-494.png, > image-2024-02-27-15-26-07-657.png, image-2024-02-27-15-32-48-317.png > > > It could be reproduced locally by repeating tests. Usually about 100 > iterations are enough to have several failed tests > {noformat} > [ERROR] Tests run: 13, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: > 1.664 s <<< FAILURE! - in > org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase > [ERROR] > org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase.testTableHintWithLogicalTableScanReuse > Time elapsed: 0.108 s <<< FAILURE! 
> java.lang.AssertionError: expected: 3,2,Hello world, 3,2,Hello world, 3,2,Hello world)> but was: 2,2,Hello, 2,2,Hello, 3,2,Hello world, 3,2,Hello world)> > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.failNotEquals(Assert.java:835) > at org.junit.Assert.assertEquals(Assert.java:120) > at org.junit.Assert.assertEquals(Assert.java:146) > at > org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase.testTableHintWithLogicalTableScanReuse(TableSourceITCase.scala:428) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at > org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at org.junit.runner.JUnitCore.run(JUnitCore.java:115) > at > org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42) > at > org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80) > at > org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72) > at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107) > at > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88) > at >
[jira] [Commented] (FLINK-33856) Add metrics to monitor the interaction performance between task and external storage system in the process of checkpoint making
[ https://issues.apache.org/jira/browse/FLINK-33856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820619#comment-17820619 ] Piotr Nowojski commented on FLINK-33856: Hey [~hejufang001]! The new process hasn't been documented yet, but it will be basically the same; the only difference is that for the initial discussion stage, FLIPs should be published not on the wiki but via Google Docs (with public read access and comments disabled). Before/after the vote, a committer will copy-paste the document to the wiki. So if you are still willing to work on this, feel free to start the discussion thread using a Google Document :) > Add metrics to monitor the interaction performance between task and external > storage system in the process of checkpoint making > --- > > Key: FLINK-33856 > URL: https://issues.apache.org/jira/browse/FLINK-33856 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing >Affects Versions: 1.18.0 >Reporter: Jufang He >Assignee: Jufang He >Priority: Major > Labels: pull-request-available > > When Flink makes a checkpoint, the interaction performance with the external > file system has a great impact on the overall checkpoint duration. Therefore, bottlenecks > are easy to observe once performance metrics are added where > the task interacts with the external file storage system. These include: the > file write rate, the latency of writing the file, and the latency of closing the > file. > Adding these metrics on the Flink side has the following advantages: it is convenient > to compare E2E checkpoint durations across tasks, and there is no need to distinguish the > type of external storage system, since the measurement can be unified in the > FsCheckpointStreamFactory. -- This message was sent by Atlassian Jira (v8.20.10#820010)
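A rough sketch of the measurement point proposed above: wrapping the checkpoint output stream so write rate, write latency, and close latency can be sampled in one place, regardless of the backing file system. {{TimedCheckpointStream}} and its accessors are hypothetical names for illustration, not the actual Flink change:

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical wrapper around the stream handed out by a checkpoint stream
// factory: it times each write and the final close, which is enough to
// derive write rate, write latency, and close latency for any file system.
public class TimedCheckpointStream extends FilterOutputStream {
    private long bytesWritten;
    private long writeNanos;
    private long closeNanos;

    public TimedCheckpointStream(OutputStream out) {
        super(out);
    }

    @Override
    public void write(int b) throws IOException {
        write(new byte[] {(byte) b}, 0, 1);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        long start = System.nanoTime();
        out.write(b, off, len); // delegate directly, avoiding byte-wise copy
        writeNanos += System.nanoTime() - start;
        bytesWritten += len;
    }

    @Override
    public void close() throws IOException {
        long start = System.nanoTime();
        super.close(); // flushes, then closes the wrapped stream
        closeNanos = System.nanoTime() - start;
    }

    public long getBytesWritten() { return bytesWritten; }
    public long getWriteNanos() { return writeNanos; }
    public long getCloseNanos() { return closeNanos; }

    public static void main(String[] args) throws IOException {
        TimedCheckpointStream s =
                new TimedCheckpointStream(new ByteArrayOutputStream());
        s.write(new byte[1024], 0, 1024);
        s.close();
        System.out.println(s.getBytesWritten()); // 1024
    }
}
```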
[jira] [Commented] (FLINK-34430) Akka frame size exceeded with many ByteStreamStateHandle being used
[ https://issues.apache.org/jira/browse/FLINK-34430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818838#comment-17818838 ] Piotr Nowojski commented on FLINK-34430: [~roman] is already working on that under the FLINK-26050 ticket. However, the akka frame size is still an issue. The 10MB limit is dangerously small, especially when the {{ByteStreamStateHandle}} size limit is bumped to 1MB (which is a pretty reasonable value, all things considered). In that case the limit would be exceeded with only 10+ files. > Akka frame size exceeded with many ByteStreamStateHandle being used > --- > > Key: FLINK-34430 > URL: https://issues.apache.org/jira/browse/FLINK-34430 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.16.3, 1.17.2, 1.19.0, 1.18.1 >Reporter: Piotr Nowojski >Assignee: Chesnay Schepler >Priority: Major > > The following error can happen > {noformat} > Discarding oversized payload sent to > Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size > 10485760 bytes, actual size of encoded class > org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes. > > error.stack_trace > akka.remote.OversizedPayloadException: Discarding oversized payload sent to > Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size > 10485760 bytes, actual size of encoded class > org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes. > {noformat} > when https://issues.apache.org/jira/browse/FLINK-26050 is causing large > amount of small sst files to be created and never deleted. If those files are > small enough to be handled by {{ByteStreamStateHandle}} akka frame size can > be exceeded. -- This message was sent by Atlassian Jira (v8.20.10#820010)
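The "10+ files" estimate in the comment follows directly from the numbers involved; a trivial check, assuming the 10 MiB frame limit from the error message above and a hypothetical 1 MiB inline-handle threshold:

```java
public class FrameBudget {
    public static void main(String[] args) {
        long maxFrameBytes = 10_485_760L;   // limit reported in the error message (10 MiB)
        long inlineHandleBytes = 1_048_576L; // hypothetical ByteStreamStateHandle threshold (1 MiB)
        // handles that fit in one RPC frame, ignoring envelope overhead;
        // the 11th full-size inline handle would overflow the frame
        System.out.println(maxFrameBytes / inlineHandleBytes); // 10
    }
}
```

In practice the RPC envelope and other acknowledged state shrink the budget further, so fewer than 10 such handles can already trip the limit.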
[jira] [Comment Edited] (FLINK-34386) Add RocksDB bloom filter metrics
[ https://issues.apache.org/jira/browse/FLINK-34386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818837#comment-17818837 ] Piotr Nowojski edited comment on FLINK-34386 at 2/20/24 2:42 PM: - merged commit 890a995 into apache:master now Thanks [~hejufang001]! was (Author: pnowojski): merged commit 890a995 into apache:master now > Add RocksDB bloom filter metrics > > > Key: FLINK-34386 > URL: https://issues.apache.org/jira/browse/FLINK-34386 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.18.0 >Reporter: Jufang He >Assignee: Jufang He >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > In our production environment, with RocksDB bloom filter enabled, the > performance optimization effect on task state reading is obvious. However, > there is a lack of usage metrics for bloom filter, If these Metrics are > reported via Metrics reporter, it is easy to monitor the effectiveness of > bloom filter optimization. > And these metrics are available from RocksDB Statistics: > BLOOM_FILTER_USEFUL: times bloom filter has avoided file reads. > BLOOM_FILTER_FULL_POSITIVE: times bloom FullFilter has not avoided the reads. > BLOOM_FILTER_FULL_TRUE_POSITIVE: times bloom FullFilter has not avoided the > reads and data actually exist. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-34386) Add RocksDB bloom filter metrics
[ https://issues.apache.org/jira/browse/FLINK-34386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski closed FLINK-34386. -- Fix Version/s: 1.20.0 Resolution: Fixed merged commit 890a995 into apache:master now > Add RocksDB bloom filter metrics > > > Key: FLINK-34386 > URL: https://issues.apache.org/jira/browse/FLINK-34386 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.18.0 >Reporter: Jufang He >Assignee: Jufang He >Priority: Major > Labels: pull-request-available > Fix For: 1.20.0 > > > In our production environment, with RocksDB bloom filter enabled, the > performance optimization effect on task state reading is obvious. However, > there is a lack of usage metrics for bloom filter, If these Metrics are > reported via Metrics reporter, it is easy to monitor the effectiveness of > bloom filter optimization. > And these metrics are available from RocksDB Statistics: > BLOOM_FILTER_USEFUL: times bloom filter has avoided file reads. > BLOOM_FILTER_FULL_POSITIVE: times bloom FullFilter has not avoided the reads. > BLOOM_FILTER_FULL_TRUE_POSITIVE: times bloom FullFilter has not avoided the > reads and data actually exist. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-34386) Add RocksDB bloom filter metrics
[ https://issues.apache.org/jira/browse/FLINK-34386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski reassigned FLINK-34386: -- Assignee: Jufang He > Add RocksDB bloom filter metrics > > > Key: FLINK-34386 > URL: https://issues.apache.org/jira/browse/FLINK-34386 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.18.0 >Reporter: Jufang He >Assignee: Jufang He >Priority: Major > Labels: pull-request-available > > In our production environment, with RocksDB bloom filter enabled, the > performance optimization effect on task state reading is obvious. However, > there is a lack of usage metrics for bloom filter, If these Metrics are > reported via Metrics reporter, it is easy to monitor the effectiveness of > bloom filter optimization. > And these metrics are available from RocksDB Statistics: > BLOOM_FILTER_USEFUL: times bloom filter has avoided file reads. > BLOOM_FILTER_FULL_POSITIVE: times bloom FullFilter has not avoided the reads. > BLOOM_FILTER_FULL_TRUE_POSITIVE: times bloom FullFilter has not avoided the > reads and data actually exist. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-34424) BoundedBlockingSubpartitionWriteReadTest#testRead10ConsumersConcurrent times out
[ https://issues.apache.org/jira/browse/FLINK-34424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817251#comment-17817251 ] Piotr Nowojski edited comment on FLINK-34424 at 2/14/24 8:39 AM: - :D [~mapohl] I haven't made any modifications to this area of the code in a very long time, and unfortunately I don't have much time these days to investigate this kind of issue. I would suggest pinging someone else, maybe the authors of the most recent change to this test file: FLINK-33743 [~yunfengzhou][~tanyuxin] was (Author: pnowojski): :D [~mapohl] I haven't made any modifications to this area of the code in a very long time, and unfortunately I don't have much time these days to investigate this kind of issue. I would suggest pinging someone else, maybe the authors of the most recent change to this test file: FLINK-33743 > BoundedBlockingSubpartitionWriteReadTest#testRead10ConsumersConcurrent times > out > > > Key: FLINK-34424 > URL: https://issues.apache.org/jira/browse/FLINK-34424 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.19.0, 1.20.0 >Reporter: Matthias Pohl >Priority: Critical > Labels: test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57446=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=9151 > {code} > Feb 11 13:55:29 "ForkJoinPool-50-worker-25" #414 daemon prio=5 os_prio=0 > tid=0x7f19503af800 nid=0x284c in Object.wait() [0x7f191b6db000] > Feb 11 13:55:29java.lang.Thread.State: WAITING (on object monitor) > Feb 11 13:55:29 at java.lang.Object.wait(Native Method) > Feb 11 13:55:29 at java.lang.Thread.join(Thread.java:1252) > Feb 11 13:55:29 - locked <0xe2e019a8> (a > org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionWriteReadTest$LongReader) > Feb 11 13:55:29 at > org.apache.flink.core.testutils.CheckedThread.trySync(CheckedThread.java:104) > Feb 11 13:55:29 at > 
org.apache.flink.core.testutils.CheckedThread.sync(CheckedThread.java:92) > Feb 11 13:55:29 at > org.apache.flink.core.testutils.CheckedThread.sync(CheckedThread.java:81) > Feb 11 13:55:29 at > org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionWriteReadTest.testRead10ConsumersConcurrent(BoundedBlockingSubpartitionWriteReadTest.java:177) > Feb 11 13:55:29 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > [...] > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34424) BoundedBlockingSubpartitionWriteReadTest#testRead10ConsumersConcurrent times out
[ https://issues.apache.org/jira/browse/FLINK-34424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817251#comment-17817251 ] Piotr Nowojski commented on FLINK-34424: :D [~mapohl] I haven't made any modifications to this area of the code in a very long time, and unfortunately I don't have much time these days to investigate this kind of issue. I would suggest pinging someone else, maybe the authors of the most recent change to this test file: FLINK-33743 > BoundedBlockingSubpartitionWriteReadTest#testRead10ConsumersConcurrent times > out > > > Key: FLINK-34424 > URL: https://issues.apache.org/jira/browse/FLINK-34424 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.19.0, 1.20.0 >Reporter: Matthias Pohl >Priority: Critical > Labels: test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57446=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=9151 > {code} > Feb 11 13:55:29 "ForkJoinPool-50-worker-25" #414 daemon prio=5 os_prio=0 > tid=0x7f19503af800 nid=0x284c in Object.wait() [0x7f191b6db000] > Feb 11 13:55:29java.lang.Thread.State: WAITING (on object monitor) > Feb 11 13:55:29 at java.lang.Object.wait(Native Method) > Feb 11 13:55:29 at java.lang.Thread.join(Thread.java:1252) > Feb 11 13:55:29 - locked <0xe2e019a8> (a > org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionWriteReadTest$LongReader) > Feb 11 13:55:29 at > org.apache.flink.core.testutils.CheckedThread.trySync(CheckedThread.java:104) > Feb 11 13:55:29 at > org.apache.flink.core.testutils.CheckedThread.sync(CheckedThread.java:92) > Feb 11 13:55:29 at > org.apache.flink.core.testutils.CheckedThread.sync(CheckedThread.java:81) > Feb 11 13:55:29 at > org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionWriteReadTest.testRead10ConsumersConcurrent(BoundedBlockingSubpartitionWriteReadTest.java:177) > Feb 11 13:55:29 at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > [...] > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34430) Akka frame size exceeded with many ByteStreamStateHandle being used
[ https://issues.apache.org/jira/browse/FLINK-34430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-34430: --- Description: The following error can happen {noformat} Discarding oversized payload sent to Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes. error.stack_trace akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes. {noformat} when https://issues.apache.org/jira/browse/FLINK-26050 is causing large amount of small sst files to be created and never deleted. If those files are small enough to be handled by {{ByteStreamStateHandle}} akka frame size can be exceeded. was: The following error can happen {noformat} Discarding oversized payload sent to Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes. error.stack_trace akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes. {noformat} when https://issues.apache.org/jira/browse/FLINK-26050 is causing large amount of small sst files to be created and never deleted. 
If those files are small enough to be > Akka frame size exceeded with many ByteStreamStateHandle being used > --- > > Key: FLINK-34430 > URL: https://issues.apache.org/jira/browse/FLINK-34430 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.16.3, 1.17.2, 1.19.0, 1.18.1 >Reporter: Piotr Nowojski >Assignee: Chesnay Schepler >Priority: Major > > The following error can happen > {noformat} > Discarding oversized payload sent to > Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size > 10485760 bytes, actual size of encoded class > org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes. > > error.stack_trace > akka.remote.OversizedPayloadException: Discarding oversized payload sent to > Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size > 10485760 bytes, actual size of encoded class > org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes. > {noformat} > when https://issues.apache.org/jira/browse/FLINK-26050 is causing large > amount of small sst files to be created and never deleted. If those files are > small enough to be handled by {{ByteStreamStateHandle}} akka frame size can > be exceeded. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34430) Akka frame size exceeded with many ByteStreamStateHandle being used
[ https://issues.apache.org/jira/browse/FLINK-34430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-34430: --- Description: The following error can happen {noformat} Discarding oversized payload sent to Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes. error.stack_trace akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes. {noformat} when https://issues.apache.org/jira/browse/FLINK-26050 is causing large amount of small sst files to be created and never deleted. If those files are small enough to be was: The following error can happen {noformat} Discarding oversized payload sent to Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes. error.stack_trace akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes. 
{noformat} > Akka frame size exceeded with many ByteStreamStateHandle being used > --- > > Key: FLINK-34430 > URL: https://issues.apache.org/jira/browse/FLINK-34430 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.16.3, 1.17.2, 1.19.0, 1.18.1 >Reporter: Piotr Nowojski >Assignee: Chesnay Schepler >Priority: Major > > The following error can happen > {noformat} > Discarding oversized payload sent to > Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size > 10485760 bytes, actual size of encoded class > org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes. > > error.stack_trace > akka.remote.OversizedPayloadException: Discarding oversized payload sent to > Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size > 10485760 bytes, actual size of encoded class > org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes. > {noformat} > when https://issues.apache.org/jira/browse/FLINK-26050 is causing large > amount of small sst files to be created and never deleted. If those files are > small enough to be -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34430) Akka frame size exceeded with many ByteStreamStateHandle being used
[ https://issues.apache.org/jira/browse/FLINK-34430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-34430: --- Description: The following error can happen {noformat} Discarding oversized payload sent to Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes. error.stack_trace akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes. {noformat} > Akka frame size exceeded with many ByteStreamStateHandle being used > --- > > Key: FLINK-34430 > URL: https://issues.apache.org/jira/browse/FLINK-34430 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.16.3, 1.17.2, 1.19.0, 1.18.1 >Reporter: Piotr Nowojski >Assignee: Chesnay Schepler >Priority: Major > > The following error can happen > {noformat} > Discarding oversized payload sent to > Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size > 10485760 bytes, actual size of encoded class > org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes. > > error.stack_trace > akka.remote.OversizedPayloadException: Discarding oversized payload sent to > Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size > 10485760 bytes, actual size of encoded class > org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes. > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34430) Akka frame size exceeded with many ByteStreamStateHandle being used
[ https://issues.apache.org/jira/browse/FLINK-34430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816597#comment-17816597 ] Piotr Nowojski commented on FLINK-34430: Akka frame size can be exceeded as a result of RocksDB creating too many small files. > Akka frame size exceeded with many ByteStreamStateHandle being used > --- > > Key: FLINK-34430 > URL: https://issues.apache.org/jira/browse/FLINK-34430 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.16.3, 1.17.2, 1.19.0, 1.18.1 >Reporter: Piotr Nowojski >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34430) Akka frame size exceeded with many ByteStreamStateHandle being used
Piotr Nowojski created FLINK-34430: -- Summary: Akka frame size exceeded with many ByteStreamStateHandle being used Key: FLINK-34430 URL: https://issues.apache.org/jira/browse/FLINK-34430 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.18.1, 1.17.2, 1.16.3, 1.19.0 Reporter: Piotr Nowojski -- This message was sent by Atlassian Jira (v8.20.10#820010)
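The interaction described in this ticket touches two standard configuration knobs. The following flink-conf sketch is illustrative only: the option keys (`state.storage.fs.memory-threshold`, `akka.framesize`) are the standard Flink options as recalled, and the values shown are assumptions, not recommendations from this ticket.

```yaml
# Hedged sketch, not a recommendation from this ticket.
# State smaller than this threshold is inlined as a ByteStreamStateHandle
# inside the RPC message instead of being written out as a file
# (default believed to be 20kb):
state.storage.fs.memory-threshold: 20kb
# The RPC frame limit seen in the error (10485760 bytes is the default);
# raising it, e.g. to 20971520b, is a blunt workaround:
akka.framesize: 10485760b
```

Lowering the memory threshold trades RPC payload size for many tiny files on checkpoint storage, so neither knob addresses the root cause (the small SST files from FLINK-26050).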
[jira] [Commented] (FLINK-26050) Too many small sst files in rocksdb state backend when using time window created in ascending order
[ https://issues.apache.org/jira/browse/FLINK-26050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816595#comment-17816595 ] Piotr Nowojski commented on FLINK-26050: I have encountered the same situation, but with event time windows, and AFAIK the problem is not only with the state of the timers but also with the state of the windows. In the case I've seen, checkpointing was configured to run once every minute with 10-minute windows, and there were hundreds of files created every minute plus a small fraction of files created every ten minutes. On top of that, this can also cause other problems besides too many open files. In our case the Akka frame limit was exceeded: each of those files was small enough to fit in the byte state handle, and Akka crashed trying to send ~3000 of those handles from the JM to the TM. > Too many small sst files in rocksdb state backend when using time window > created in ascending order > --- > > Key: FLINK-26050 > URL: https://issues.apache.org/jira/browse/FLINK-26050 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.10.2, 1.14.3 >Reporter: shen >Priority: Major > Attachments: image-2022-02-09-21-22-13-920.png, > image-2022-02-11-10-32-14-956.png, image-2022-02-11-10-36-46-630.png, > image-2022-02-14-13-04-52-325.png > > > When using processing or event time windows, in some workloads, there will be > a lot of small sst files(serveral KB) in rocksdb local directory and may > cause "Too many files error". > Use rocksdb tool ldb to find out content in sst files: > * column family of these small sst files is "processing_window-timers". > * most sst files are in level-1. > * records in sst files are almost kTypeDeletion. > * creation time of sst file correspond to checkpoint interval. > These small sst files seem to be generated when flink checkpoint is > triggered. 
Although all content in sst are delete tags, they are not > compacted and deleted in rocksdb compaction because of not intersecting with > each other(rocksdb [compaction trivial > move|https://github.com/facebook/rocksdb/wiki/Compaction-Trivial-Move]). And > there seems to be no chance to delete them because of small size and not > intersect with other sst files. > > I will attach a simple program to reproduce the problem. > > Since timer in processing time window is generated in strictly ascending > order(both put and delete). So If workload of job happen to generate level-0 > sst files not intersect with each other.(for example: processing window size > much smaller than checkpoint interval, and no window content cross checkpoint > interval or no new data in window crossing checkpoint interval). There will > be many small sst files generated until job restored from savepoint, or > incremental checkpoint is disabled. > > May be similar problem exists when user use timer in operators with same > workload. 
> > Code to reproduce the problem: > {code:java} > package org.apache.flink.jira; > import lombok.extern.slf4j.Slf4j; > import org.apache.flink.configuration.Configuration; > import org.apache.flink.configuration.RestOptions; > import org.apache.flink.configuration.TaskManagerOptions; > import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; > import org.apache.flink.streaming.api.TimeCharacteristic; > import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; > import org.apache.flink.streaming.api.datastream.DataStreamSource; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.streaming.api.functions.source.SourceFunction; > import > org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; > import > org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; > import org.apache.flink.streaming.api.windowing.time.Time; > import org.apache.flink.streaming.api.windowing.windows.TimeWindow; > import org.apache.flink.util.Collector; > import java.util.Collections; > import java.util.List; > import java.util.Random; > @Slf4j > public class StreamApp { > public static void main(String[] args) throws Exception { > Configuration config = new Configuration(); > config.set(RestOptions.ADDRESS, "127.0.0.1"); > config.set(RestOptions.PORT, 10086); > config.set(TaskManagerOptions.NUM_TASK_SLOTS, 6); > new > StreamApp().configureApp(StreamExecutionEnvironment.createLocalEnvironment(1, > config)); > } > public void configureApp(StreamExecutionEnvironment env) throws Exception { > env.enableCheckpointing(2); // 20sec > RocksDBStateBackend rocksDBStateBackend = > new >
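The back-of-envelope numbers behind the Akka failure mentioned in the comment above can be checked directly. This sketch only uses the figures already present in this thread (the 10485760-byte frame limit and 11212046-byte payload from the error message, and the rough ~3000-handle count from the comment):

```java
// Back-of-envelope check: ~3000 inlined ByteStreamStateHandles vs. the
// default 10 MiB RPC frame limit. All figures are taken from this thread.
public class FrameSizeEstimate {
    public static void main(String[] args) {
        long maxFrameBytes = 10_485_760L;   // "max allowed size" from the error
        long actualPayload = 11_212_046L;   // reported RemoteRpcInvocation size
        long handleCount = 3_000L;          // rough handle count from the comment

        // A few KB per handle is enough to overflow the frame at this count.
        long bytesPerHandle = actualPayload / handleCount;
        System.out.println("bytes per handle ~= " + bytesPerHandle);
        System.out.println("frame exceeded: " + (actualPayload > maxFrameBytes));
    }
}
```

So each handle only needs to average a few kilobytes, consistent with "several KB" SST files being inlined rather than referenced.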
[jira] [Updated] (FLINK-26050) Too many small sst files in rocksdb state backend when using time window created in ascending order
[ https://issues.apache.org/jira/browse/FLINK-26050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-26050: --- Description: When using processing or event time windows, in some workloads, there will be a lot of small sst files(serveral KB) in rocksdb local directory and may cause "Too many files error". Use rocksdb tool ldb to find out content in sst files: * column family of these small sst files is "processing_window-timers". * most sst files are in level-1. * records in sst files are almost kTypeDeletion. * creation time of sst file correspond to checkpoint interval. These small sst files seem to be generated when flink checkpoint is triggered. Although all content in sst are delete tags, they are not compacted and deleted in rocksdb compaction because of not intersecting with each other(rocksdb [compaction trivial move|https://github.com/facebook/rocksdb/wiki/Compaction-Trivial-Move]). And there seems to be no chance to delete them because of small size and not intersect with other sst files. I will attach a simple program to reproduce the problem. Since timer in processing time window is generated in strictly ascending order(both put and delete). So If workload of job happen to generate level-0 sst files not intersect with each other.(for example: processing window size much smaller than checkpoint interval, and no window content cross checkpoint interval or no new data in window crossing checkpoint interval). There will be many small sst files generated until job restored from savepoint, or incremental checkpoint is disabled. May be similar problem exists when user use timer in operators with same workload. 
Code to reproduce the problem: {code:java} package org.apache.flink.jira; import lombok.extern.slf4j.Slf4j; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.util.Collections; import java.util.List; import java.util.Random; @Slf4j public class StreamApp { public static void main(String[] args) throws Exception { Configuration config = new Configuration(); config.set(RestOptions.ADDRESS, "127.0.0.1"); config.set(RestOptions.PORT, 10086); config.set(TaskManagerOptions.NUM_TASK_SLOTS, 6); new StreamApp().configureApp(StreamExecutionEnvironment.createLocalEnvironment(1, config)); } public void configureApp(StreamExecutionEnvironment env) throws Exception { env.enableCheckpointing(2); // 20sec RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend("file:///Users/shenjiaqi/Workspace/jira/flink-51/checkpoints/", true); // need to be reconfigured rocksDBStateBackend.setDbStoragePath("/Users/shenjiaqi/Workspace/jira/flink-51/flink/rocksdb_local_db"); // need to be reconfigured env.setStateBackend(rocksDBStateBackend); env.getCheckpointConfig().setCheckpointTimeout(10); 
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5); env.setParallelism(1); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.getConfig().setTaskCancellationInterval(1); for (int i = 0; i < 1; ++i) { createOnePipeline(env); } env.execute("StreamApp"); } private void createOnePipeline(StreamExecutionEnvironment env) { // data source is configured so that little window cross checkpoint interval DataStreamSource stream = env.addSource(new Generator(1, 3000, 3600)); stream.keyBy(x -> x) // make sure window size less than checkpoint interval. though 100ms is too small, I think increase this value can still reproduce the problem with longer time. .window(TumblingProcessingTimeWindows.of(Time.milliseconds(100))) .process(new ProcessWindowFunction() { @Override public void process(String s, ProcessWindowFunction.Context context, Iterable elements, Collector out) { for (String ele: elements) { out.collect(ele); } } }).print(); } public static final class Generator
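The non-overlap argument in the description (ascending timer keys produce delete-only SST files whose key ranges never intersect, so compaction can only trivially move them) can be illustrated with a toy model. The numbers below are illustrative assumptions, not taken from the reporter's job:

```java
// Toy model of the trivial-move pattern described in the ticket: each
// checkpoint flushes a delete-only SST covering a disjoint, ascending timer
// range, so compaction never merges the files and they accumulate.
public class SstAccumulation {
    public static void main(String[] args) {
        long checkpointIntervalMs = 60_000; // checkpoint once a minute (assumed)
        int checkpoints = 10;

        long[][] files = new long[checkpoints][2]; // [minTimerKey, maxTimerKey]
        for (int cp = 0; cp < checkpoints; cp++) {
            long start = cp * checkpointIntervalMs;
            files[cp][0] = start;
            files[cp][1] = start + checkpointIntervalMs - 1;
        }
        // Ascending timers => strictly disjoint key ranges => no merging
        // compaction ever picks these files up together.
        boolean anyOverlap = false;
        for (int i = 1; i < files.length; i++) {
            if (files[i][0] <= files[i - 1][1]) {
                anyOverlap = true;
            }
        }
        System.out.println("files: " + files.length + ", overlapping: " + anyOverlap);
    }
}
```

One file per checkpoint and zero overlaps: the file count grows without bound until a savepoint restore or a full (non-incremental) checkpoint rewrites the state.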
[jira] [Updated] (FLINK-26050) Too many small sst files in rocksdb state backend when using time window created in ascending order
[ https://issues.apache.org/jira/browse/FLINK-26050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-26050: --- Summary: Too many small sst files in rocksdb state backend when using time window created in ascending order (was: Too many small sst files in rocksdb state backend when using processing time window) > Too many small sst files in rocksdb state backend when using time window > created in ascending order > --- > > Key: FLINK-26050 > URL: https://issues.apache.org/jira/browse/FLINK-26050 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.10.2, 1.14.3 >Reporter: shen >Priority: Major > Attachments: image-2022-02-09-21-22-13-920.png, > image-2022-02-11-10-32-14-956.png, image-2022-02-11-10-36-46-630.png, > image-2022-02-14-13-04-52-325.png > > > When using processing time window, in some workload, there will be a lot of > small sst files(serveral KB) in rocksdb local directory and may cause "Too > many files error". > Use rocksdb tool ldb to find out content in sst files: > * column family of these small sst files is "processing_window-timers". > * most sst files are in level-1. > * records in sst files are almost kTypeDeletion. > * creation time of sst file correspond to checkpoint interval. > These small sst files seem to be generated when flink checkpoint is > triggered. Although all content in sst are delete tags, they are not > compacted and deleted in rocksdb compaction because of not intersecting with > each other(rocksdb [compaction trivial > move|https://github.com/facebook/rocksdb/wiki/Compaction-Trivial-Move]). And > there seems to be no chance to delete them because of small size and not > intersect with other sst files. > > I will attach a simple program to reproduce the problem. > > Since timer in processing time window is generated in strictly ascending > order(both put and delete). 
So If workload of job happen to generate level-0 > sst files not intersect with each other.(for example: processing window size > much smaller than checkpoint interval, and no window content cross checkpoint > interval or no new data in window crossing checkpoint interval). There will > be many small sst files generated until job restored from savepoint, or > incremental checkpoint is disabled. > > May be similar problem exists when user use timer in operators with same > workload. > > Code to reproduce the problem: > {code:java} > package org.apache.flink.jira; > import lombok.extern.slf4j.Slf4j; > import org.apache.flink.configuration.Configuration; > import org.apache.flink.configuration.RestOptions; > import org.apache.flink.configuration.TaskManagerOptions; > import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; > import org.apache.flink.streaming.api.TimeCharacteristic; > import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; > import org.apache.flink.streaming.api.datastream.DataStreamSource; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.streaming.api.functions.source.SourceFunction; > import > org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; > import > org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; > import org.apache.flink.streaming.api.windowing.time.Time; > import org.apache.flink.streaming.api.windowing.windows.TimeWindow; > import org.apache.flink.util.Collector; > import java.util.Collections; > import java.util.List; > import java.util.Random; > @Slf4j > public class StreamApp { > public static void main(String[] args) throws Exception { > Configuration config = new Configuration(); > config.set(RestOptions.ADDRESS, "127.0.0.1"); > config.set(RestOptions.PORT, 10086); > config.set(TaskManagerOptions.NUM_TASK_SLOTS, 6); > new > StreamApp().configureApp(StreamExecutionEnvironment.createLocalEnvironment(1, > 
config)); > } > public void configureApp(StreamExecutionEnvironment env) throws Exception { > env.enableCheckpointing(2); // 20sec > RocksDBStateBackend rocksDBStateBackend = > new > RocksDBStateBackend("file:///Users/shenjiaqi/Workspace/jira/flink-51/checkpoints/", > true); // need to be reconfigured > > rocksDBStateBackend.setDbStoragePath("/Users/shenjiaqi/Workspace/jira/flink-51/flink/rocksdb_local_db"); > // need to be reconfigured > env.setStateBackend(rocksDBStateBackend); > env.getCheckpointConfig().setCheckpointTimeout(10); > env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5); > env.setParallelism(1); > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); >
[jira] [Commented] (FLINK-34289) Release Testing Instructions: Verify FLINK-33695 Introduce TraceReporter and use it to create checkpointing and recovery traces
[ https://issues.apache.org/jira/browse/FLINK-34289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17815766#comment-17815766 ] Piotr Nowojski commented on FLINK-34289: That definitely would prevent my confusion :) > Release Testing Instructions: Verify FLINK-33695 Introduce TraceReporter and > use it to create checkpointing and recovery traces > > > Key: FLINK-34289 > URL: https://issues.apache.org/jira/browse/FLINK-34289 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Metrics >Affects Versions: 1.19.0 >Reporter: lincoln lee >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.19.0 > > Attachments: Screenshot 2024-02-08 at 07.22.19.png, screenshot-1.png, > screenshot-2.png > > > This ticket covers testing three related features: FLINK-33695, FLINK-33735 > and FLINK-33696. > Instructions: > # Configure Flink to use > [Slf4jTraceReporter|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/trace_reporters/#slf4j] > and with enabled *INFO* level logging (can be to console or to a file, > doesn't matter). > # Start a streaming job with enabled checkpointing. > # Let it run for a couple of checkpoints. > # Verify presence of a single *JobInitialization* [1] trace logged just after > job start up. > # Verify presence of a couple of *Checkpoint* [1] traces logged after each > successful or failed checkpoint. > [1] > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/traces/#checkpointing-and-initialization -- This message was sent by Atlassian Jira (v8.20.10#820010)
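For step 1 of the testing instructions quoted above, a minimal configuration sketch may help. The key and factory class name below are assumptions from memory and should be verified against the linked trace_reporters documentation page before use:

```yaml
# Assumed Slf4j trace reporter wiring (verify exact key and class name
# against the linked trace_reporters docs):
traces.reporter.slf4j.factory.class: org.apache.flink.traces.slf4j.Slf4jTraceReporterFactory
```

With INFO-level logging enabled, the JobInitialization and Checkpoint traces described in the instructions should then appear in the configured log output.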
[jira] [Commented] (FLINK-34403) VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM
[ https://issues.apache.org/jira/browse/FLINK-34403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17815685#comment-17815685 ] Piotr Nowojski commented on FLINK-34403: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57399=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461 > VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM > - > > Key: FLINK-34403 > URL: https://issues.apache.org/jira/browse/FLINK-34403 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.20.0 >Reporter: Benchao Li >Priority: Major > Labels: test-stability > > After FLINK-33611 merged, the misc test on GHA cannot pass due to out of > memory error, throwing following exceptions: > {code:java} > Error: 05:43:21 05:43:21.768 [ERROR] Tests run: 1, Failures: 0, Errors: 1, > Skipped: 0, Time elapsed: 40.98 s <<< FAILURE! -- in > org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest > Error: 05:43:21 05:43:21.773 [ERROR] > org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest.testSimple -- Time > elapsed: 40.97 s <<< ERROR! > Feb 07 05:43:21 org.apache.flink.util.FlinkRuntimeException: Error in > serialization. 
> Feb 07 05:43:21 at > org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:327) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:162) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:1007) > Feb 07 05:43:21 at > org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:56) > Feb 07 05:43:21 at > org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:45) > Feb 07 05:43:21 at > org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:61) > Feb 07 05:43:21 at > org.apache.flink.client.deployment.executors.LocalExecutor.getJobGraph(LocalExecutor.java:104) > Feb 07 05:43:21 at > org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:81) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2440) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2421) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.datastream.DataStream.executeAndCollectWithClient(DataStream.java:1495) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1382) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1367) > Feb 07 05:43:21 at > org.apache.flink.formats.protobuf.ProtobufTestHelper.validateRow(ProtobufTestHelper.java:66) > Feb 07 05:43:21 at > org.apache.flink.formats.protobuf.ProtobufTestHelper.rowToPbBytes(ProtobufTestHelper.java:89) > Feb 07 05:43:21 at > org.apache.flink.formats.protobuf.ProtobufTestHelper.rowToPbBytes(ProtobufTestHelper.java:76) > Feb 07 
05:43:21 at > org.apache.flink.formats.protobuf.ProtobufTestHelper.rowToPbBytes(ProtobufTestHelper.java:71) > Feb 07 05:43:21 at > org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest.testSimple(VeryBigPbRowToProtoTest.java:37) > Feb 07 05:43:21 at java.lang.reflect.Method.invoke(Method.java:498) > Feb 07 05:43:21 Caused by: java.util.concurrent.ExecutionException: > java.lang.IllegalArgumentException: Self-suppression not permitted > Feb 07 05:43:21 at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > Feb 07 05:43:21 at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:323) > Feb 07 05:43:21 ... 18 more > Feb 07 05:43:21 Caused by: java.lang.IllegalArgumentException: > Self-suppression not permitted > Feb 07 05:43:21 at > java.lang.Throwable.addSuppressed(Throwable.java:1072) > Feb 07 05:43:21 at > org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:556) > Feb 07 05:43:21 at > org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:486) > Feb 07 05:43:21 at > org.apache.flink.streaming.api.graph.StreamConfig.lambda$triggerSerializationAndReturnFuture$0(StreamConfig.java:182) > Feb
[jira] [Updated] (FLINK-34403) VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM
[ https://issues.apache.org/jira/browse/FLINK-34403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-34403: --- Priority: Critical (was: Major) > VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM > - > > Key: FLINK-34403 > URL: https://issues.apache.org/jira/browse/FLINK-34403 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.20.0 >Reporter: Benchao Li >Priority: Critical > Labels: test-stability > > After FLINK-33611 merged, the misc test on GHA cannot pass due to out of > memory error, throwing following exceptions: > {code:java} > Error: 05:43:21 05:43:21.768 [ERROR] Tests run: 1, Failures: 0, Errors: 1, > Skipped: 0, Time elapsed: 40.98 s <<< FAILURE! -- in > org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest > Error: 05:43:21 05:43:21.773 [ERROR] > org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest.testSimple -- Time > elapsed: 40.97 s <<< ERROR! > Feb 07 05:43:21 org.apache.flink.util.FlinkRuntimeException: Error in > serialization. 
> Feb 07 05:43:21 	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:327)
> Feb 07 05:43:21 	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:162)
> Feb 07 05:43:21 	at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:1007)
> Feb 07 05:43:21 	at org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:56)
> Feb 07 05:43:21 	at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:45)
> Feb 07 05:43:21 	at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:61)
> Feb 07 05:43:21 	at org.apache.flink.client.deployment.executors.LocalExecutor.getJobGraph(LocalExecutor.java:104)
> Feb 07 05:43:21 	at org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:81)
> Feb 07 05:43:21 	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2440)
> Feb 07 05:43:21 	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2421)
> Feb 07 05:43:21 	at org.apache.flink.streaming.api.datastream.DataStream.executeAndCollectWithClient(DataStream.java:1495)
> Feb 07 05:43:21 	at org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1382)
> Feb 07 05:43:21 	at org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1367)
> Feb 07 05:43:21 	at org.apache.flink.formats.protobuf.ProtobufTestHelper.validateRow(ProtobufTestHelper.java:66)
> Feb 07 05:43:21 	at org.apache.flink.formats.protobuf.ProtobufTestHelper.rowToPbBytes(ProtobufTestHelper.java:89)
> Feb 07 05:43:21 	at org.apache.flink.formats.protobuf.ProtobufTestHelper.rowToPbBytes(ProtobufTestHelper.java:76)
> Feb 07 05:43:21 	at org.apache.flink.formats.protobuf.ProtobufTestHelper.rowToPbBytes(ProtobufTestHelper.java:71)
> Feb 07 05:43:21 	at org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest.testSimple(VeryBigPbRowToProtoTest.java:37)
> Feb 07 05:43:21 	at java.lang.reflect.Method.invoke(Method.java:498)
> Feb 07 05:43:21 Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Self-suppression not permitted
> Feb 07 05:43:21 	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> Feb 07 05:43:21 	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> Feb 07 05:43:21 	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:323)
> Feb 07 05:43:21 	... 18 more
> Feb 07 05:43:21 Caused by: java.lang.IllegalArgumentException: Self-suppression not permitted
> Feb 07 05:43:21 	at java.lang.Throwable.addSuppressed(Throwable.java:1072)
> Feb 07 05:43:21 	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:556)
> Feb 07 05:43:21 	at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:486)
> Feb 07 05:43:21 	at org.apache.flink.streaming.api.graph.StreamConfig.lambda$triggerSerializationAndReturnFuture$0(StreamConfig.java:182)
> Feb 07 05:43:21 	at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
> Feb 07 05:43:21 	at >
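The root cause frame above is `Throwable.addSuppressed(Throwable.java:1072)`: the JDK throws `IllegalArgumentException("Self-suppression not permitted")` whenever an exception is registered as its own suppressed exception, which typically happens when the same throwable instance surfaces both as the primary failure and as the cleanup failure (e.g. in a try-with-resources or a try/finally around serialization). A minimal JDK-only sketch, not Flink code, that reproduces the exact message:

```java
public class SelfSuppressionDemo {
    public static void main(String[] args) {
        Exception failure = new Exception("serialization failed");
        try {
            // The JDK forbids an exception suppressing itself.
            failure.addSuppressed(failure);
        } catch (IllegalArgumentException iae) {
            // Prints: Self-suppression not permitted
            System.out.println(iae.getMessage());
        }
    }
}
```

The unfortunate side effect, visible in the trace above, is that the original serialization failure gets replaced by the `IllegalArgumentException`, hiding the real cause.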
[jira] [Comment Edited] (FLINK-34289) Release Testing Instructions: Verify FLINK-33695 Introduce TraceReporter and use it to create checkpointing and recovery traces
[ https://issues.apache.org/jira/browse/FLINK-34289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17815532#comment-17815532 ] Piotr Nowojski edited comment on FLINK-34289 at 2/8/24 6:33 AM: !Screenshot 2024-02-08 at 07.22.19.png|width=600! This part was confusing to me. My thought process was: # Ok, the first box says I should create a *new* ticket. # But following the arrow, I should *remove* "Instructions". That slightly suggested renaming the ticket. # The truly confusing part: the box on the right shows an arrow from "X Instructions: Verify" to "X: Verify". I first interpreted it as a subtask issue, supporting the "create new ticket" interpretation. But I couldn't create a sub-task. So maybe that arrow is supposed to represent the action of renaming that ticket? # I checked that in FLINK-34285 someone else had already renamed the ticket by removing "Instructions", which further supported my interpretation that I should rename it as well. Rephrasing the instruction to clarify the meaning of {{remove "Instructions"}}, and labeling the arrow between the "Instructions" and non-"Instructions" tickets (to state, for example, that it represents a linked ticket), would have helped me avoid this mistake. was (Author: pnowojski): !Screenshot 2024-02-08 at 07.22.19.png! This part was confusing to me. My thought process was: # Ok first box says, I should create a *new* ticket. # But following the arrow, I should *remove* "Instructions". That's slightly suggested to rename the ticket. # The truly confusing part. The box on the right, shows an arrow from "X Instructions: Verify" to "X: Verify". I've interpreted it as a subtask issue first, supporting "create new ticket" interpretation. But I couldn't create sub-task. So maybe that arrow is supposed to represent action of renaming thet ticket? # I've checked that in FLINK-34285 someone else has already renamed ticket by removing "Instructions" so that further supported my interpretation that I should rename it as well. 
Rephrasing the instruction to clarify the meaning of {{remove "Instructions"}} and labeling the arrow between "Instructions" and non "Instructions" tickets, to state that for example it represents a linked ticket would help me avoid this mistake. > Release Testing Instructions: Verify FLINK-33695 Introduce TraceReporter and > use it to create checkpointing and recovery traces > > > Key: FLINK-34289 > URL: https://issues.apache.org/jira/browse/FLINK-34289 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Metrics >Affects Versions: 1.19.0 >Reporter: lincoln lee >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.19.0 > > Attachments: Screenshot 2024-02-08 at 07.22.19.png, screenshot-1.png > > > This ticket covers testing three related features: FLINK-33695, FLINK-33735 > and FLINK-33696. > Instructions: > # Configure Flink to use > [Slf4jTraceReporter|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/trace_reporters/#slf4j] > and with enabled *INFO* level logging (can be to console or to a file, > doesn't matter). > # Start a streaming job with enabled checkpointing. > # Let it run for a couple of checkpoints. > # Verify presence of a single *JobInitialization* [1] trace logged just after > job start up. > # Verify presence of a couple of *Checkpoint* [1] traces logged after each > successful or failed checkpoint. > [1] > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/traces/#checkpointing-and-initialization -- This message was sent by Atlassian Jira (v8.20.10#820010)
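The testing instructions in the ticket description above boil down to one reporter entry in {{flink-conf.yaml}} plus INFO-level logging and checkpointing. A hypothetical sketch of such a configuration (the reporter name and factory class below are assumptions on my part; check the trace_reporters documentation linked in the ticket for the exact key and class names):

```yaml
# Hypothetical sketch; verify the factory class name against the
# trace_reporters documentation linked in the ticket description.
traces.reporter.slf4j.factory.class: org.apache.flink.traces.slf4j.Slf4jTraceReporterFactory

# Periodic checkpointing, so that Checkpoint traces get emitted.
execution.checkpointing.interval: 10s
```

With this in place, the *JobInitialization* and *Checkpoint* traces from steps 4 and 5 should appear as INFO log lines in whatever appender the Log4j configuration routes them to (console or file, as the instructions note).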
[jira] [Commented] (FLINK-34289) Release Testing Instructions: Verify FLINK-33695 Introduce TraceReporter and use it to create checkpointing and recovery traces
[ https://issues.apache.org/jira/browse/FLINK-34289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17815532#comment-17815532 ] Piotr Nowojski commented on FLINK-34289: !Screenshot 2024-02-08 at 07.22.19.png! This part was confusing to me. My thought process was: # Ok, the first box says I should create a *new* ticket. # But following the arrow, I should *remove* "Instructions". That slightly suggested renaming the ticket. # The truly confusing part: the box on the right shows an arrow from "X Instructions: Verify" to "X: Verify". I first interpreted it as a subtask issue, supporting the "create new ticket" interpretation. But I couldn't create a sub-task. So maybe that arrow is supposed to represent the action of renaming that ticket? # I checked that in FLINK-34285 someone else had already renamed the ticket by removing "Instructions", which further supported my interpretation that I should rename it as well. Rephrasing the instruction to clarify the meaning of {{remove "Instructions"}}, and labeling the arrow between the "Instructions" and non-"Instructions" tickets (to state, for example, that it represents a linked ticket), would have helped me avoid this mistake. > Release Testing Instructions: Verify FLINK-33695 Introduce TraceReporter and > use it to create checkpointing and recovery traces > > > Key: FLINK-34289 > URL: https://issues.apache.org/jira/browse/FLINK-34289 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Metrics >Affects Versions: 1.19.0 >Reporter: lincoln lee >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.19.0 > > Attachments: Screenshot 2024-02-08 at 07.22.19.png, screenshot-1.png > > > This ticket covers testing three related features: FLINK-33695, FLINK-33735 > and FLINK-33696. 
> Instructions: > # Configure Flink to use > [Slf4jTraceReporter|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/trace_reporters/#slf4j] > and with enabled *INFO* level logging (can be to console or to a file, > doesn't matter). > # Start a streaming job with enabled checkpointing. > # Let it run for a couple of checkpoints. > # Verify presence of a single *JobInitialization* [1] trace logged just after > job start up. > # Verify presence of a couple of *Checkpoint* [1] traces logged after each > successful or failed checkpoint. > [1] > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/traces/#checkpointing-and-initialization -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34289) Release Testing Instructions: Verify FLINK-33695 Introduce TraceReporter and use it to create checkpointing and recovery traces
[ https://issues.apache.org/jira/browse/FLINK-34289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-34289: --- Attachment: Screenshot 2024-02-08 at 07.22.19.png > Release Testing Instructions: Verify FLINK-33695 Introduce TraceReporter and > use it to create checkpointing and recovery traces > > > Key: FLINK-34289 > URL: https://issues.apache.org/jira/browse/FLINK-34289 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Metrics >Affects Versions: 1.19.0 >Reporter: lincoln lee >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.19.0 > > Attachments: Screenshot 2024-02-08 at 07.22.19.png, screenshot-1.png > > > This ticket covers testing three related features: FLINK-33695, FLINK-33735 > and FLINK-33696. > Instructions: > # Configure Flink to use > [Slf4jTraceReporter|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/trace_reporters/#slf4j] > and with enabled *INFO* level logging (can be to console or to a file, > doesn't matter). > # Start a streaming job with enabled checkpointing. > # Let it run for a couple of checkpoints. > # Verify presence of a single *JobInitialization* [1] trace logged just after > job start up. > # Verify presence of a couple of *Checkpoint* [1] traces logged after each > successful or failed checkpoint. > [1] > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/traces/#checkpointing-and-initialization -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34289) Release Testing Instructions: Verify FLINK-33695 Introduce TraceReporter and use it to create checkpointing and recovery traces
[ https://issues.apache.org/jira/browse/FLINK-34289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814807#comment-17814807 ] Piotr Nowojski commented on FLINK-34289: I wasn't complaining, just explaining myself :) Sorry for misunderstanding what I should have done! > Release Testing Instructions: Verify FLINK-33695 Introduce TraceReporter and > use it to create checkpointing and recovery traces > > > Key: FLINK-34289 > URL: https://issues.apache.org/jira/browse/FLINK-34289 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Metrics >Affects Versions: 1.19.0 >Reporter: lincoln lee >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.19.0 > > Attachments: screenshot-1.png > > > This ticket covers testing three related features: FLINK-33695, FLINK-33735 > and FLINK-33696. > Instructions: > # Configure Flink to use > [Slf4jTraceReporter|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/trace_reporters/#slf4j] > and with enabled *INFO* level logging (can be to console or to a file, > doesn't matter). > # Start a streaming job with enabled checkpointing. > # Let it run for a couple of checkpoints. > # Verify presence of a single *JobInitialization* [1] trace logged just after > job start up. > # Verify presence of a couple of *Checkpoint* [1] traces logged after each > successful or failed checkpoint. > [1] > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/traces/#checkpointing-and-initialization -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34289) Release Testing Instructions: Verify FLINK-33695 Introduce TraceReporter and use it to create checkpointing and recovery traces
[ https://issues.apache.org/jira/browse/FLINK-34289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814787#comment-17814787 ] Piotr Nowojski commented on FLINK-34289: Thanks for cleaning this up. Indeed I was confused what to do and decided to follow an example from someone else. Anyway, after giving it a second thought, I think there is no need to cross team test this feature. I've closed FLINK-34387. > Release Testing Instructions: Verify FLINK-33695 Introduce TraceReporter and > use it to create checkpointing and recovery traces > > > Key: FLINK-34289 > URL: https://issues.apache.org/jira/browse/FLINK-34289 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Metrics >Affects Versions: 1.19.0 >Reporter: lincoln lee >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.19.0 > > Attachments: screenshot-1.png > > > This ticket covers testing three related features: FLINK-33695, FLINK-33735 > and FLINK-33696. > Instructions: > # Configure Flink to use > [Slf4jTraceReporter|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/trace_reporters/#slf4j] > and with enabled *INFO* level logging (can be to console or to a file, > doesn't matter). > # Start a streaming job with enabled checkpointing. > # Let it run for a couple of checkpoints. > # Verify presence of a single *JobInitialization* [1] trace logged just after > job start up. > # Verify presence of a couple of *Checkpoint* [1] traces logged after each > successful or failed checkpoint. > [1] > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/traces/#checkpointing-and-initialization -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-34387) Release Testing: Verify FLINK-33695 Introduce TraceReporter and use it to create checkpointing and recovery traces
[ https://issues.apache.org/jira/browse/FLINK-34387?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski closed FLINK-34387. -- Resolution: Won't Do After giving it a second thought I'm closing this ticket, as there is no big need to manually cross test this feature. > Release Testing: Verify FLINK-33695 Introduce TraceReporter and use it to > create checkpointing and recovery traces > --- > > Key: FLINK-34387 > URL: https://issues.apache.org/jira/browse/FLINK-34387 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Metrics >Affects Versions: 1.19.0 >Reporter: Piotr Nowojski >Priority: Blocker > Labels: release-testing > Fix For: 1.19.0 > > > This ticket covers testing three related features: FLINK-33695, FLINK-33735 > and FLINK-33696. > Instructions: > # Configure Flink to use > [Slf4jTraceReporter|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/trace_reporters/#slf4j] > and with enabled *INFO* level logging (can be to console or to a file, > doesn't matter). > # Start a streaming job with enabled checkpointing. > # Let it run for a couple of checkpoints. > # Verify presence of a single *JobInitialization* [1] trace logged just after > job start up. > # Verify presence of a couple of *Checkpoint* [1] traces logged after each > successful or failed checkpoint. > [1] > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/traces/#checkpointing-and-initialization -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-34289) Release Testing: Verify FLINK-33695 Introduce TraceReporter and use it to create checkpointing and recovery traces
[ https://issues.apache.org/jira/browse/FLINK-34289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski reassigned FLINK-34289: -- Assignee: (was: Piotr Nowojski) > Release Testing: Verify FLINK-33695 Introduce TraceReporter and use it to > create checkpointing and recovery traces > --- > > Key: FLINK-34289 > URL: https://issues.apache.org/jira/browse/FLINK-34289 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Metrics >Affects Versions: 1.19.0 >Reporter: lincoln lee >Priority: Blocker > Labels: release-testing > Fix For: 1.19.0 > > Attachments: screenshot-1.png > > > This ticket covers testing three related features: FLINK-33695, FLINK-33735 > and FLINK-33696. > Instructions: > # Configure Flink to use > [Slf4jTraceReporter|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/trace_reporters/#slf4j] > and with enabled *INFO* level logging (can be to console or to a file, > doesn't matter). > # Start a streaming job with enabled checkpointing. > # Let it run for a couple of checkpoints. > # Verify presence of a single *JobInitialization* [1] trace logged just after > job start up. > # Verify presence of a couple of *Checkpoint* [1] traces logged after each > successful or failed checkpoint. > [1] > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/traces/#checkpointing-and-initialization -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-34290) Release Testing Instructions: Verify FLINK-33696 Add OpenTelemetryTraceReporter and OpenTelemetryMetricReporter
[ https://issues.apache.org/jira/browse/FLINK-34290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski closed FLINK-34290. -- Resolution: Won't Do Covered by https://issues.apache.org/jira/browse/FLINK-34289 > Release Testing Instructions: Verify FLINK-33696 Add > OpenTelemetryTraceReporter and OpenTelemetryMetricReporter > --- > > Key: FLINK-34290 > URL: https://issues.apache.org/jira/browse/FLINK-34290 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Metrics >Affects Versions: 1.19.0 >Reporter: lincoln lee >Assignee: Piotr Nowojski >Priority: Blocker > Labels: release-testing > Fix For: 1.19.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34289) Release Testing: Verify FLINK-33695 Introduce TraceReporter and use it to create checkpointing and recovery traces
[ https://issues.apache.org/jira/browse/FLINK-34289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-34289: --- Summary: Release Testing: Verify FLINK-33695 Introduce TraceReporter and use it to create checkpointing and recovery traces (was: Release Testing Instructions: Verify FLINK-33695 Introduce TraceReporter and use it to create checkpointing and recovery traces ) > Release Testing: Verify FLINK-33695 Introduce TraceReporter and use it to > create checkpointing and recovery traces > --- > > Key: FLINK-34289 > URL: https://issues.apache.org/jira/browse/FLINK-34289 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Metrics >Affects Versions: 1.19.0 >Reporter: lincoln lee >Assignee: Piotr Nowojski >Priority: Blocker > Labels: release-testing > Fix For: 1.19.0 > > Attachments: screenshot-1.png > > > This ticket covers testing three related features: FLINK-33695, FLINK-33735 > and FLINK-33696. > Instructions: > # Configure Flink to use > [Slf4jTraceReporter|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/trace_reporters/#slf4j] > and with enabled *INFO* level logging (can be to console or to a file, > doesn't matter). > # Start a streaming job with enabled checkpointing. > # Let it run for a couple of checkpoints. > # Verify presence of a single *JobInitialization* [1] trace logged just after > job start up. > # Verify presence of a couple of *Checkpoint* [1] traces logged after each > successful or failed checkpoint. > [1] > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/traces/#checkpointing-and-initialization -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34289) Release Testing Instructions: Verify FLINK-33695 Introduce TraceReporter and use it to create checkpointing and recovery traces
[ https://issues.apache.org/jira/browse/FLINK-34289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-34289: --- Description: This ticket covers testing three related features: FLINK-33695, FLINK-33735 and FLINK-33696. Instructions: # Configure Flink to use [Slf4jTraceReporter|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/trace_reporters/#slf4j] and with enabled *INFO* level logging (can be to console or to a file, doesn't matter). # Start a streaming job with enabled checkpointing. # Let it run for a couple of checkpoints. # Verify presence of a single *JobInitialization* [1] trace logged just after job start up. # Verify presence of a couple of *Checkpoint* [1] traces logged after each successful or failed checkpoint. [1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/traces/#checkpointing-and-initialization > Release Testing Instructions: Verify FLINK-33695 Introduce TraceReporter and > use it to create checkpointing and recovery traces > > > Key: FLINK-34289 > URL: https://issues.apache.org/jira/browse/FLINK-34289 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Metrics >Affects Versions: 1.19.0 >Reporter: lincoln lee >Assignee: Piotr Nowojski >Priority: Blocker > Labels: release-testing > Fix For: 1.19.0 > > Attachments: screenshot-1.png > > > This ticket covers testing three related features: FLINK-33695, FLINK-33735 > and FLINK-33696. > Instructions: > # Configure Flink to use > [Slf4jTraceReporter|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/trace_reporters/#slf4j] > and with enabled *INFO* level logging (can be to console or to a file, > doesn't matter). > # Start a streaming job with enabled checkpointing. > # Let it run for a couple of checkpoints. > # Verify presence of a single *JobInitialization* [1] trace logged just after > job start up. 
> # Verify presence of a couple of *Checkpoint* [1] traces logged after each > successful or failed checkpoint. > [1] > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/traces/#checkpointing-and-initialization -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34289) Release Testing Instructions: Verify FLINK-33695 Introduce TraceReporter and use it to create checkpointing and recovery traces
[ https://issues.apache.org/jira/browse/FLINK-34289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814439#comment-17814439 ] Piotr Nowojski commented on FLINK-34289: Thanks! I will create a single ticket for release testing instructions to cover all 3 FLIPs. > Release Testing Instructions: Verify FLINK-33695 Introduce TraceReporter and > use it to create checkpointing and recovery traces > > > Key: FLINK-34289 > URL: https://issues.apache.org/jira/browse/FLINK-34289 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Metrics >Affects Versions: 1.19.0 >Reporter: lincoln lee >Assignee: Piotr Nowojski >Priority: Blocker > Labels: release-testing > Fix For: 1.19.0 > > Attachments: screenshot-1.png > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33819) Support setting CompressType in RocksDBStateBackend
[ https://issues.apache.org/jira/browse/FLINK-33819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17813740#comment-17813740 ] Piotr Nowojski commented on FLINK-33819: Thanks [~mayuehappy]. {quote} merged 4f7725aa into master which is not blocked by the performance result. I also think it's worthy discussing and we could find a better default behavious in the next version. {quote} +1 Yes > Support setting CompressType in RocksDBStateBackend > --- > > Key: FLINK-33819 > URL: https://issues.apache.org/jira/browse/FLINK-33819 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.18.0 >Reporter: Yue Ma >Assignee: Yue Ma >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > Attachments: image-2023-12-14-11-32-32-968.png, > image-2023-12-14-11-35-22-306.png > > > Currently, RocksDBStateBackend does not support setting the compression > level, and Snappy is used for compression by default. But we have some > scenarios where compression will use a lot of CPU resources. Turning off > compression can significantly reduce CPU overhead. So we may need to support > a parameter for users to set the CompressType of Rocksdb. > !image-2023-12-14-11-35-22-306.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
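Before the change discussed in this ticket, turning compression off already required plugging in a custom options factory rather than setting a single config key. A sketch of that pre-existing approach, using Flink's {{RocksDBOptionsFactory}} interface and RocksDB's {{CompressionType}} enum (treat this as an untested illustration, not the implementation merged here):

```java
import java.util.Collection;
import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;

/** Disables RocksDB block compression for all column families. */
public class NoCompressionOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(
            DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions; // no DB-level changes needed
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // The default is Snappy; NO_COMPRESSION trades larger SST files
        // for the lower CPU overhead discussed in the ticket.
        return currentOptions.setCompressionType(CompressionType.NO_COMPRESSION);
    }
}
```

It would be installed via {{EmbeddedRocksDBStateBackend#setRocksDBOptions(new NoCompressionOptionsFactory())}}; the ticket's change makes the same effect reachable through plain configuration instead.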
[jira] [Comment Edited] (FLINK-33819) Support setting CompressType in RocksDBStateBackend
[ https://issues.apache.org/jira/browse/FLINK-33819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17813740#comment-17813740 ] Piotr Nowojski edited comment on FLINK-33819 at 2/2/24 3:52 PM: Thanks [~mayuehappy]. {quote} merged 4f7725aa into master which is not blocked by the performance result. I also think it's worthy discussing and we could find a better default behavious in the next version. {quote} +1 was (Author: pnowojski): Thanks [~mayuehappy]. {quote} merged 4f7725aa into master which is not blocked by the performance result. I also think it's worthy discussing and we could find a better default behavious in the next version. {quote} +1 Yes > Support setting CompressType in RocksDBStateBackend > --- > > Key: FLINK-33819 > URL: https://issues.apache.org/jira/browse/FLINK-33819 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.18.0 >Reporter: Yue Ma >Assignee: Yue Ma >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > Attachments: image-2023-12-14-11-32-32-968.png, > image-2023-12-14-11-35-22-306.png > > > Currently, RocksDBStateBackend does not support setting the compression > level, and Snappy is used for compression by default. But we have some > scenarios where compression will use a lot of CPU resources. Turning off > compression can significantly reduce CPU overhead. So we may need to support > a parameter for users to set the CompressType of Rocksdb. > !image-2023-12-14-11-35-22-306.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Reopened] (FLINK-33696) FLIP-385: Add OpenTelemetryTraceReporter and OpenTelemetryMetricReporter
[ https://issues.apache.org/jira/browse/FLINK-33696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski reopened FLINK-33696: No it's not yet done. I still have to open a PR adding OpenTelemetry reporters. 7db2ecad was adding only slf4j reporter. > FLIP-385: Add OpenTelemetryTraceReporter and OpenTelemetryMetricReporter > > > Key: FLINK-33696 > URL: https://issues.apache.org/jira/browse/FLINK-33696 > Project: Flink > Issue Type: New Feature > Components: Runtime / Metrics >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Fix For: 1.19.0 > > > h1. Motivation > [FLIP-384|https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces] > is adding TraceReporter interface. However with > [FLIP-384|https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces] > alone, Log4jTraceReporter would be the only available implementation of > TraceReporter interface, which is not very helpful. > In this FLIP I’m proposing to contribute both MetricExporter and > TraceReporter implementation using OpenTelemetry. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34134) Add tracing for restored state size and locations
[ https://issues.apache.org/jira/browse/FLINK-34134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-34134: --- Fix Version/s: 1.19.0 > Add tracing for restored state size and locations > - > > Key: FLINK-34134 > URL: https://issues.apache.org/jira/browse/FLINK-34134 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > We can add tracing during the restore that reports the state size that was > restored by location(s). This is particularly interesting for a mixed > recovery with some local and some remote state. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-27756) Fix intermittently failing test in AsyncSinkWriterTest.checkLoggedSendTimesAreWithinBounds
[ https://issues.apache.org/jira/browse/FLINK-27756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski closed FLINK-27756. -- Resolution: Fixed Thanks [~chalixar] for fixing this! merged commit d92ab39 into apache:master 2b6c656ff31 into release-1.18 ca62b0070cf into release-1.17 > Fix intermittently failing test in > AsyncSinkWriterTest.checkLoggedSendTimesAreWithinBounds > -- > > Key: FLINK-27756 > URL: https://issues.apache.org/jira/browse/FLINK-27756 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Affects Versions: 1.17.0, 1.19.0, 1.18.1 >Reporter: Ahmed Hamdy >Assignee: Ahmed Hamdy >Priority: Blocker > Labels: pull-request-available, test-stability > Fix For: 1.19.0, 1.17.3, 1.18.2 > > > h2. Motivation > - One of the integration tests ({{checkLoggedSendTimesAreWithinBounds}}) of > {{AsyncSinkWriterTest}} has been reported to fail intermittently on build > pipeline causing blocking of new changes. > - Reporting build is [linked > |https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=36009=logs=aa18c3f6-13b8-5f58-86bb-c1cffb239496=502fb6c0-30a2-5e49-c5c2-a00fa3acb203] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-27756) Fix intermittently failing test in AsyncSinkWriterTest.checkLoggedSendTimesAreWithinBounds
[ https://issues.apache.org/jira/browse/FLINK-27756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-27756: --- Fix Version/s: 1.17.3 1.18.2 > Fix intermittently failing test in > AsyncSinkWriterTest.checkLoggedSendTimesAreWithinBounds > -- > > Key: FLINK-27756 > URL: https://issues.apache.org/jira/browse/FLINK-27756 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Affects Versions: 1.17.0, 1.19.0 >Reporter: Ahmed Hamdy >Assignee: Ahmed Hamdy >Priority: Blocker > Labels: pull-request-available, test-stability > Fix For: 1.19.0, 1.17.3, 1.18.2 > > > h2. Motivation > - One of the integration tests ({{checkLoggedSendTimesAreWithinBounds}}) of > {{AsyncSinkWriterTest}} has been reported to fail intermittently on build > pipeline causing blocking of new changes. > - Reporting build is [linked > |https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=36009=logs=aa18c3f6-13b8-5f58-86bb-c1cffb239496=502fb6c0-30a2-5e49-c5c2-a00fa3acb203] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-27756) Fix intermittently failing test in AsyncSinkWriterTest.checkLoggedSendTimesAreWithinBounds
[ https://issues.apache.org/jira/browse/FLINK-27756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-27756: --- Affects Version/s: 1.18.1 > Fix intermittently failing test in > AsyncSinkWriterTest.checkLoggedSendTimesAreWithinBounds > -- > > Key: FLINK-27756 > URL: https://issues.apache.org/jira/browse/FLINK-27756 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Affects Versions: 1.17.0, 1.19.0, 1.18.1 >Reporter: Ahmed Hamdy >Assignee: Ahmed Hamdy >Priority: Blocker > Labels: pull-request-available, test-stability > Fix For: 1.19.0, 1.17.3, 1.18.2 > > > h2. Motivation > - One of the integration tests ({{checkLoggedSendTimesAreWithinBounds}}) of > {{AsyncSinkWriterTest}} has been reported to fail intermittently on build > pipeline causing blocking of new changes. > - Reporting build is [linked > |https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=36009=logs=aa18c3f6-13b8-5f58-86bb-c1cffb239496=502fb6c0-30a2-5e49-c5c2-a00fa3acb203] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-27756) Fix intermittently failing test in AsyncSinkWriterTest.checkLoggedSendTimesAreWithinBounds
[ https://issues.apache.org/jira/browse/FLINK-27756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-27756: --- Priority: Blocker (was: Critical) > Fix intermittently failing test in > AsyncSinkWriterTest.checkLoggedSendTimesAreWithinBounds > -- > > Key: FLINK-27756 > URL: https://issues.apache.org/jira/browse/FLINK-27756 > Project: Flink > Issue Type: Bug > Components: Connectors / Common >Affects Versions: 1.17.0, 1.19.0 >Reporter: Ahmed Hamdy >Assignee: Ahmed Hamdy >Priority: Blocker > Labels: pull-request-available, test-stability > Fix For: 1.19.0 > > > h2. Motivation > - One of the integration tests ({{checkLoggedSendTimesAreWithinBounds}}) of > {{AsyncSinkWriterTest}} has been reported to fail intermittently on build > pipeline causing blocking of new changes. > - Reporting build is [linked > |https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=36009=logs=aa18c3f6-13b8-5f58-86bb-c1cffb239496=502fb6c0-30a2-5e49-c5c2-a00fa3acb203] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33338) Bump FRocksDB version
[ https://issues.apache.org/jira/browse/FLINK-33338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806860#comment-17806860 ] Piotr Nowojski commented on FLINK-33338: [~roman], let's make sure that this latest bug fix is also included in our FRocksDB release: https://issues.apache.org/jira/browse/FLINK-33337?focusedCommentId=17805364=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17805364 > Bump FRocksDB version > - > > Key: FLINK-33338 > URL: https://issues.apache.org/jira/browse/FLINK-33338 > Project: Flink > Issue Type: Sub-task > Components: Runtime / State Backends >Reporter: Piotr Nowojski >Assignee: Roman Khachatryan >Priority: Major > > We need to bump RocksDB in order to be able to use new IngestDB and ClipDB > commands. > If some of the required changes haven't been merged to Facebook/RocksDB, we > should cherry-pick and include them in our FRocksDB fork. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33337) Expose IngestDB and ClipDB in the official RocksDB API
[ https://issues.apache.org/jira/browse/FLINK-33337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806861#comment-17806861 ] Piotr Nowojski commented on FLINK-33337: Thanks [~mayuehappy] for the update! > Expose IngestDB and ClipDB in the official RocksDB API > -- > > Key: FLINK-33337 > URL: https://issues.apache.org/jira/browse/FLINK-33337 > Project: Flink > Issue Type: Sub-task > Components: Runtime / State Backends >Reporter: Piotr Nowojski >Assignee: Yue Ma >Priority: Major > Attachments: image-2024-01-11-12-03-14-308.png > > > Remaining open PRs: > None :) > Already merged PRs: > https://github.com/facebook/rocksdb/pull/11646 > https://github.com/facebook/rocksdb/pull/11868 > https://github.com/facebook/rocksdb/pull/11811 > https://github.com/facebook/rocksdb/pull/11381 > https://github.com/facebook/rocksdb/pull/11379 > https://github.com/facebook/rocksdb/pull/11378 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-33819) Support setting CompressType in RocksDBStateBackend
[ https://issues.apache.org/jira/browse/FLINK-33819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806804#comment-17806804 ] Piotr Nowojski edited comment on FLINK-33819 at 1/15/24 1:09 PM: - Thanks for drafting the proposal [~mayuehappy] +1 for making this option configurable. It would be great if someone could test out [~masteryhx]'s suggestion. Configuring no compression for L0 and L1 levels, while keeping lower levels compressed: * By how much the checkpoint would grow? * What about performance? It sounds like this actually could be the default behaviour for us? Would be great to test it out, but not a blocker from my side for implementing this ticket. was (Author: pnowojski): Thanks for drafting the proposal [~mayuehappy] +1 for making this option configurable. > Support setting CompressType in RocksDBStateBackend > --- > > Key: FLINK-33819 > URL: https://issues.apache.org/jira/browse/FLINK-33819 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.18.0 >Reporter: Yue Ma >Assignee: Yue Ma >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > Attachments: image-2023-12-14-11-32-32-968.png, > image-2023-12-14-11-35-22-306.png > > > Currently, RocksDBStateBackend does not support setting the compression > level, and Snappy is used for compression by default. But we have some > scenarios where compression will use a lot of CPU resources. Turning off > compression can significantly reduce CPU overhead. So we may need to support > a parameter for users to set the CompressType of Rocksdb. > !image-2023-12-14-11-35-22-306.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33819) Support setting CompressType in RocksDBStateBackend
[ https://issues.apache.org/jira/browse/FLINK-33819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806804#comment-17806804 ] Piotr Nowojski commented on FLINK-33819: +1 for making this option configurable. > Support setting CompressType in RocksDBStateBackend > --- > > Key: FLINK-33819 > URL: https://issues.apache.org/jira/browse/FLINK-33819 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.18.0 >Reporter: Yue Ma >Assignee: Yue Ma >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > Attachments: image-2023-12-14-11-32-32-968.png, > image-2023-12-14-11-35-22-306.png > > > Currently, RocksDBStateBackend does not support setting the compression > level, and Snappy is used for compression by default. But we have some > scenarios where compression will use a lot of CPU resources. Turning off > compression can significantly reduce CPU overhead. So we may need to support > a parameter for users to set the CompressType of Rocksdb. > !image-2023-12-14-11-35-22-306.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-33819) Support setting CompressType in RocksDBStateBackend
[ https://issues.apache.org/jira/browse/FLINK-33819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806804#comment-17806804 ] Piotr Nowojski edited comment on FLINK-33819 at 1/15/24 1:00 PM: - Thanks for drafting the proposal [~mayuehappy] +1 for making this option configurable. was (Author: pnowojski): +1 for making this option configurable. > Support setting CompressType in RocksDBStateBackend > --- > > Key: FLINK-33819 > URL: https://issues.apache.org/jira/browse/FLINK-33819 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Affects Versions: 1.18.0 >Reporter: Yue Ma >Assignee: Yue Ma >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > Attachments: image-2023-12-14-11-32-32-968.png, > image-2023-12-14-11-35-22-306.png > > > Currently, RocksDBStateBackend does not support setting the compression > level, and Snappy is used for compression by default. But we have some > scenarios where compression will use a lot of CPU resources. Turning off > compression can significantly reduce CPU overhead. So we may need to support > a parameter for users to set the CompressType of Rocksdb. > !image-2023-12-14-11-35-22-306.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
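The tuning idea discussed in this thread — leaving the hot upper levels (L0, L1) uncompressed to save CPU while keeping the colder lower levels compressed — maps onto RocksDB's per-level compression setting (in the RocksDB Java API this would go through `ColumnFamilyOptions#setCompressionPerLevel`; treat that integration point as an assumption). The sketch below models only the per-level selection logic in plain Java; the class and method names are hypothetical, not part of Flink or RocksDB.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the per-level compression scheme discussed above:
 *  no compression for L0/L1 (frequently rewritten, CPU-sensitive),
 *  Snappy for the lower, colder levels. */
public final class CompressionPerLevel {
    private CompressionPerLevel() {}

    public static List<String> build(int numLevels) {
        List<String> perLevel = new ArrayList<>();
        for (int level = 0; level < numLevels; level++) {
            // Levels 0 and 1 see the most write amplification, so skip compression there.
            perLevel.add(level <= 1 ? "NO_COMPRESSION" : "SNAPPY_COMPRESSION");
        }
        return perLevel;
    }
}
```

In a real options factory, the resulting list would be translated to `org.rocksdb.CompressionType` values; the benchmark questions raised in the comment (checkpoint size growth vs. CPU savings) would decide whether this becomes the default.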
[jira] [Comment Edited] (FLINK-34063) When snapshot compression is enabled, rescaling of a source operator leads to some splits getting lost
[ https://issues.apache.org/jira/browse/FLINK-34063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805934#comment-17805934 ] Piotr Nowojski edited comment on FLINK-34063 at 1/12/24 8:32 AM: - [~dmvk] could this issue be discovered by our ITCases if snapshot compression had been enabled in them? Regardless of that, it is probably a good idea to enable it using our configuration randomisation framework: {{org.apache.flink.streaming.util.TestStreamEnvironment#randomizeConfiguration}}. was (Author: pnowojski): Could this issue be discovered by our ITCases if snapshot compression had been enabled in them? Regardless of that, it is probably a good idea to enable it using our configuration randomisation framework: {{org.apache.flink.streaming.util.TestStreamEnvironment#randomizeConfiguration}}. > When snapshot compression is enabled, rescaling of a source operator leads to > some splits getting lost > -- > > Key: FLINK-34063 > URL: https://issues.apache.org/jira/browse/FLINK-34063 > Project: Flink > Issue Type: Bug >Affects Versions: 1.18.0 > Environment: Can be reproduced in any environment. The most important > thing is to enable snapshot compression. >Reporter: Ivan Burmistrov >Assignee: David Morávek >Priority: Blocker > Attachments: image-2024-01-11-16-27-09-066.png, > image-2024-01-11-16-30-47-466.png > > > h2. Backstory > We've been experimenting with Autoscaling on the Flink 1.18 and faced a > pretty nasty bug. > The symptoms on our production system were as following. After a while after > deploying a job with autoscaler it started accumulating Kafka lag, and this > could only be observed via external lag measurement - from inside Flink > (measured by > {{_KafkaSourceReader_KafkaConsumer_records_lag_max_}} metric) the lag was OK: > !image-2024-01-11-16-27-09-066.png|width=887,height=263! > After some digging, it turned out that the job has lost some Kafka partitions > - i.e. it stopped consuming from them, “forgot” about their existence. 
That’s > why from the Flink’s perspective everything was fine - the lag was growing on > the partitions Flink no longer knew about. > This was visible on a metric called “Assigned partitions” > (KafkaSourceReader_KafkaConsumer_assigned_partitions): > !image-2024-01-11-16-30-47-466.png|width=1046,height=254! > We see on the chart that the job used to know about 20 partitions, and then > this number got dropped to 16. > This drop has been quickly connected to the job’s scaling events. Or, more > precisely, to the scaling of the source operator - with almost 100% > probability any scaling of the source operator led to partitions loss. > h2. Investigation > We've conducted the investigation. We use the latest Kubernetes operator and > deploy jobs with Native Kubernetes. > The reproducing scenario we used for investigation: > * Launch a job with source operator parallelism = 4, enable DEBUG logging > * Wait until it takes the first checkpoint > * Scale-up the source operator to say 5 (no need to wait for autoscaling, it > can be done via Flink UI) > * Wait until the new checkpoint is taken > * Scale-down the source operator to 3 > These simple actions with almost 100% probability led to some partitions get > lost. > After that we've downloaded all the logs and inspected them. 
Noticed these > strange records in logs: > {code:java} > {"timestamp":1704415753166,"is_logging_enabled":"false","logger_id":"org.apache.flink.streaming.api.operators.AbstractStreamOperator","log_level":"INFO","message":"Restoring > state for 4 split(s) to reader.","service_name":"data-beaver"} > {"timestamp":1704415753166,"is_logging_enabled":"false","logger_id":"org.apache.flink.connector.base.source.reader.SourceReaderBase","log_level":"INFO","message":"Adding > split(s) to reader: > [ > [Partition: eventmesh-video-play-v1-6, StartingOffset: 1964306414, > StoppingOffset: -9223372036854775808], > [Partition: eventmesh-video-play-v1-19, StartingOffset: 1963002538, > StoppingOffset: -9223372036854775808], > [Partition: eventmesh-video-play-v1-6, StartingOffset: 1964306414, > StoppingOffset: -9223372036854775808], > [Partition: eventmesh-video-play-v1-19, StartingOffset: 1963002538, > StoppingOffset: -9223372036854775808]]", "service_name":"data-beaver"}{code} > We see that some task being restored with 4 splits, however actual splits > have duplicates - we see that in reality 2 unique partitions have been added > ({_}eventmesh-video-play-v1-6{_} and {_}eventmesh-video-play-v1-19{_}). > Digging into the code and the logs a bit more, log lines like this started > looking suspicious: > > {code:java} >
[jira] [Commented] (FLINK-34063) When snapshot compression is enabled, rescaling of a source operator leads to some splits getting lost
[ https://issues.apache.org/jira/browse/FLINK-34063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805934#comment-17805934 ] Piotr Nowojski commented on FLINK-34063: Could this issue be discovered by our ITCases if snapshot compression had been enabled in them? Regardless of that, it is probably a good idea to enable it using our configuration randomisation framework: {{org.apache.flink.streaming.util.TestStreamEnvironment#randomizeConfiguration}}. > When snapshot compression is enabled, rescaling of a source operator leads to > some splits getting lost > -- > > Key: FLINK-34063 > URL: https://issues.apache.org/jira/browse/FLINK-34063 > Project: Flink > Issue Type: Bug >Affects Versions: 1.18.0 > Environment: Can be reproduced in any environment. The most important > thing is to enable snapshot compression. >Reporter: Ivan Burmistrov >Assignee: David Morávek >Priority: Blocker > Attachments: image-2024-01-11-16-27-09-066.png, > image-2024-01-11-16-30-47-466.png > > > h2. Backstory > We've been experimenting with Autoscaling on the Flink 1.18 and faced a > pretty nasty bug. > The symptoms on our production system were as following. After a while after > deploying a job with autoscaler it started accumulating Kafka lag, and this > could only be observed via external lag measurement - from inside Flink > (measured by > {{_KafkaSourceReader_KafkaConsumer_records_lag_max_}} metric) the lag was OK: > !image-2024-01-11-16-27-09-066.png|width=887,height=263! > After some digging, it turned out that the job has lost some Kafka partitions > - i.e. it stopped consuming from them, “forgot” about their existence. That’s > why from the Flink’s perspective everything was fine - the lag was growing on > the partitions Flink no longer knew about. > This was visible on a metric called “Assigned partitions” > (KafkaSourceReader_KafkaConsumer_assigned_partitions): > !image-2024-01-11-16-30-47-466.png|width=1046,height=254! 
> We see on the chart that the job used to know about 20 partitions, and then > this number got dropped to 16. > This drop has been quickly connected to the job’s scaling events. Or, more > precisely, to the scaling of the source operator - with almost 100% > probability any scaling of the source operator led to partitions loss. > h2. Investigation > We've conducted the investigation. We use the latest Kubernetes operator and > deploy jobs with Native Kubernetes. > The reproducing scenario we used for investigation: > * Launch a job with source operator parallelism = 4, enable DEBUG logging > * Wait until it takes the first checkpoint > * Scale-up the source operator to say 5 (no need to wait for autoscaling, it > can be done via Flink UI) > * Wait until the new checkpoint is taken > * Scale-down the source operator to 3 > These simple actions with almost 100% probability led to some partitions get > lost. > After that we've downloaded all the logs and inspected them. Noticed these > strange records in logs: > {code:java} > {"timestamp":1704415753166,"is_logging_enabled":"false","logger_id":"org.apache.flink.streaming.api.operators.AbstractStreamOperator","log_level":"INFO","message":"Restoring > state for 4 split(s) to reader.","service_name":"data-beaver"} > {"timestamp":1704415753166,"is_logging_enabled":"false","logger_id":"org.apache.flink.connector.base.source.reader.SourceReaderBase","log_level":"INFO","message":"Adding > split(s) to reader: > [ > [Partition: eventmesh-video-play-v1-6, StartingOffset: 1964306414, > StoppingOffset: -9223372036854775808], > [Partition: eventmesh-video-play-v1-19, StartingOffset: 1963002538, > StoppingOffset: -9223372036854775808], > [Partition: eventmesh-video-play-v1-6, StartingOffset: 1964306414, > StoppingOffset: -9223372036854775808], > [Partition: eventmesh-video-play-v1-19, StartingOffset: 1963002538, > StoppingOffset: -9223372036854775808]]", "service_name":"data-beaver"}{code} > We see that some task being restored 
with 4 splits, however actual splits > have duplicates - we see that in reality 2 unique partitions have been added > ({_}eventmesh-video-play-v1-6{_} and {_}eventmesh-video-play-v1-19{_}). > Digging into the code and the logs a bit more, log lines like this started > looking suspicious: > > {code:java} > {"timestamp":1704415753165,"is_logging_enabled":"false","logger_id":"org.apache.flink.runtime.state.TaskStateManagerImpl","log_level":"DEBUG", > "message":"Operator 156a1ebbc1936f7d4558c8070b35ba93 has remote state > SubtaskState{operatorStateFromBackend=StateObjectCollection{ > [OperatorStateHandle{stateNameToPartitionOffsets={SourceReaderState=StateMetaInfo{offsets=[244, > 244], distributionMode=SPLIT_DISTRIBUTE}}, >
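The comment above suggests exercising snapshot compression through Flink's configuration randomisation framework ({{TestStreamEnvironment#randomizeConfiguration}}). A minimal stdlib sketch of that idea follows, using a plain string map in place of Flink's {{Configuration}}; the helper names are hypothetical, and the option key {{execution.checkpointing.snapshot-compression}} is assumed to be the relevant Flink setting.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

/** Hypothetical sketch of seed-based test-configuration randomisation, in the
 *  spirit of TestStreamEnvironment#randomizeConfiguration: an option is only
 *  randomised if the test did not pin it explicitly. */
public final class ConfigRandomizer {
    private ConfigRandomizer() {}

    public static Map<String, String> randomize(Map<String, String> base, long seed) {
        Map<String, String> conf = new HashMap<>(base);
        Random rnd = new Random(seed); // fixed seed => the failing combination is reproducible
        // Randomly enable snapshot compression unless the test fixed a value itself.
        conf.putIfAbsent("execution.checkpointing.snapshot-compression",
                Boolean.toString(rnd.nextBoolean()));
        return conf;
    }
}
```

Run this way, a bug like FLINK-34063 (duplicated splits after rescaling with compression on) has a chance of surfacing in ordinary ITCases rather than only in production.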
[jira] [Closed] (FLINK-33697) FLIP-386: Support adding custom metrics in Recovery Spans
[ https://issues.apache.org/jira/browse/FLINK-33697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski closed FLINK-33697. -- Release Note: A breaking change has been introduced to the `StateBackend` interface. This is relevant only to users that are implementing their own custom state backends. Newly added methods `org.apache.flink.runtime.state.StateBackend#createKeyedStateBackend(KeyedStateBackendParameters parameters)` and `org.apache.flink.runtime.state.StateBackend#createOperatorStateBackend(OperatorStateBackendParameters parameters)` have replaced previous versions of the `createKeyedStateBackend` and `createOperatorStateBackend` methods. The new `parameters` POJO classes contain as fields all of the arguments that were passed directly to those methods. Resolution: Fixed Merged to master as: b660b06cb70^..e6556fa898d > FLIP-386: Support adding custom metrics in Recovery Spans > - > > Key: FLINK-33697 > URL: https://issues.apache.org/jira/browse/FLINK-33697 > Project: Flink > Issue Type: New Feature > Components: Runtime / Metrics, Runtime / State Backends >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > > h1. Motivation > [FLIP-386|https://cwiki.apache.org/confluence/x/VAuZE] is building on top of > [FLIP-384|https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces]. > The intention here is to add a capability for state backends to attach > custom attributes during recovery to recovery spans. For example > RocksDBIncrementalRestoreOperation could report both remote download time and > time to actually clip/ingest the RocksDB instances after rescaling. -- This message was sent by Atlassian Jira (v8.20.10#820010)
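The release note describes replacing multi-argument factory methods with a parameters POJO. The generic refactoring pattern behind that change can be sketched in plain Java; the names below are hypothetical stand-ins, not the actual Flink `KeyedStateBackendParameters`/`OperatorStateBackendParameters` classes.

```java
/** Hypothetical sketch of the parameter-object pattern from the release note:
 *  a POJO bundles what used to be individual method arguments, so future
 *  parameters can be added without breaking every StateBackend implementer again. */
public final class BackendParameters {
    private final String operatorId;
    private final int subtaskIndex;

    public BackendParameters(String operatorId, int subtaskIndex) {
        this.operatorId = operatorId;
        this.subtaskIndex = subtaskIndex;
    }

    public String getOperatorId() { return operatorId; }
    public int getSubtaskIndex() { return subtaskIndex; }
}
```

A custom backend migrating to the new interface would change `createKeyedStateBackend(a, b, c, ...)` overrides into a single `createKeyedStateBackend(parameters)` override and read the former arguments off the POJO.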
[jira] [Commented] (FLINK-33856) Add metrics to monitor the interaction performance between task and external storage system in the process of checkpoint making
[ https://issues.apache.org/jira/browse/FLINK-33856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805739#comment-17805739 ] Piotr Nowojski commented on FLINK-33856: In that case [~hejufang001] it would be great if you started another FLIP for adding per sub-task spans. If you decide to do so, please ping me on Apache Flink Slack or via a Jira ticket so I don't miss it :) > Add metrics to monitor the interaction performance between task and external > storage system in the process of checkpoint making > --- > > Key: FLINK-33856 > URL: https://issues.apache.org/jira/browse/FLINK-33856 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing >Affects Versions: 1.18.0 >Reporter: Jufang He >Assignee: Jufang He >Priority: Major > Labels: pull-request-available > > When Flink makes a checkpoint, the interaction performance with the external > file system has a great impact on the overall time taken. Therefore, it > is easy to observe the bottleneck point by adding performance indicators when > the task interacts with the external file storage system. These include: the > rate of file writes, the latency to write the file, and the latency to close the > file. > Adding the above metrics on the Flink side has the following advantages: convenient > statistics on per-task E2E time; no need to distinguish the > type of external storage system, as this can be unified in the > FsCheckpointStreamFactory. -- This message was sent by Atlassian Jira (v8.20.10#820010)
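The metrics proposed in FLINK-33856 (write rate, write latency, close latency) could be collected by wrapping the checkpoint output stream. The sketch below is a stdlib-only illustration, not the actual `FsCheckpointStreamFactory` change; the class name and accessors are hypothetical.

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/** Hypothetical sketch: wraps a checkpoint stream and records bytes written plus
 *  cumulative write latency and close latency, independent of the storage system. */
public class MeteredCheckpointStream extends FilterOutputStream {
    private long bytesWritten;
    private long writeNanos;
    private long closeNanos;

    public MeteredCheckpointStream(OutputStream out) {
        super(out);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        long start = System.nanoTime();
        out.write(b, off, len); // delegate directly, avoiding byte-at-a-time default
        writeNanos += System.nanoTime() - start;
        bytesWritten += len;
    }

    @Override
    public void close() throws IOException {
        long start = System.nanoTime();
        super.close(); // flushes, then closes the underlying stream
        closeNanos = System.nanoTime() - start;
    }

    public long getBytesWritten() { return bytesWritten; }
    public long getWriteNanos() { return writeNanos; }
    public long getCloseNanos() { return closeNanos; }
}
```

Because the wrapper sits at the stream level, the same measurement works for any external storage backend, which is the unification argument made in the ticket.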
[jira] [Closed] (FLINK-33775) Report JobInitialization traces
[ https://issues.apache.org/jira/browse/FLINK-33775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski closed FLINK-33775. -- Fix Version/s: 1.19.0 Resolution: Fixed merged commit 793a66b into apache:master > Report JobInitialization traces > --- > > Key: FLINK-33775 > URL: https://issues.apache.org/jira/browse/FLINK-33775 > Project: Flink > Issue Type: Sub-task >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available > Fix For: 1.19.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-33695) FLIP-384: Introduce TraceReporter and use it to create checkpointing and recovery traces
[ https://issues.apache.org/jira/browse/FLINK-33695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski closed FLINK-33695. -- Resolution: Fixed > FLIP-384: Introduce TraceReporter and use it to create checkpointing and > recovery traces > > > Key: FLINK-33695 > URL: https://issues.apache.org/jira/browse/FLINK-33695 > Project: Flink > Issue Type: New Feature > Components: Runtime / Checkpointing, Runtime / Metrics >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Fix For: 1.19.0 > > > https://cwiki.apache.org/confluence/x/TguZE > *Motivation* > Currently Flink has a limited observability of checkpoint and recovery > processes. > For checkpointing Flink has a very detailed overview in the Flink WebUI, > which works great in many use cases, however it’s problematic if one is > operating multiple Flink clusters, or if cluster/JM dies. Additionally there > are a couple of metrics (like lastCheckpointDuration or lastCheckpointSize), > however those metrics have a couple of issues: > * They are reported and refreshed periodically, depending on the > MetricReporter settings, which doesn’t take into account checkpointing > frequency. > ** If checkpointing interval > metric reporting interval, we would be > reporting the same values multiple times. > ** If checkpointing interval < metric reporting interval, we would be > randomly dropping metrics for some of the checkpoints. > For recovery we are missing even the most basic of the metrics and Flink > WebUI support. Also given the fact that recovery is even less frequent > compared to checkpoints, adding recovery metrics would have even bigger > problems with unnecessary reporting the same values. > In this FLIP I’m proposing to add support for reporting traces/spans > (example: Traces) and use this mechanism to report checkpointing and recovery > traces. I hope in the future traces will also prove useful in other areas of > Flink like job submission, job state changes, ... 
. Moreover as the API to > report traces will be added to the MetricGroup , users will be also able to > access this API. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33897) Allow triggering unaligned checkpoint via CLI
[ https://issues.apache.org/jira/browse/FLINK-33897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803535#comment-17803535 ] Piotr Nowojski commented on FLINK-33897: By real-world motivation, I meant: is this really an issue that someone complained about? If not, and this is just a theoretical possibility that comes from your observation when implementing FLINK-6755 that "it could be implemented, someone might find it useful", I would put it aside for the time being. Honestly, I doubt many users would use this feature. In most cases just cancelling the job and restarting with new configuration would be faster vs someone first trying to find out in the docs/user mailing list/stack overflow that they can actually trigger an unaligned checkpoint from the CLI first. This would be only useful to a handful of power users, but those should already know that it's better to use unaligned checkpoints from the get go. {quote} I'm not very familiar with this part so if you think this is a big change, I won't insist on doing it. {quote} Adding a new BarrierHandlerState maybe is not a very big change per se, but it will visibly increase the complexity of the code when someone needs to read/understand it. {quote} I do agree we could enable timeout for aligned cp by default, which greatly reduce this case {quote} Let me start the dev mailing list discussion about that. > Allow triggering unaligned checkpoint via CLI > - > > Key: FLINK-33897 > URL: https://issues.apache.org/jira/browse/FLINK-33897 > Project: Flink > Issue Type: New Feature > Components: Command Line Client, Runtime / Checkpointing >Reporter: Zakelly Lan >Assignee: Zakelly Lan >Priority: Major > > After FLINK-6755, user could trigger checkpoint through CLI. However I > noticed there would be value supporting trigger it in unaligned way, since > the job may encounter a high back-pressure and an aligned checkpoint would > fail. > > I suggest we provide an option '-unaligned' in CLI to support that. 
> > Similar option would also be useful for REST api -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-33856) Add metrics to monitor the interaction performance between task and external storage system in the process of checkpoint making
[ https://issues.apache.org/jira/browse/FLINK-33856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803478#comment-17803478 ] Piotr Nowojski edited comment on FLINK-33856 at 1/5/24 9:58 AM: {quote} Maybe a new flip that supports task-level trace reporter can builded ? I’m willing to participate in the development. {quote} Please again check the FLIP-384 discussions. I was highlighting there a couple of difficulties: {quote} However, if we would like to create true distributed traces, with spans reported from many different components, potentially both on JM and TM, the problem is a bit deeper. The issue in that case is how to actually fill out `parrent_id` and `trace_id`? Passing some context entity as a java object would be unfeasible. That would require too many changes in too many places. I think the only realistic way to do it, would be to have a deterministic generator of `parten_id` and `trace_id` values. For example we could create the parent trace/span of the checkpoint on JM, and set those ids to something like: `jobId#attemptId#checkpointId`. Each subtask then could re-generate those ids and subtasks' checkpoint span would have an id of `jobId#attemptId#checkpointId#subTaskId`. Note that this is just an example, as most likely distributed spans for checkpointing do not make sense, as we can generate them much easier on the JM anyway. {quote} https://lists.apache.org/thread/7lql5f5q1np68fw1wc9trq3d9l2ox8f4 At the same time: {quote} I am worried that a large amount of data aggregation to JM may have performance problems. {quote} I wouldn't worry about that too much. This data is already aggregated on the JM from all of the TMs via {{CheckpointMetricsBuilder}} and {{CheckpointMetrics}}. Besides, it's just a single RPC from subtask -> JM per checkpoint. If that becomes a problem, we would have problems in many different areas as well (for example {{notifyCheckpointCompleted}} is a very similar call but the other direction). 
Also AFAIR there are/were different ideas how to solve this potential bottleneck in a more generic way (having multiple job coordinators in the cluster to spread the load). [~hejufang001] I would suggest that both of you chat offline about the scope of the changes in [~fanrui]'s FLIP and/or eventual division of work. I'm not sure if [~fanrui] plans to add per task/subtask spans for checkpoints and/or recovery. was (Author: pnowojski): {quote} Maybe a new flip that supports task-level trace reporter can builded ? I’m willing to participate in the development. {quote} Please again check the FLIP-384 discussions. I was highlighting there a couple of difficulties: {quote} However, if we would like to create true distributed traces, with spans reported from many different components, potentially both on JM and TM, the problem is a bit deeper. The issue in that case is how to actually fill out `parrent_id` and `trace_id`? Passing some context entity as a java object would be unfeasible. That would require too many changes in too many places. I think the only realistic way to do it, would be to have a deterministic generator of `parten_id` and `trace_id` values. For example we could create the parent trace/span of the checkpoint on JM, and set those ids to something like: `jobId#attemptId#checkpointId`. Each subtask then could re-generate those ids and subtasks' checkpoint span would have an id of `jobId#attemptId#checkpointId#subTaskId`. Note that this is just an example, as most likely distributed spans for checkpointing do not make sense, as we can generate them much easier on the JM anyway. {quote} https://lists.apache.org/thread/7lql5f5q1np68fw1wc9trq3d9l2ox8f4 At the same time: {quote} I am worried that a large amount of data aggregation to JM may have performance problems. {quote} I wouldn't worry about that too much. This data is already aggregated on the JM from all of the TMs via {{CheckpointMetricsBuilder}} and {{CheckpointMetrics}}. 
Besides, it's just a single RPC from subtask -> JM per checkpoint. If that becomes a problem, we would have problems in many different areas as well (for example {{notifyCheckpointCompleted}} is a very similar call but the other direction). Also AFAIR there are/were different ideas how to solve this potential bottleneck in a more generic way (having multiple job coordinators in the cluster to spread the load). > Add metrics to monitor the interaction performance between task and external > storage system in the process of checkpoint making > --- > > Key: FLINK-33856 > URL: https://issues.apache.org/jira/browse/FLINK-33856 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing >Affects Versions:
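The deterministic id scheme quoted in this thread (`jobId#attemptId#checkpointId`, with subtasks appending their index so no context object has to be passed around) can be sketched in plain Java. The class and method names below are hypothetical, not Flink APIs.

```java
/** Hypothetical sketch of the deterministic trace/span id generator discussed above:
 *  both JM and TMs can regenerate the same ids from values they already know,
 *  so no trace context needs to be shipped between components. */
public final class SpanIds {
    private SpanIds() {}

    /** Parent checkpoint span id, generated on the JobManager. */
    public static String checkpointSpanId(String jobId, int attemptId, long checkpointId) {
        return jobId + "#" + attemptId + "#" + checkpointId;
    }

    /** Child span id a subtask can re-generate independently. */
    public static String subtaskSpanId(
            String jobId, int attemptId, long checkpointId, int subtaskIndex) {
        return checkpointSpanId(jobId, attemptId, checkpointId) + "#" + subtaskIndex;
    }
}
```

The parent/child relationship falls out of the prefix structure: a subtask span id always starts with the checkpoint span id it belongs to.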