[jira] [Updated] (FLINK-35420) WordCountMapredITCase fails to compile in IntelliJ

2024-05-23 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-35420:
---
Fix Version/s: 1.20.0

> WordCountMapredITCase fails to compile in IntelliJ
> --
>
> Key: FLINK-35420
> URL: https://issues.apache.org/jira/browse/FLINK-35420
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hadoop Compatibility
>Affects Versions: 1.20.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> {noformat}
> flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/WordCountMapredITCase.scala:42:8
> value isFalse is not a member of ?0
> possible cause: maybe a semicolon is missing before `value isFalse'?
>   .isFalse()
> {noformat}
> Might be caused by:
> https://youtrack.jetbrains.com/issue/SCL-20679



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35420) WordCountMapredITCase fails to compile in IntelliJ

2024-05-23 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-35420:
--

Assignee: Piotr Nowojski

> WordCountMapredITCase fails to compile in IntelliJ
> --
>
> Key: FLINK-35420
> URL: https://issues.apache.org/jira/browse/FLINK-35420
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hadoop Compatibility
>Affects Versions: 1.20.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> {noformat}
> flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/WordCountMapredITCase.scala:42:8
> value isFalse is not a member of ?0
> possible cause: maybe a semicolon is missing before `value isFalse'?
>   .isFalse()
> {noformat}
> Might be caused by:
> https://youtrack.jetbrains.com/issue/SCL-20679



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-35420) WordCountMapredITCase fails to compile in IntelliJ

2024-05-23 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-35420.
--
Resolution: Fixed

 Merged commit 9ccfb65 into apache:master.

> WordCountMapredITCase fails to compile in IntelliJ
> --
>
> Key: FLINK-35420
> URL: https://issues.apache.org/jira/browse/FLINK-35420
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hadoop Compatibility
>Affects Versions: 1.20.0
>Reporter: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> {noformat}
> flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/WordCountMapredITCase.scala:42:8
> value isFalse is not a member of ?0
> possible cause: maybe a semicolon is missing before `value isFalse'?
>   .isFalse()
> {noformat}
> Might be caused by:
> https://youtrack.jetbrains.com/issue/SCL-20679



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35420) WordCountMapredITCase fails to compile in IntelliJ

2024-05-22 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-35420:
--

 Summary: WordCountMapredITCase fails to compile in IntelliJ
 Key: FLINK-35420
 URL: https://issues.apache.org/jira/browse/FLINK-35420
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hadoop Compatibility
Affects Versions: 1.20.0
Reporter: Piotr Nowojski


{noformat}
flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/WordCountMapredITCase.scala:42:8
value isFalse is not a member of ?0
possible cause: maybe a semicolon is missing before `value isFalse'?
  .isFalse()
{noformat}

Might be caused by:
https://youtrack.jetbrains.com/issue/SCL-20679




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32828) Partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint

2024-05-17 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-32828.
--
Fix Version/s: 2.0.0
   1.18.2
   1.20.0
   1.19.1
   Resolution: Fixed

Merged to master as 7fd6c6da26c
Merged to release-1.19 as a160f8db1f7
Merged to release-1.18 as 4848a50d735

> Partition aware watermark not handled correctly shortly after job start up 
> from checkpoint or savepoint
> ---
>
> Key: FLINK-32828
> URL: https://issues.apache.org/jira/browse/FLINK-32828
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / Kafka
>Affects Versions: 1.17.1, 1.19.0, 1.18.1
> Environment: Affected environments:
>  * Local MiniCluster + Confluent Kafka run in docker
>  ** See attached files
>  * Flink job run in Kubernetes using Flink Operator 1.15 + 3 node Kafka 
> cluster run in Kubernetes cluster
>Reporter: Grzegorz Liter
>Assignee: Piotr Nowojski
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 2.0.0, 1.18.2, 1.20.0, 1.19.1
>
> Attachments: docker-compose.yml, test-job.java
>
>
> When using KafkaSource with partition-aware watermarks, watermarks are being 
> emitted even when only one partition has events, just after job startup 
> from a savepoint/checkpoint. Once there are events on the other partitions, the 
> watermark behaviour is correct and the watermark is emitted as the minimum 
> watermark from all partitions.
>  
> Steps to reproduce:
>  # Setup a Kafka cluster with a topic that has 2 or more partitions. (see 
> attached docker-compose.yml)
>  # 
>  ## {{./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
> test-2 --partitions 4}}
>  # Create a job that (see attached `test-job.java`; a minimal sketch follows these steps):
>  ## uses a KafkaSource with 
> `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L)`
>  ## has parallelism lower than number of partitions
>  ## stores checkpoint/savepoint
>  # Start job
>  # Send events only on single partition
>  ## {{./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
> test-2 --property "parse.key=true" --property "key.separator=:"}}
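For illustration, here is a minimal, hypothetical Java sketch of the kind of job the steps above describe (class name, group id and parallelism values are assumptions, not the reporter's attached test-job.java): a KafkaSource reading the 4-partition topic with a 10-second bounded-out-of-orderness watermark strategy, checkpointing enabled, and job parallelism lower than the number of partitions.

{code:java}
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitionAwareWatermarkRepro {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);              // lower than the 4 partitions of test-2
        env.enableCheckpointing(10_000L);   // so the job can be restored from a checkpoint

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("test-2")
                .setGroupId("watermark-repro")          // assumed group id
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Per-partition watermarks; the combined watermark should be the minimum
        // over all assigned partitions, which is what this issue reports as broken
        // right after restoring from a checkpoint/savepoint.
        env.fromSource(
                        source,
                        WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(10)),
                        "kafka-source")
                .print();

        env.execute("partition-aware-watermark-repro");
    }
}
{code}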
>  
> {{14:51:19,883 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:18.849Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:32,484 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:31.475Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:35,914 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:34.909Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> Expected: Watermark does not progress. Actual: Watermark does not progress.
> 5. Stop the job
> 6. Start the job up from the last checkpoint/savepoint
> 7. Send events only on a single partition
> {{14:53:41,693 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:40.662Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:53:46,088 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:45.078Z, watermark 
> 2023-08-10T12:53:30.661Z}}
> {{14:53:49,520 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:48.511Z, watermark 
> 2023-08-10T12:53:35.077Z}}
> Expected: Watermark does not progress. {color:#ff}*Actual: Watermark has 
> progressed*{color}
>  
> {color:#172b4d}To add a bit more context:{color}
> {color:#172b4d}8. Send events on other partitions and then send events only 
> on a single partition{color}
> {{{color:#172b4d}14:54:55,112 WARN  com.example.TestJob6$InputSink2           
>         [] - == Received: test-2/0: 2 -> a, timestamp 
> 2023-08-10T12:54:54.104Z, watermark 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/1: 4 -> a, timestamp 2023-08-10T12:54:56.665Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/2: 5 -> a, timestamp 2023-08-10T12:54:57.554Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:55:12,821 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:11.814Z, watermark 
> 2023-08-10T12:54:44.103Z
> 14:55:16,099 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:15.091Z, watermark 
> 

[jira] [Commented] (FLINK-35351) Restore from unaligned checkpoints with a custom partitioner fails.

2024-05-16 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846869#comment-17846869
 ] 

Piotr Nowojski commented on FLINK-35351:


Thanks, after reading the PR I indeed understand this better :)

> Restore from unaligned checkpoints with a custom partitioner fails.
> ---
>
> Key: FLINK-35351
> URL: https://issues.apache.org/jira/browse/FLINK-35351
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Reporter: Dmitriy Linevich
>Assignee: Dmitriy Linevich
>Priority: Major
>  Labels: pull-request-available
>
> We encountered a problem when using a custom partitioner with unaligned 
> checkpoints. The bug reproduces under the following steps:
>  # Run a job with graph: Source[2]->Sink[3], with the custom partitioner applied 
> after the Source task (a minimal sketch follows these steps).
>  # Make a checkpoint.
>  # Restore from the checkpoint with a different source parallelism: 
> Source[1]->Sink[3].
>  # An exception is thrown.
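A minimal sketch of the job shape these steps describe (the sequence source, the lambda partitioner and the class name are assumptions, not the reporter's test code): a low-parallelism source, a custom (non-keyed) partitioner between source and sink, and unaligned checkpoints enabled.

{code:java}
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CustomPartitionerUnalignedRepro {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1_000L);
        env.getCheckpointConfig().enableUnalignedCheckpoints();

        env.fromSequence(0L, 1_000_000L)
                .setParallelism(2) // Source[2] on the first run, changed to 1 when restoring
                .partitionCustom(
                        (Partitioner<Long>) (key, numPartitions) -> (int) (key % numPartitions),
                        value -> value) // custom, non-keyed exchange between source and sink
                .print()
                .setParallelism(3); // Sink[3] stays the same across the restore

        env.execute("custom-partitioner-unaligned-checkpoints-sketch");
    }
}
{code}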
> This issue does not occur when restoring with the same parallelism or when 
> changing the Sink parallelism. The exception only occurs when the parallelism 
> of the Source is changed while the Sink parallelism remains the same.
> See the exception below and the test code at the end. 
> {code:java}
> [db13789c52b80aad852c53a0afa26247] Task [Sink: sink (3/3)#0] WARN  Sink: sink 
> (3/3)#0 
> (be1d158c2e77fc9ed9e3e5d9a8431dc2_0a448493b4782967b150582570326227_2_0) 
> switched from RUNNING to FAILED with failure cause:
> java.io.IOException: Can't get next record for channel 
> InputChannelInfo{gateIdx=0, inputChannelIdx=0}
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:106)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:600)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:930)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:879)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:960)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) 
> [classes/:?]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:753) 
> [classes/:?]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568) 
> [classes/:?]
>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
> Caused by: java.io.IOException: Corrupt stream, found tag: -1
>     at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:222)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:44)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:128)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:103)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:93)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer$VirtualChannel.getNextRecord(DemultiplexingRecordDeserializer.java:79)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.getNextRecord(DemultiplexingRecordDeserializer.java:154)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.getNextRecord(DemultiplexingRecordDeserializer.java:54)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:103)
>  ~[classes/:?]
>     ... 10 more {code}
> We discovered that this issue occurs 

[jira] [Updated] (FLINK-32828) Partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint

2024-05-15 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-32828:
---
Summary: Partition aware watermark not handled correctly shortly after job 
start up from checkpoint or savepoint  (was: Kafka partition aware watermark 
not handled correctly shortly after job start up from checkpoint or savepoint)

> Partition aware watermark not handled correctly shortly after job start up 
> from checkpoint or savepoint
> ---
>
> Key: FLINK-32828
> URL: https://issues.apache.org/jira/browse/FLINK-32828
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / Kafka
>Affects Versions: 1.17.1, 1.19.0, 1.18.1
> Environment: Affected environments:
>  * Local MiniCluster + Confluent Kafka run in docker
>  ** See attached files
>  * Flink job run in Kubernetes using Flink Operator 1.15 + 3 node Kafka 
> cluster run in Kubernetes cluster
>Reporter: Grzegorz Liter
>Assignee: Piotr Nowojski
>Priority: Critical
> Attachments: docker-compose.yml, test-job.java
>
>
> When using KafkaSource with partition-aware watermarks, watermarks are being 
> emitted even when only one partition has events, just after job startup 
> from a savepoint/checkpoint. Once there are events on the other partitions, the 
> watermark behaviour is correct and the watermark is emitted as the minimum 
> watermark from all partitions.
>  
> Steps to reproduce:
>  # Setup a Kafka cluster with a topic that has 2 or more partitions. (see 
> attached docker-compose.yml)
>  # 
>  ## {{./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
> test-2 --partitions 4}}
>  # Create a job that (see attached `test-job.java`):
>  ## uses a KafkaSource with 
> `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L)`
>  ## has parallelism lower than number of partitions
>  ## stores checkpoint/savepoint
>  # Start job
>  # Send events only on single partition
>  ## {{./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
> test-2 --property "parse.key=true" --property "key.separator=:"}}
>  
> {{14:51:19,883 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:18.849Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:32,484 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:31.475Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:35,914 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:34.909Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> Expected: Watermark does not progress. Actual: Watermark does not progress.
> 5. Stop the job
> 6. Start the job up from the last checkpoint/savepoint
> 7. Send events only on a single partition
> {{14:53:41,693 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:40.662Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:53:46,088 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:45.078Z, watermark 
> 2023-08-10T12:53:30.661Z}}
> {{14:53:49,520 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:48.511Z, watermark 
> 2023-08-10T12:53:35.077Z}}
> Expected: Watermark does not progress. {color:#ff}*Actual: Watermark has 
> progressed*{color}
>  
> {color:#172b4d}To add a bit more context:{color}
> {color:#172b4d}8. Send events on other partitions and then send events only 
> on a single partition{color}
> {{{color:#172b4d}14:54:55,112 WARN  com.example.TestJob6$InputSink2           
>         [] - == Received: test-2/0: 2 -> a, timestamp 
> 2023-08-10T12:54:54.104Z, watermark 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/1: 4 -> a, timestamp 2023-08-10T12:54:56.665Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/2: 5 -> a, timestamp 2023-08-10T12:54:57.554Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:55:12,821 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:11.814Z, watermark 
> 2023-08-10T12:54:44.103Z
> 14:55:16,099 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:15.091Z, watermark 
> 2023-08-10T12:54:44.103Z
> 14:55:19,122 WARN  com.example.TestJob6$InputSink2                   [] - == 
> 

[jira] [Comment Edited] (FLINK-32828) Kafka partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint

2024-05-15 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846694#comment-17846694
 ] 

Piotr Nowojski edited comment on FLINK-32828 at 5/15/24 4:27 PM:
-

We have just stumbled upon the same issue and I can confirm that it's quite a 
critical bug that can lead to all kinds of incorrect results. 

The problem is that the initial splits recovered from state are 
registered in the {{WatermarkOutputMultiplexer}} only when the first record from 
that split has been emitted. The resulting watermark was not properly combined from 
the initial splits, but only from the splits that have already emitted at least 
one record.

I will publish a fix for that shortly.

edit: Also, the problem doesn't affect only Kafka, but all FLIP-27 sources 
(anything that uses {{SourceOperator}}).


was (Author: pnowojski):
We have just stumbled upon the same issue and I can confirm that it's quite a 
critical bug that can lead to all kinds of incorrect results. 

The problem is that the initial splits recovered from state are 
registered in the {{WatermarkOutputMultiplexer}} only when the first record from 
that split has been emitted. The resulting watermark was not properly combined from 
the initial splits, but only from the splits that have already emitted at least 
one record.

I will publish a fix for that shortly.

> Kafka partition aware watermark not handled correctly shortly after job start 
> up from checkpoint or savepoint
> -
>
> Key: FLINK-32828
> URL: https://issues.apache.org/jira/browse/FLINK-32828
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / Kafka
>Affects Versions: 1.17.1, 1.19.0, 1.18.1
> Environment: Affected environments:
>  * Local MiniCluster + Confluent Kafka run in docker
>  ** See attached files
>  * Flink job run in Kubernetes using Flink Operator 1.15 + 3 node Kafka 
> cluster run in Kubernetes cluster
>Reporter: Grzegorz Liter
>Assignee: Piotr Nowojski
>Priority: Critical
> Attachments: docker-compose.yml, test-job.java
>
>
> When using KafkaSource with partition-aware watermarks, watermarks are being 
> emitted even when only one partition has events, just after job startup 
> from a savepoint/checkpoint. Once there are events on the other partitions, the 
> watermark behaviour is correct and the watermark is emitted as the minimum 
> watermark from all partitions.
>  
> Steps to reproduce:
>  # Setup a Kafka cluster with a topic that has 2 or more partitions. (see 
> attached docker-compose.yml)
>  # 
>  ## {{./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
> test-2 --partitions 4}}
>  # Create a job that (see attached `test-job.java`):
>  ## uses a KafkaSource with 
> `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L)`
>  ## has parallelism lower than number of partitions
>  ## stores checkpoint/savepoint
>  # Start job
>  # Send events only on single partition
>  ## {{./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
> test-2 --property "parse.key=true" --property "key.separator=:"}}
>  
> {{14:51:19,883 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:18.849Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:32,484 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:31.475Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:35,914 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:34.909Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> Expected: Watermark does not progress. Actual: Watermark does not progress.
> 5. Stop the job
> 6. Start the job up from the last checkpoint/savepoint
> 7. Send events only on a single partition
> {{14:53:41,693 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:40.662Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:53:46,088 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:45.078Z, watermark 
> 2023-08-10T12:53:30.661Z}}
> {{14:53:49,520 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:48.511Z, watermark 
> 2023-08-10T12:53:35.077Z}}
> Expected: Watermark does not progress. {color:#ff}*Actual: Watermark has 
> progressed*{color}
>  
> {color:#172b4d}To add a bit more context:{color}
> {color:#172b4d}8. Send events on other partitions and then send events only 
> on a single 

[jira] [Commented] (FLINK-32828) Kafka partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint

2024-05-15 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846694#comment-17846694
 ] 

Piotr Nowojski commented on FLINK-32828:


We have just stumbled upon the same issue and I can confirm that it's quite a 
critical bug that can lead to all kinds of incorrect results. 

The problem is that the initial splits recovered from state are 
registered in the {{WatermarkOutputMultiplexer}} only when the first record from 
that split has been emitted. The resulting watermark was not properly combined from 
the initial splits, but only from the splits that have already emitted at least 
one record.

I will publish a fix for that shortly.
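To illustrate the effect described above, here is a self-contained sketch (not Flink's actual {{WatermarkOutputMultiplexer}}, just a model of the combining step) of why lazily registering recovered splits skews the combined watermark: the combined value is the minimum over the registered outputs only, so a split that exists in state but has not yet emitted a record does not hold the watermark back.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Self-contained sketch, not Flink's WatermarkOutputMultiplexer: the combined
// watermark is the minimum over the *registered* per-split watermarks only.
public class CombinedWatermarkSketch {
    private final Map<String, Long> perSplitWatermark = new HashMap<>();

    void register(String splitId) {
        // An unregistered split contributes nothing; a registered one starts at
        // Long.MIN_VALUE (which renders as the -292275055-05-16T16:47:04.192Z
        // watermark visible in the logs quoted above).
        perSplitWatermark.put(splitId, Long.MIN_VALUE);
    }

    void onRecordEmitted(String splitId, long watermark) {
        perSplitWatermark.put(splitId, watermark);
    }

    long combined() {
        return perSplitWatermark.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Buggy behaviour: only the split that already emitted a record is registered,
        // so the combined watermark jumps ahead right after recovery.
        CombinedWatermarkSketch lazy = new CombinedWatermarkSketch();
        lazy.register("partition-3");
        lazy.onRecordEmitted("partition-3", 1_000L);
        System.out.println("lazy registration  -> " + lazy.combined()); // 1000

        // Intended behaviour: all splits recovered from state are registered up front,
        // so the silent partitions keep holding the combined watermark back.
        CombinedWatermarkSketch eager = new CombinedWatermarkSketch();
        eager.register("partition-0");
        eager.register("partition-3");
        eager.onRecordEmitted("partition-3", 1_000L);
        System.out.println("eager registration -> " + eager.combined()); // Long.MIN_VALUE
    }
}
{code}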

> Kafka partition aware watermark not handled correctly shortly after job start 
> up from checkpoint or savepoint
> -
>
> Key: FLINK-32828
> URL: https://issues.apache.org/jira/browse/FLINK-32828
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / Kafka
>Affects Versions: 1.17.1, 1.19.0, 1.18.1
> Environment: Affected environments:
>  * Local MiniCluster + Confluent Kafka run in docker
>  ** See attached files
>  * Flink job run in Kubernetes using Flink Operator 1.15 + 3 node Kafka 
> cluster run in Kubernetes cluster
>Reporter: Grzegorz Liter
>Assignee: Piotr Nowojski
>Priority: Critical
> Attachments: docker-compose.yml, test-job.java
>
>
> When using KafkaSource with partition-aware watermarks, watermarks are being 
> emitted even when only one partition has events, just after job startup 
> from a savepoint/checkpoint. Once there are events on the other partitions, the 
> watermark behaviour is correct and the watermark is emitted as the minimum 
> watermark from all partitions.
>  
> Steps to reproduce:
>  # Setup a Kafka cluster with a topic that has 2 or more partitions. (see 
> attached docker-compose.yml)
>  # 
>  ## {{./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
> test-2 --partitions 4}}
>  # Create a job that (see attached `test-job.java`):
>  ## uses a KafkaSource with 
> `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L)`
>  ## has parallelism lower than number of partitions
>  ## stores checkpoint/savepoint
>  # Start job
>  # Send events only on single partition
>  ## {{./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
> test-2 --property "parse.key=true" --property "key.separator=:"}}
>  
> {{14:51:19,883 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:18.849Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:32,484 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:31.475Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:35,914 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:34.909Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> Expected: Watermark does not progress. Actual: Watermark does not progress.
> 5. Stop the job
> 6. Start the job up from the last checkpoint/savepoint
> 7. Send events only on a single partition
> {{14:53:41,693 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:40.662Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:53:46,088 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:45.078Z, watermark 
> 2023-08-10T12:53:30.661Z}}
> {{14:53:49,520 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:48.511Z, watermark 
> 2023-08-10T12:53:35.077Z}}
> Expected: Watermark does not progress. {color:#ff}*Actual: Watermark has 
> progressed*{color}
>  
> {color:#172b4d}To add a bit more context:{color}
> {color:#172b4d}8. Send events on other partitions and then send events only 
> on a single partition{color}
> {{{color:#172b4d}14:54:55,112 WARN  com.example.TestJob6$InputSink2           
>         [] - == Received: test-2/0: 2 -> a, timestamp 
> 2023-08-10T12:54:54.104Z, watermark 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/1: 4 -> a, timestamp 2023-08-10T12:54:56.665Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/2: 5 -> a, timestamp 2023-08-10T12:54:57.554Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:55:12,821 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 

[jira] [Updated] (FLINK-32828) Kafka partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint

2024-05-15 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-32828:
---
Priority: Critical  (was: Major)

> Kafka partition aware watermark not handled correctly shortly after job start 
> up from checkpoint or savepoint
> -
>
> Key: FLINK-32828
> URL: https://issues.apache.org/jira/browse/FLINK-32828
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / Kafka
>Affects Versions: 1.17.1
> Environment: Affected environments:
>  * Local MiniCluster + Confluent Kafka run in docker
>  ** See attached files
>  * Flink job run in Kubernetes using Flink Operator 1.15 + 3 node Kafka 
> cluster run in Kubernetes cluster
>Reporter: Grzegorz Liter
>Assignee: Piotr Nowojski
>Priority: Critical
> Attachments: docker-compose.yml, test-job.java
>
>
> When using KafkaSource with partition-aware watermarks, watermarks are being 
> emitted even when only one partition has events, just after job startup 
> from a savepoint/checkpoint. Once there are events on the other partitions, the 
> watermark behaviour is correct and the watermark is emitted as the minimum 
> watermark from all partitions.
>  
> Steps to reproduce:
>  # Setup a Kafka cluster with a topic that has 2 or more partitions. (see 
> attached docker-compose.yml)
>  # 
>  ## {{./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
> test-2 --partitions 4}}
>  # Create a job that (see attached `test-job.java`):
>  ## uses a KafkaSource with 
> `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L)`
>  ## has parallelism lower than number of partitions
>  ## stores checkpoint/savepoint
>  # Start job
>  # Send events only on single partition
>  ## {{./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
> test-2 --property "parse.key=true" --property "key.separator=:"}}
>  
> {{14:51:19,883 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:18.849Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:32,484 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:31.475Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:35,914 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:34.909Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> Expected: Watermark does not progress. Actual: Watermark does not progress.
> 5. Stop the job
> 6. Start the job up from the last checkpoint/savepoint
> 7. Send events only on a single partition
> {{14:53:41,693 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:40.662Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:53:46,088 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:45.078Z, watermark 
> 2023-08-10T12:53:30.661Z}}
> {{14:53:49,520 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:48.511Z, watermark 
> 2023-08-10T12:53:35.077Z}}
> Expected: Watermark does not progress. {color:#ff}*Actual: Watermark has 
> progressed*{color}
>  
> {color:#172b4d}To add a bit more context:{color}
> {color:#172b4d}8. Send events on other partitions and then send events only 
> on a single partition{color}
> {{{color:#172b4d}14:54:55,112 WARN  com.example.TestJob6$InputSink2           
>         [] - == Received: test-2/0: 2 -> a, timestamp 
> 2023-08-10T12:54:54.104Z, watermark 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/1: 4 -> a, timestamp 2023-08-10T12:54:56.665Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/2: 5 -> a, timestamp 2023-08-10T12:54:57.554Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:55:12,821 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:11.814Z, watermark 
> 2023-08-10T12:54:44.103Z
> 14:55:16,099 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:15.091Z, watermark 
> 2023-08-10T12:54:44.103Z
> 14:55:19,122 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:18.114Z, watermark 
> 2023-08-10T12:54:44.103Z{color}}}
> {color:#172b4d}Expected: Watermark should progress a bit and then should not 
> progress when 

[jira] [Updated] (FLINK-32828) Kafka partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint

2024-05-15 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-32828:
---
Affects Version/s: 1.18.1
   1.19.0

> Kafka partition aware watermark not handled correctly shortly after job start 
> up from checkpoint or savepoint
> -
>
> Key: FLINK-32828
> URL: https://issues.apache.org/jira/browse/FLINK-32828
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / Kafka
>Affects Versions: 1.17.1, 1.19.0, 1.18.1
> Environment: Affected environments:
>  * Local MiniCluster + Confluent Kafka run in docker
>  ** See attached files
>  * Flink job run in Kubernetes using Flink Operator 1.15 + 3 node Kafka 
> cluster run in Kubernetes cluster
>Reporter: Grzegorz Liter
>Assignee: Piotr Nowojski
>Priority: Critical
> Attachments: docker-compose.yml, test-job.java
>
>
> When using KafkaSource with partition-aware watermarks, watermarks are being 
> emitted even when only one partition has events, just after job startup 
> from a savepoint/checkpoint. Once there are events on the other partitions, the 
> watermark behaviour is correct and the watermark is emitted as the minimum 
> watermark from all partitions.
>  
> Steps to reproduce:
>  # Setup a Kafka cluster with a topic that has 2 or more partitions. (see 
> attached docker-compose.yml)
>  # 
>  ## {{./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
> test-2 --partitions 4}}
>  # Create a job that (see attached `test-job.java`):
>  ## uses a KafkaSource with 
> `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L)`
>  ## has parallelism lower than number of partitions
>  ## stores checkpoint/savepoint
>  # Start job
>  # Send events only on single partition
>  ## {{./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
> test-2 --property "parse.key=true" --property "key.separator=:"}}
>  
> {{14:51:19,883 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:18.849Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:32,484 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:31.475Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:35,914 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:34.909Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> Expected: Watermark does not progress. Actual: Watermark does not progress.
> 5. Stop the job
> 6. Start the job up from the last checkpoint/savepoint
> 7. Send events only on a single partition
> {{14:53:41,693 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:40.662Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:53:46,088 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:45.078Z, watermark 
> 2023-08-10T12:53:30.661Z}}
> {{14:53:49,520 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:48.511Z, watermark 
> 2023-08-10T12:53:35.077Z}}
> Expected: Watermark does not progress. {color:#ff}*Actual: Watermark has 
> progressed*{color}
>  
> {color:#172b4d}To add a bit more context:{color}
> {color:#172b4d}8. Send events on other partitions and then send events only 
> on a single partition{color}
> {{{color:#172b4d}14:54:55,112 WARN  com.example.TestJob6$InputSink2           
>         [] - == Received: test-2/0: 2 -> a, timestamp 
> 2023-08-10T12:54:54.104Z, watermark 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/1: 4 -> a, timestamp 2023-08-10T12:54:56.665Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/2: 5 -> a, timestamp 2023-08-10T12:54:57.554Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:55:12,821 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:11.814Z, watermark 
> 2023-08-10T12:54:44.103Z
> 14:55:16,099 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:15.091Z, watermark 
> 2023-08-10T12:54:44.103Z
> 14:55:19,122 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:18.114Z, watermark 
> 2023-08-10T12:54:44.103Z{color}}}
> {color:#172b4d}Expected: Watermark should progress a bit 

[jira] [Assigned] (FLINK-32828) Kafka partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint

2024-05-15 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-32828:
--

Assignee: Piotr Nowojski

> Kafka partition aware watermark not handled correctly shortly after job start 
> up from checkpoint or savepoint
> -
>
> Key: FLINK-32828
> URL: https://issues.apache.org/jira/browse/FLINK-32828
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / Kafka
>Affects Versions: 1.17.1
> Environment: Affected environments:
>  * Local MiniCluster + Confluent Kafka run in docker
>  ** See attached files
>  * Flink job run in Kubernetes using Flink Operator 1.15 + 3 node Kafka 
> cluster run in Kubernetes cluster
>Reporter: Grzegorz Liter
>Assignee: Piotr Nowojski
>Priority: Major
> Attachments: docker-compose.yml, test-job.java
>
>
> When using KafkaSource with partition-aware watermarks, watermarks are being 
> emitted even when only one partition has events, just after job startup 
> from a savepoint/checkpoint. Once there are events on the other partitions, the 
> watermark behaviour is correct and the watermark is emitted as the minimum 
> watermark from all partitions.
>  
> Steps to reproduce:
>  # Setup a Kafka cluster with a topic that has 2 or more partitions. (see 
> attached docker-compose.yml)
>  # 
>  ## {{./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
> test-2 --partitions 4}}
>  # Create a job that (see attached `test-job.java`):
>  ## uses a KafkaSource with 
> `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L)`
>  ## has parallelism lower than number of partitions
>  ## stores checkpoint/savepoint
>  # Start job
>  # Send events only on single partition
>  ## {{./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
> test-2 --property "parse.key=true" --property "key.separator=:"}}
>  
> {{14:51:19,883 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:18.849Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:32,484 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:31.475Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:35,914 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:34.909Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> Expected: Watermark does not progress. Actual: Watermark does not progress.
> 5. Stop the job
> 6. Start the job up from the last checkpoint/savepoint
> 7. Send events only on a single partition
> {{14:53:41,693 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:40.662Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:53:46,088 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:45.078Z, watermark 
> 2023-08-10T12:53:30.661Z}}
> {{14:53:49,520 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:48.511Z, watermark 
> 2023-08-10T12:53:35.077Z}}
> Expected: Watermark does not progress. {color:#ff}*Actual: Watermark has 
> progressed*{color}
>  
> {color:#172b4d}To add a bit more context:{color}
> {color:#172b4d}8. Send events on other partitions and then send events only 
> on a single partition{color}
> {{{color:#172b4d}14:54:55,112 WARN  com.example.TestJob6$InputSink2           
>         [] - == Received: test-2/0: 2 -> a, timestamp 
> 2023-08-10T12:54:54.104Z, watermark 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/1: 4 -> a, timestamp 2023-08-10T12:54:56.665Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/2: 5 -> a, timestamp 2023-08-10T12:54:57.554Z, watermark 
> 2023-08-10T12:53:38.510Z
> 14:55:12,821 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:11.814Z, watermark 
> 2023-08-10T12:54:44.103Z
> 14:55:16,099 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:15.091Z, watermark 
> 2023-08-10T12:54:44.103Z
> 14:55:19,122 WARN  com.example.TestJob6$InputSink2                   [] - == 
> Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:18.114Z, watermark 
> 2023-08-10T12:54:44.103Z{color}}}
> {color:#172b4d}Expected: Watermark should progress a bit and then should not 
> progress when 

[jira] [Commented] (FLINK-33109) Watermark alignment not applied after recovery from checkpoint

2024-05-15 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846597#comment-17846597
 ] 

Piotr Nowojski commented on FLINK-33109:


Hi!

{quote}
I guess this bug has been fixed in 1.17.2 and 1.18.
{quote}

Has this been confirmed?  [~YordanPavlov], has this issue been fixed for you?

> Watermark alignment not applied after recovery from checkpoint
> --
>
> Key: FLINK-33109
> URL: https://issues.apache.org/jira/browse/FLINK-33109
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.17.1
>Reporter: Yordan Pavlov
>Priority: Major
> Attachments: WatermarkTest-1.scala, 
> image-2023-09-18-15-40-06-868.png, image-2023-09-18-15-46-16-106.png
>
>
> I am observing a problem where, after recovery from a checkpoint, the Kafka 
> source watermarks would start to diverge, not honoring the watermark alignment 
> setting I have applied.
> I have a Kafka source which reads a topic with 32 partitions. I am applying 
> the following watermark strategy:
> {code:java}
> new EventAwareWatermarkStrategy[KeyedKafkaSourceMessage]](msg => 
> msg.value.getTimestamp)
>       .withWatermarkAlignment("alignment-sources-group", 
> time.Duration.ofMillis(sourceWatermarkAlignmentBlocks)){code}
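For reference, a hedged sketch of how such a strategy is typically assembled (the {{Long}} event type and the concrete drift/update values are assumptions, not the reporter's code); the max allowed watermark drift mentioned later in the report corresponds to the second argument of {{withWatermarkAlignment}}.

{code:java}
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class AlignedWatermarkStrategySketch {

    /** Builds a bounded-out-of-orderness strategy whose sources are aligned as a group. */
    public static WatermarkStrategy<Long> strategy(long maxDriftMillis) {
        return WatermarkStrategy
                .<Long>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                // here the event itself is its epoch-millis timestamp
                .withTimestampAssigner((eventTimestamp, recordTimestamp) -> eventTimestamp)
                // sources sharing "alignment-sources-group" pause reading once their
                // watermark runs ahead of the group minimum by more than maxDriftMillis
                .withWatermarkAlignment(
                        "alignment-sources-group",
                        Duration.ofMillis(maxDriftMillis),
                        Duration.ofSeconds(1)); // how often alignment is re-evaluated
    }
}
{code}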
>  
> This works great up until my job needs to recover from a checkpoint. Once the 
> recovery takes place, no alignment takes place any more. This can best be 
> illustrated by looking at the watermark metrics for various operators in the 
> image:
> !image-2023-09-18-15-40-06-868.png!
>  
> You can see how the watermarks disperse after the recovery. Trying to debug 
> the problem, I noticed that before the failure there would be calls to
>  
> {code:java}
> SourceCoordinator::announceCombinedWatermark() 
> {code}
> after the recovery, no calls get there, so no value for 
> {code:java}
> watermarkAlignmentParams.getMaxAllowedWatermarkDrift(){code}
> is ever read. I can manually fix the problem if I stop the job, clear all 
> state from ZooKeeper and then manually start Flink, providing the last 
> checkpoint with the 
> {code:java}
> '--fromSavepoint'{code}
>  flag. This would cause the SourceCoordinator to be constructed properly and 
> the watermark drift to be checked. Once recovered manually, watermarks would again 
> converge to the allowed drift, as seen in the metrics:
> !image-2023-09-18-15-46-16-106.png!
>  
> Let me know if I can be helpful by providing any more information.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35351) Restore from unaligned checkpoints with a custom partitioner fails.

2024-05-14 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846231#comment-17846231
 ] 

Piotr Nowojski edited comment on FLINK-35351 at 5/14/24 9:11 AM:
-

Hi [~lda-dima], thanks for reporting this issue and for what looks like a correct 
analysis. However, I think the proposed solution goes in the wrong direction - 
we cannot disallow rescaling, as that might prevent jobs from recovering in 
case of, for example, the loss of a TaskManager. Note that the problem probably 
happens due to the use of a "custom partitioner" here. AFAIR unaligned 
checkpoints are (most likely for the exact same reason that you discovered in 
this bug) enabled only for keyed exchanges. [Pointwise/broadcast connections 
with unaligned checkpoints are 
unsupported|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/#certain-data-distribution-patterns-are-not-checkpointed].
 Somewhere in the codebase there is code that checks whether, for a given 
partitioning, unaligned checkpoints should be enabled or not. I would guess that 
custom partitioners are mishandled there?

Relevant code pointers:
* 
{{org.apache.flink.runtime.checkpoint.CheckpointOptions.AlignmentType#FORCED_ALIGNED}}
* 
{{org.apache.flink.streaming.runtime.io.RecordWriterOutput#supportsUnalignedCheckpoints}}
* 
{{org.apache.flink.streaming.api.graph.StreamGraphGenerator#shouldDisableUnalignedCheckpointing}}
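To make the suspected gap concrete, here is a self-contained sketch (not the actual logic behind {{StreamGraphGenerator#shouldDisableUnalignedCheckpointing}}; the enum and method names are hypothetical) of the kind of decision those code pointers refer to: exchanges that the linked documentation excludes from unaligned checkpointing should force aligned barriers, and the open question above is whether custom partitioners fall through such a check.

{code:java}
// Sketch only, under the assumption stated above; none of these names exist in Flink.
public class UnalignedCheckpointDecisionSketch {

    enum ExchangeKind { KEYED_HASH, REBALANCE, FORWARD, BROADCAST, CUSTOM }

    /** Should this exchange fall back to aligned checkpoint barriers? */
    static boolean shouldForceAlignment(ExchangeKind kind) {
        switch (kind) {
            case FORWARD:   // pointwise connections: documented as not unaligned-checkpointed
            case BROADCAST: // broadcast connections: documented as not unaligned-checkpointed
            case CUSTOM:    // the suspected gap in FLINK-35351: a custom, non-keyed partitioner
                return true;
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        for (ExchangeKind kind : ExchangeKind.values()) {
            System.out.println(kind + " -> force aligned barriers: " + shouldForceAlignment(kind));
        }
    }
}
{code}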



was (Author: pnowojski):
Hi [~lda-dima], thanks for reporting this issue and for what looks like a correct 
analysis. However, I think the proposed solution goes in the wrong direction - 
we cannot disallow rescaling, as that might prevent jobs from recovering in 
case of, for example, the loss of a TaskManager. Note that the problem probably 
happens due to the use of a "custom partitioner" here. AFAIR unaligned 
checkpoints are (most likely for the exact same reason that you discovered in 
this bug) enabled only for keyed exchanges. [Pointwise/broadcast connections 
with unaligned checkpoints are 
unsupported|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/#certain-data-distribution-patterns-are-not-checkpointed].
 Somewhere in the codebase there is code that checks whether, for a given 
partitioning, unaligned checkpoints should be enabled or not. I would guess that 
custom partitioners are mishandled there?

> Restore from unaligned checkpoints with a custom partitioner fails.
> ---
>
> Key: FLINK-35351
> URL: https://issues.apache.org/jira/browse/FLINK-35351
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Reporter: Dmitriy Linevich
>Assignee: Dmitriy Linevich
>Priority: Major
>
> We encountered a problem when using a custom partitioner with unaligned 
> checkpoints. The bug reproduces under the following steps:
>  # Run a job with graph: Source[2]->Sink[3], the custom partitioner applied 
> after the Source task.
>  # Make a checkpoint.
>  # Restore from the checkpoint with a different source parallelism: 
> Source[1]->Sink[3].
>  # An exception is thrown.
> This issue does not occur when restoring with the same parallelism or when 
> changing the Sink parallelism. The exception only occurs when the parallelism 
> of the Source is changed while the Sink parallelism remains the same.
> See the exception below and the test code at the end. 
> {code:java}
> [db13789c52b80aad852c53a0afa26247] Task [Sink: sink (3/3)#0] WARN  Sink: sink 
> (3/3)#0 
> (be1d158c2e77fc9ed9e3e5d9a8431dc2_0a448493b4782967b150582570326227_2_0) 
> switched from RUNNING to FAILED with failure cause:
> java.io.IOException: Can't get next record for channel 
> InputChannelInfo{gateIdx=0, inputChannelIdx=0}
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:106)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:600)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:930)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:879)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:960)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) 
> [classes/:?]
>     at 

[jira] [Commented] (FLINK-35351) Restore from unaligned checkpoints with a custom partitioner fails.

2024-05-14 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846231#comment-17846231
 ] 

Piotr Nowojski commented on FLINK-35351:


Hi [~lda-dima], thanks for reporting this issue and for what looks like a correct 
analysis. However, I think the proposed solution goes in the wrong direction - 
we cannot disallow rescaling, as that might prevent jobs from recovering in 
case of, for example, the loss of a TaskManager. Note that the problem probably 
happens due to the use of a "custom partitioner" here. AFAIR unaligned 
checkpoints are (most likely for the exact same reason that you discovered in 
this bug) enabled only for keyed exchanges. [Pointwise/broadcast connections 
with unaligned checkpoints are 
unsupported|https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/#certain-data-distribution-patterns-are-not-checkpointed].
 Somewhere in the codebase there is code that checks whether, for a given 
partitioning, unaligned checkpoints should be enabled or not. I would guess that 
custom partitioners are mishandled there?

> Restore from unaligned checkpoints with a custom partitioner fails.
> ---
>
> Key: FLINK-35351
> URL: https://issues.apache.org/jira/browse/FLINK-35351
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Reporter: Dmitriy Linevich
>Assignee: Dmitriy Linevich
>Priority: Major
>
> We encountered a problem when using a custom partitioner with unaligned 
> checkpoints. The bug reproduces under the following steps:
>  # Run a job with graph: Source[2]->Sink[3], the custom partitioner applied 
> after the Source task.
>  # Make a checkpoint.
>  # Restore from the checkpoint with a different source parallelism: 
> Source[1]->Sink[3].
>  # An exception is thrown.
> This issue does not occur when restoring with the same parallelism or when 
> changing the Sink parallelism. The exception only occurs when the parallelism 
> of the Source is changed while the Sink parallelism remains the same.
> See the exception below and the test code at the end. 
> {code:java}
> [db13789c52b80aad852c53a0afa26247] Task [Sink: sink (3/3)#0] WARN  Sink: sink 
> (3/3)#0 
> (be1d158c2e77fc9ed9e3e5d9a8431dc2_0a448493b4782967b150582570326227_2_0) 
> switched from RUNNING to FAILED with failure cause:
> java.io.IOException: Can't get next record for channel 
> InputChannelInfo{gateIdx=0, inputChannelIdx=0}
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:106)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:600)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:930)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:879)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:960)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) 
> [classes/:?]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:753) 
> [classes/:?]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568) 
> [classes/:?]
>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
> Caused by: java.io.IOException: Corrupt stream, found tag: -1
>     at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:222)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:44)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:128)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:103)
>  ~[classes/:?]
>     at 
> 

[jira] [Assigned] (FLINK-35351) Restore from unaligned checkpoints with a custom partitioner fails.

2024-05-14 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-35351:
--

Assignee: Dmitriy Linevich

> Restore from unaligned checkpoints with a custom partitioner fails.
> ---
>
> Key: FLINK-35351
> URL: https://issues.apache.org/jira/browse/FLINK-35351
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Reporter: Dmitriy Linevich
>Assignee: Dmitriy Linevich
>Priority: Major
>
> We encountered a problem when using a custom partitioner with unaligned 
> checkpoints. The bug reproduces under the following steps:
>  # Run a job with graph: Source[2]->Sink[3], the custom partitioner applied 
> after the Source task.
>  # Make a checkpoint.
>  # Restore from the checkpoint with a different source parallelism: 
> Source[1]->Sink[3].
>  # An exception is thrown.
> This issue does not occur when restoring with the same parallelism or when 
> changing the Sink parallelism. The exception only occurs when the parallelism 
> of the Source is changed while the Sink parallelism remains the same.
> See the exception below and the test code at the end. 
> {code:java}
> [db13789c52b80aad852c53a0afa26247] Task [Sink: sink (3/3)#0] WARN  Sink: sink 
> (3/3)#0 
> (be1d158c2e77fc9ed9e3e5d9a8431dc2_0a448493b4782967b150582570326227_2_0) 
> switched from RUNNING to FAILED with failure cause:
> java.io.IOException: Can't get next record for channel 
> InputChannelInfo{gateIdx=0, inputChannelIdx=0}
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:106)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:600)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:930)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:879)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:960)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) 
> [classes/:?]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:753) 
> [classes/:?]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568) 
> [classes/:?]
>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
> Caused by: java.io.IOException: Corrupt stream, found tag: -1
>     at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:222)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:44)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:128)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:103)
>  ~[classes/:?]
>     at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:93)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer$VirtualChannel.getNextRecord(DemultiplexingRecordDeserializer.java:79)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.getNextRecord(DemultiplexingRecordDeserializer.java:154)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.getNextRecord(DemultiplexingRecordDeserializer.java:54)
>  ~[classes/:?]
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:103)
>  ~[classes/:?]
>     ... 10 more {code}
> We discovered that this issue occurs due to an optimization in the 
> 
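
For illustration only, a hedged sketch (hypothetical class and names, not the
reporter's test code) of the job shape described in the reproduction steps
above: Source[2] -> custom partitioner -> Sink[3] with unaligned checkpoints
enabled, later restored with the source parallelism changed to 1.

{code:java}
// Hedged sketch (hypothetical, not the reporter's test code): the job shape
// from the reproduction steps - Source[2] -> custom partitioner -> Sink[3]
// with unaligned checkpoints enabled. Restoring a checkpoint of such a job
// with the source parallelism changed to 1 is the scenario that fails.
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CustomPartitionerRepro {

    /** A trivial custom partitioner standing in for the user's partitioner. */
    static class ModuloPartitioner implements Partitioner<Long> {
        @Override
        public int partition(Long key, int numPartitions) {
            return (int) (key % numPartitions);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000L);
        env.getCheckpointConfig().enableUnalignedCheckpoints();

        env.fromSequence(0, Long.MAX_VALUE)
                .setParallelism(2) // restored later with source parallelism 1
                .partitionCustom(
                        new ModuloPartitioner(),
                        new KeySelector<Long, Long>() {
                            @Override
                            public Long getKey(Long value) {
                                return value;
                            }
                        })
                .print()
                .setParallelism(3);

        env.execute("unaligned-checkpoint-custom-partitioner");
    }
}
{code}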

[jira] [Commented] (FLINK-33856) Add metrics to monitor the interaction performance between task and external storage system in the process of checkpoint making

2024-04-24 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840327#comment-17840327
 ] 

Piotr Nowojski commented on FLINK-33856:


Hey [~hejufang001]! Sorry for the usual open source delay :( The FLIP looks 
good to me. I would only suggest adding a config option to enable/disable those 
child spans.
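
As a hedged sketch of that suggestion (the option name and placement are made
up here, not an agreed API), such a switch could be defined with Flink's
regular {{ConfigOptions}} builder:

{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class CheckpointTraceOptions {

    /** Hypothetical option: whether to report per-subtask checkpoint child spans. */
    public static final ConfigOption<Boolean> REPORT_CHECKPOINT_SUBTASK_SPANS =
            ConfigOptions.key("traces.checkpointing.report-subtask-spans")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "Whether to report additional child spans (one per subtask) "
                                    + "for each checkpoint trace.");
}
{code}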

It would also be nice to have some aggregated values like {{sum}} or {{max}} in 
the parent span, similar to how recovery spans have those currently, but that 
might be an independent effort.

I've created a FLIP-448 page for this effort: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-448%3A+Add+sub-task+spans+to+TraceReporter+for+checkpointing
 . The next step would be to start a dev mailing list thread.

nit: I think if you press the button [Publish to 
web|https://images.template.net/wp-content/uploads/2022/10/How-to-Share-Google-Docs-with-Others-Publish-on-Web.jpg],
 the wiki widget will be able to generate a preview.

> Add metrics to monitor the interaction performance between task and external 
> storage system in the process of checkpoint making
> ---
>
> Key: FLINK-33856
> URL: https://issues.apache.org/jira/browse/FLINK-33856
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.18.0
>Reporter: Jufang He
>Assignee: Jufang He
>Priority: Major
>  Labels: pull-request-available
>
> When Flink makes a checkpoint, the performance of the interaction with the 
> external file system has a great impact on the overall checkpoint duration. 
> Therefore, it becomes easy to spot the bottleneck by adding performance 
> indicators where the task interacts with the external file storage system. 
> These include: the file write rate, the latency to write the file, and the 
> latency to close the file.
> Adding the above metrics on the Flink side has the following advantages: it 
> makes it convenient to compare the end-to-end checkpoint time of different 
> tasks, and it does not need to distinguish the type of external storage 
> system, since the measurement can be unified in the 
> FsCheckpointStreamFactory.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33856) Add sub-task spans to TraceReporter for checkpointing

2024-04-24 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-33856:
---
Summary: Add sub-task spans to TraceReporter for checkpointing  (was: Add 
metrics to monitor the interaction performance between task and external 
storage system in the process of checkpoint making)

> Add sub-task spans to TraceReporter for checkpointing
> -
>
> Key: FLINK-33856
> URL: https://issues.apache.org/jira/browse/FLINK-33856
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.18.0
>Reporter: Jufang He
>Assignee: Jufang He
>Priority: Major
>  Labels: pull-request-available
>
> When Flink makes a checkpoint, the performance of the interaction with the 
> external file system has a great impact on the overall checkpoint duration. 
> Therefore, it becomes easy to spot the bottleneck by adding performance 
> indicators where the task interacts with the external file storage system. 
> These include: the file write rate, the latency to write the file, and the 
> latency to close the file.
> Adding the above metrics on the Flink side has the following advantages: it 
> makes it convenient to compare the end-to-end checkpoint time of different 
> tasks, and it does not need to distinguish the type of external storage 
> system, since the measurement can be unified in the 
> FsCheckpointStreamFactory.
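
As a hedged illustration of that idea (hypothetical class, not the actual
Flink change), the measurement can be kept storage-agnostic by wrapping the
{{OutputStream}} that the checkpoint stream factory hands out:

{code:java}
// Hedged sketch: wrapping the checkpoint output stream so that write/close
// latencies and written bytes can be reported, independently of the concrete
// file system behind it.
import java.io.IOException;
import java.io.OutputStream;

class MeasuredCheckpointOutputStream extends OutputStream {
    private final OutputStream delegate;
    private long bytesWritten;
    private long writeNanos;
    private long closeNanos;

    MeasuredCheckpointOutputStream(OutputStream delegate) {
        this.delegate = delegate;
    }

    @Override
    public void write(int b) throws IOException {
        long start = System.nanoTime();
        delegate.write(b);
        writeNanos += System.nanoTime() - start;
        bytesWritten++;
    }

    @Override
    public void close() throws IOException {
        long start = System.nanoTime();
        delegate.close();
        closeNanos = System.nanoTime() - start;
        // here the collected values would be handed to a metric/trace reporter
    }
}
{code}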



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-34913) ConcurrentModificationException SubTaskInitializationMetricsBuilder.addDurationMetric

2024-04-23 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-34913.
--
Fix Version/s: 1.20.0
   (was: 2.0.0)
   Resolution: Fixed

> ConcurrentModificationException 
> SubTaskInitializationMetricsBuilder.addDurationMetric
> -
>
> Key: FLINK-34913
> URL: https://issues.apache.org/jira/browse/FLINK-34913
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics, Runtime / State Backends
>Affects Versions: 1.19.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> The following failures can occur during a job's recovery when using clip & 
> ingest. This code path is currently not available, so the bug cannot happen 
> for users.
> {noformat}
> java.util.ConcurrentModificationException
>   at java.base/java.util.HashMap.compute(HashMap.java:1230)
>   at 
> org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49)
>   at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.runAndReportDuration(RocksDBIncrementalRestoreOperation.java:988)
>   at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.lambda$createAsyncCompactionTask$2(RocksDBIncrementalRestoreOperation.java:291)
> {noformat}
> {noformat}
> java.util.ConcurrentModificationException
>   at java.base/java.util.HashMap.compute(HashMap.java:1230)
>   at 
> org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:794)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$3(StreamTask.java:744)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:744)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:704)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>   at java.base/java.lang.Thread.run(Thread.java:829)
> {noformat}
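
A hedged sketch of the kind of fix (hypothetical class, not necessarily the
change that was merged): making the duration map safe for concurrent updates,
e.g. from an async RocksDB compaction task and the restore thread.

{code:java}
// Hedged sketch, not the actual Flink fix: a duration-metric builder whose
// addDurationMetric can be called from multiple threads. A plain
// HashMap.compute() can throw ConcurrentModificationException under such
// concurrent access; a ConcurrentHashMap (or external synchronization)
// avoids it.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ThreadSafeInitializationMetrics {
    private final Map<String, Long> durationMetrics = new ConcurrentHashMap<>();

    void addDurationMetric(String name, long valueMs) {
        // atomically accumulates the duration instead of racing on a HashMap
        durationMetrics.merge(name, valueMs, Long::sum);
    }
}
{code}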



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-20217) More fine-grained timer processing

2024-04-11 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-20217:
--

Assignee: Piotr Nowojski

> More fine-grained timer processing
> --
>
> Key: FLINK-20217
> URL: https://issues.apache.org/jira/browse/FLINK-20217
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream, Runtime / Task
>Affects Versions: 1.10.2, 1.11.2, 1.12.0
>Reporter: Nico Kruber
>Assignee: Piotr Nowojski
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> Timers are currently processed in one big block under the checkpoint lock 
> (in {{InternalTimerServiceImpl#advanceWatermark}}). This can be problematic 
> in a number of scenarios while checkpointing, as it can lead to checkpoints 
> timing out (and even unaligned checkpoints would not help).
> If you have a huge number of timers to process when advancing the watermark 
> and the task is also back-pressured, the situation may actually be worse, 
> since you would block on the checkpoint lock and also wait for 
> buffers/credits from the receiver.
> I propose to make this loop more fine-grained so that it is interruptible by 
> checkpoints, but maybe there is also some other way to improve here.
> This issue has, for example, been observed here: 
> https://lists.apache.org/thread/f6ffk9912fg5j1rfkxbzrh0qmp4w6qry
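
A hedged sketch of the proposed direction (hypothetical names, plain Java, not
the Flink implementation): fire due timers in bounded batches and re-enqueue a
continuation, so that a pending checkpoint can interleave.

{code:java}
// Hedged sketch: instead of draining all due timers in one blocking loop,
// fire at most a fixed batch and re-enqueue a continuation, giving checkpoints
// (and other mails) a chance to run in between. "mailbox", "timerQueue" and
// "fire(...)" are assumed placeholders here.
import java.util.Queue;
import java.util.concurrent.Executor;

class BatchedTimerFiring {
    private static final int MAX_TIMERS_PER_INVOCATION = 100;

    private final Queue<Long> timerQueue;   // due timer timestamps
    private final Executor mailbox;         // stands in for the task's mailbox executor

    BatchedTimerFiring(Queue<Long> timerQueue, Executor mailbox) {
        this.timerQueue = timerQueue;
        this.mailbox = mailbox;
    }

    void advanceWatermark(long watermark) {
        int fired = 0;
        Long ts;
        while ((ts = timerQueue.peek()) != null && ts <= watermark) {
            timerQueue.poll();
            fire(ts);
            if (++fired >= MAX_TIMERS_PER_INVOCATION) {
                // yield back to the mailbox; remaining timers are processed later
                mailbox.execute(() -> advanceWatermark(watermark));
                return;
            }
        }
    }

    private void fire(long timestamp) {
        // the user's timer callback would be invoked here
    }
}
{code}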



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34704) Process checkpoint barrier in AsyncWaitOperator when the element queue is full

2024-04-11 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836113#comment-17836113
 ] 

Piotr Nowojski commented on FLINK-34704:


Sounds good to me (y) Indeed we could later provide some kind of overdraft 
buffer capacity to be used just for checkpointing. I think that this might 
relate to the things I want to propose in FLIP-443 as it will give the AWOP 
some way of knowing that it should use the overdraft buffer. Let's discuss this 
later and keep the ticket open :)

> Process checkpoint barrier in AsyncWaitOperator when the element queue is full
> --
>
> Key: FLINK-34704
> URL: https://issues.apache.org/jira/browse/FLINK-34704
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: Zakelly Lan
>Priority: Minor
>
> As discussed in 
> https://lists.apache.org/thread/4f7ywn29kdv4302j2rq3fkxc6pc8myr2 . Maybe it 
> is better to provide such a new `yield` that can process mail with low 
> priority in the mailbox executor. More discussion needed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34704) Process checkpoint barrier in AsyncWaitOperator when the element queue is full

2024-04-11 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836083#comment-17836083
 ] 

Piotr Nowojski edited comment on FLINK-34704 at 4/11/24 9:57 AM:
-

Yes, I've read the referenced dev mailing list thread and I still don't agree 
with the conclusion there, that AWOP should allow a checkpoint to happen 
while it's {{.yield()}}'ing :)
{quote}
the checkpoint still happen when some user code is running.
{quote}
If you are talking about async user code from AWOP, that's not a problem and 
never has been. When I'm talking about problems with state inconsistency of 
upstream chained operators, that's exactly what I mean. The problem is for an 
UPSTREAM operator that is CHAINED with AWOP:

network input -> OperatorA -> AWOP -> network output

OperatorA's state can become corrupted/inconsistent if you allow a checkpoint 
to happen while {{AWOP}} invokes {{.yield()}}. That's the whole point of the 
yield to downstream (https://issues.apache.org/jira/browse/FLINK-13063). As a 
side note, it's not only a problem of {{AWOP}} but of any operator that wants 
to {{.yield()}}. Operators can only yield to downstream, which also means 
checkpoints can not be executed, and it also affects firing timers. 

I have a WIP FLIP-443 that will introduce a neat trick to allow operators to 
return the execution back to {{StreamTask}} so that a checkpoint can happen, 
but AFAIU it has no application for the {{AWOP}}. When {{AWOP}} wants to 
{{.yield()}}, it means it has no more space in the buffer to store any more 
records. So we can not checkpoint {{AWOP}} until some buffer space becomes 
available. So {{AWOP}} needs to yield to downstream ({{.yield()}}) until buffer 
space becomes available, return from the {{#processElement()}} call, and then 
it needs to rely on a fix for FLINK-35051 for the stream task to prioritise 
performing a checkpoint over executing more records/mails. 


was (Author: pnowojski):
{quote}
the checkpoint still happen when some user code is running.
{quote}
If you are talking about async user code from AWOP, that's not a problem and 
never has been. When I'm talking about problems with state inconsistency of 
upstream chained operators, that's exactly what I mean. The problem is for an 
UPSTREAM operator that is CHAINED with AWOP:

network input -> OperatorA -> AWOP -> network output

OperatorA's state can become corrupted/inconsistent if you allow a checkpoint 
to happen while {{AWOP}} invokes {{.yield()}}. That's the whole point of the 
yield to downstream (https://issues.apache.org/jira/browse/FLINK-13063). As a 
side note, it's not only a problem of {{AWOP}} but of any operator that wants 
to {{.yield()}}. Operators can only yield to downstream, which also means 
checkpoints can not be executed, and it also affects firing timers. 

I have a WIP FLIP-443 that will introduce a neat trick to allow operators to 
return the execution back to {{StreamTask}} so that a checkpoint can happen, 
but AFAIU it has no application for the {{AWOP}}. When {{AWOP}} wants to 
{{.yield()}}, it means it has no more space in the buffer to store any more 
records. So we can not checkpoint {{AWOP}} until some buffer space becomes 
available. So {{AWOP}} needs to yield to downstream ({{.yield()}}) until buffer 
space becomes available, return from the {{#processElement()}} call, and then 
it needs to rely on a fix for FLINK-35051 for the stream task to prioritise 
performing a checkpoint over executing more records/mails. 

> Process checkpoint barrier in AsyncWaitOperator when the element queue is full
> --
>
> Key: FLINK-34704
> URL: https://issues.apache.org/jira/browse/FLINK-34704
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: Zakelly Lan
>Priority: Minor
>
> As discussed in 
> https://lists.apache.org/thread/4f7ywn29kdv4302j2rq3fkxc6pc8myr2 . Maybe it 
> is better to provide such a new `yield` that can process mail with low 
> priority in the mailbox executor. More discussion needed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34704) Process checkpoint barrier in AsyncWaitOperator when the element queue is full

2024-04-11 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836083#comment-17836083
 ] 

Piotr Nowojski commented on FLINK-34704:


{quote}
the checkpoint still happen when some user code is running.
{quote}
If you are talking about async user code from AWOP, that's not a problem and 
never has been. When I'm talking about problems with state inconsistency of 
upstream chained operators, that's exactly what I mean. The problem is for an 
UPSTREAM operator that is CHAINED with AWOP:

network input -> OperatorA -> AWOP -> network output

OperatorA's state can become corrupted/inconsistent if you allow a checkpoint 
to happen while {{AWOP}} invokes {{.yield()}}. That's the whole point of the 
yield to downstream (https://issues.apache.org/jira/browse/FLINK-13063). As a 
side note, it's not only a problem of {{AWOP}} but of any operator that wants 
to {{.yield()}}. Operators can only yield to downstream, which also means 
checkpoints can not be executed, and it also affects firing timers. 

I have a WIP FLIP-443 that will introduce a neat trick to allow operators to 
return the execution back to {{StreamTask}} so that a checkpoint can happen, 
but AFAIU it has no application for the {{AWOP}}. When {{AWOP}} wants to 
{{.yield()}}, it means it has no more space in the buffer to store any more 
records. So we can not checkpoint {{AWOP}} until some buffer space becomes 
available. So {{AWOP}} needs to yield to downstream ({{.yield()}}) until buffer 
space becomes available, return from the {{#processElement()}} call, and then 
it needs to rely on a fix for FLINK-35051 for the stream task to prioritise 
performing a checkpoint over executing more records/mails. 

> Process checkpoint barrier in AsyncWaitOperator when the element queue is full
> --
>
> Key: FLINK-34704
> URL: https://issues.apache.org/jira/browse/FLINK-34704
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: Zakelly Lan
>Priority: Minor
>
> As discussed in 
> https://lists.apache.org/thread/4f7ywn29kdv4302j2rq3fkxc6pc8myr2 . Maybe it 
> is better to provide such a new `yield` that can process mail with low 
> priority in the mailbox executor. More discussion needed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34704) Process checkpoint barrier in AsyncWaitOperator when the element queue is full

2024-04-11 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836037#comment-17836037
 ] 

Piotr Nowojski commented on FLINK-34704:


{quote}
So it is designed to be safe to checkpoint during waiting async results (during 
yield). This operator is the only one that allow for checkpoint in middle of 
waiting.
{quote}
Yes and no. It can be checkpointed with in-flight requests, but it can not be 
checkpointed while it is calling {{.yield()}} anywhere in its own code, due to 
the state corruption/inconsistency issue I described above. So the solution is 
not to let the subtask checkpoint while {{AsyncWaitOperator}} is calling 
{{.yield()}}, but {{StreamTask}} should prioritize taking a checkpoint over 
processing {{AWOP}}'s mails that are already enqueued. 

> Process checkpoint barrier in AsyncWaitOperator when the element queue is full
> --
>
> Key: FLINK-34704
> URL: https://issues.apache.org/jira/browse/FLINK-34704
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: Zakelly Lan
>Priority: Minor
>
> As discussed in 
> https://lists.apache.org/thread/4f7ywn29kdv4302j2rq3fkxc6pc8myr2 . Maybe it 
> is better to provide such a new `yield` that can process mail with low 
> priority in the mailbox executor. More discussion needed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34704) Process checkpoint barrier in AsyncWaitOperator when the element queue is full

2024-04-10 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835796#comment-17835796
 ] 

Piotr Nowojski edited comment on FLINK-34704 at 4/10/24 3:46 PM:
-

Adding a new {{yield}} method wouldn't help, as we can not perform a checkpoint 
while an operator is yielding, since that can lead to inconsistent state for 
upstream chained operators. For example, when an upstream operator does this:

{code}
OperatorXYZ::processElement(e, output) {
  stateA = e.foo;
  output.collect(e);
  stateB = e.bar;
}
{code}
The downstream chained operator, inside the {{output.collect(e)}} call, can not 
allow a checkpoint to be executed, since this would checkpoint {{OperatorXYZ}} 
in an inconsistent state.

I've discovered this issue independently (but for other operators) and 
described the problem in FLINK-35051. I think a better solution would be to 
make {{StreamTask}} prioritise:
* processing priority messages from the network stack over processing mails
* executing out-of-order "urgent" mails, like timing out aligned checkpoint 
barriers to unaligned ones or triggerCheckpoint RPCs for source tasks

I would propose to close this ticket for just {{AsyncWaitOperator}} in favour 
of FLINK-35051.


was (Author: pnowojski):
Adding a new {{yield}} method wouldn't help, as we can not perform a checkpoint 
while an operator is yielding, since that can lead to inconsistent state for 
upstream operators. For example, when an upstream operator does this:

{code}
OperatorXYZ::processElement(e, output) {
  stateA = e.foo;
  output.collect(e);
  stateB = e.bar;
}
{code}
The chained operator, inside the `output.collect(e)` call, can not allow a 
checkpoint to be executed, since this would checkpoint {{OperatorXYZ}} in an 
inconsistent state.

I've discovered this issue independently (but for other operators) and 
described the problem in FLINK-35051. I think a better solution would be to 
make {{StreamTask}} prioritise:
* processing priority messages from the network stack over processing mails
* executing out-of-order "urgent" mails, like timing out aligned checkpoint 
barriers to unaligned ones or triggerCheckpoint RPCs for source tasks

I would propose to close this ticket for just {{AsyncWaitOperator}} in favour 
of FLINK-35051.

> Process checkpoint barrier in AsyncWaitOperator when the element queue is full
> --
>
> Key: FLINK-34704
> URL: https://issues.apache.org/jira/browse/FLINK-34704
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: Zakelly Lan
>Priority: Minor
>
> As discussed in 
> https://lists.apache.org/thread/4f7ywn29kdv4302j2rq3fkxc6pc8myr2 . Maybe it 
> is better to provide such a new `yield` that can process mail with low 
> priority in the mailbox executor. More discussion needed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34704) Process checkpoint barrier in AsyncWaitOperator when the element queue is full

2024-04-10 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835796#comment-17835796
 ] 

Piotr Nowojski commented on FLINK-34704:


Adding a new {{yield}} method wouldn't help, as we can not perform a checkpoint 
while an operator is yielding, since that can lead to inconsistent state for 
upstream operators. For example, when an upstream operator does this:

{code}
OperatorXYZ::processElement(e, output) {
  stateA = e.foo;
  output.collect(e);
  stateB = e.bar;
}
{code}
The chained operator, inside the `output.collect(e)` call, can not allow a 
checkpoint to be executed, since this would checkpoint {{OperatorXYZ}} in an 
inconsistent state.

I've discovered this issue independently (but for other operators) and 
described the problem in FLINK-35051. I think a better solution would be to 
make {{StreamTask}} prioritise:
* processing priority messages from the network stack over processing mails
* executing out-of-order "urgent" mails, like timing out aligned checkpoint 
barriers to unaligned ones or triggerCheckpoint RPCs for source tasks

I would propose to close this ticket for just {{AsyncWaitOperator}} in favour 
of FLINK-35051.
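
To make the snippet above self-contained, a hedged, plain-Java illustration
(made-up types, not Flink API) of why a checkpoint taken inside
{{output.collect(e)}} would capture {{OperatorXYZ}} in an inconsistent state:

{code:java}
// Hedged illustration only. If a checkpoint could run while control is inside
// collect()/yield(), it would observe stateA already updated but stateB not
// yet, i.e. an inconsistent snapshot of OperatorXYZ.
class OperatorXYZ {
    private long stateA;
    private long stateB;

    interface Collector<T> {
        void collect(T element);
    }

    static final class Element {
        final long foo;
        final long bar;
        Element(long foo, long bar) { this.foo = foo; this.bar = bar; }
    }

    void processElement(Element e, Collector<Element> output) {
        stateA = e.foo;
        output.collect(e);   // a chained downstream operator runs here and may yield()
        stateB = e.bar;      // not yet executed if a snapshot were taken during collect()
    }
}
{code}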

> Process checkpoint barrier in AsyncWaitOperator when the element queue is full
> --
>
> Key: FLINK-34704
> URL: https://issues.apache.org/jira/browse/FLINK-34704
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: Zakelly Lan
>Priority: Minor
>
> As discussed in 
> https://lists.apache.org/thread/4f7ywn29kdv4302j2rq3fkxc6pc8myr2 . Maybe it 
> is better to provide such a new `yield` that can process mail with low 
> priority in the mailbox executor. More discussion needed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-35065) Add numFiredTimers and numFiredTimersPerSecond metrics

2024-04-09 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-35065.
--
Resolution: Fixed

Merged to master as 98bd2659f5b

> Add numFiredTimers and numFiredTimersPerSecond metrics
> --
>
> Key: FLINK-35065
> URL: https://issues.apache.org/jira/browse/FLINK-35065
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics, Runtime / Task
>Affects Versions: 1.19.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> Currently there is no way of knowing how many timers are being fired by 
> Flink, so it's impossible to distinguish, even using code profiling, whether 
> an operator is firing only a couple of heavy timers per second using ~100% of 
> the CPU time, or firing thousands of timers per second.
> We could add the following metrics to address this issue:
> * numFiredTimers - total number of fired timers per operator
> * numFiredTimersPerSecond - per second rate of firing timers per operator



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35065) Add numFiredTimers and numFiredTimersPerSecond metrics

2024-04-09 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-35065:
--

 Summary: Add numFiredTimers and numFiredTimersPerSecond metrics
 Key: FLINK-35065
 URL: https://issues.apache.org/jira/browse/FLINK-35065
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Metrics, Runtime / Task
Affects Versions: 1.19.0
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski
 Fix For: 1.20.0


Currently there is no way of knowing how many timers are being fired by Flink, 
so it's impossible to distinguish, even using code profiling, whether an 
operator is firing only a couple of heavy timers per second using ~100% of the 
CPU time, or firing thousands of timers per second.

We could add the following metrics to address this issue:
* numFiredTimers - total number of fired timers per operator
* numFiredTimersPerSecond - per second rate of firing timers per operator
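
A hedged sketch of how such metrics could be wired up with the standard metric
types (metric names as proposed above; the surrounding class is made up):

{code:java}
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;

class FiredTimersMetrics {
    private final Counter numFiredTimers;
    private final Meter numFiredTimersPerSecond;

    FiredTimersMetrics(MetricGroup operatorMetricGroup) {
        numFiredTimers = operatorMetricGroup.counter("numFiredTimers");
        numFiredTimersPerSecond =
                operatorMetricGroup.meter("numFiredTimersPerSecond", new MeterView(numFiredTimers));
    }

    /** To be called from the timer-firing code path, once per fired timer. */
    void onTimerFired() {
        numFiredTimers.inc();
        // the MeterView derives the per-second rate from the underlying counter
    }
}
{code}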



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35051) Weird priorities when processing unaligned checkpoints

2024-04-08 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-35051:
---
Affects Version/s: 1.16.3

> Weird priorities when processing unaligned checkpoints
> --
>
> Key: FLINK-35051
> URL: https://issues.apache.org/jira/browse/FLINK-35051
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / Network, Runtime / 
> Task
>Affects Versions: 1.16.3, 1.17.2, 1.19.0, 1.18.1
>Reporter: Piotr Nowojski
>Priority: Major
>
> While looking through the code I noticed that `StreamTask` is processing 
> unaligned checkpoints in a strange order/priority. The end result is that 
> unaligned checkpoint `Start Delay` / triggering checkpoints in `StreamTask` 
> can be unnecessarily delayed by other mailbox actions in the system, like for 
> example:
> * processing time timers
> * `AsyncWaitOperator` results
> * ... 
> An incoming UC barrier is treated as a priority event by the network stack 
> (it will be polled from the input before anything else). This is what we 
> want, but polling elements from the network stack has lower priority than 
> processing enqueued mailbox actions.
> Secondly, if an AC barrier times out to UC, that's done via a mailbox action, 
> but this mailbox action is also not prioritised in any way, so other mailbox 
> actions could be unnecessarily executed first. 
> On top of that, there is a clash between separate concepts here:
> # Mailbox priority (yieldToDownstream) - so in a sense the reverse of what we 
> would like to have for triggering checkpoints, but it only kicks in for 
> #yield() calls, where it's actually correct that an operator in the middle of 
> execution can not yield to a checkpoint - it should only yield to downstream.
> # Control mails in the mailbox executor - cancellation is done via that, and 
> it bypasses the whole mailbox queue.
> # Priority events in the network stack.
> It's unfortunate that 1. and 3. have a naming clash, as the name "priority" 
> is used for both things, and the highest-priority network event containing 
> the UC barrier, when executed via the mailbox, actually has the lowest 
> mailbox priority.
> The control mails mechanism is a kind of priority mail executed out of order, 
> but it doesn't generalise well for use in checkpointing.
> This whole thing should be re-worked at some point. Ideally, what we would 
> like to have is that:
> * the mail to convert AC barriers to UC
> * polling the UC barrier from the network input
> * the checkpoint trigger via RPC for source tasks
> should be processed first, with the exception of yieldToDownstream, where the 
> current mailbox priorities should be adhered to.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35051) Weird priorities when processing unaligned checkpoints

2024-04-08 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-35051:
---
Description: 
While looking through the code I noticed that `StreamTask` is processing 
unaligned checkpoints in a strange order/priority. The end result is that 
unaligned checkpoint `Start Delay` / triggering checkpoints in `StreamTask` 
can be unnecessarily delayed by other mailbox actions in the system, like for 
example:
* processing time timers
* `AsyncWaitOperator` results
* ... 

An incoming UC barrier is treated as a priority event by the network stack (it 
will be polled from the input before anything else). This is what we want, but 
polling elements from the network stack has lower priority than processing 
enqueued mailbox actions.

Secondly, if an AC barrier times out to UC, that's done via a mailbox action, 
but this mailbox action is also not prioritised in any way, so other mailbox 
actions could be unnecessarily executed first. 

On top of that, there is a clash between separate concepts here:
# Mailbox priority (yieldToDownstream) - so in a sense the reverse of what we 
would like to have for triggering checkpoints, but it only kicks in for 
#yield() calls, where it's actually correct that an operator in the middle of 
execution can not yield to a checkpoint - it should only yield to downstream.
# Control mails in the mailbox executor - cancellation is done via that, and 
it bypasses the whole mailbox queue.
# Priority events in the network stack.

It's unfortunate that 1. and 3. have a naming clash, as the name "priority" is 
used for both things, and the highest-priority network event containing the UC 
barrier, when executed via the mailbox, actually has the lowest mailbox 
priority.

The control mails mechanism is a kind of priority mail executed out of order, 
but it doesn't generalise well for use in checkpointing.

This whole thing should be re-worked at some point. Ideally, what we would 
like to have is that:
* the mail to convert AC barriers to UC
* polling the UC barrier from the network input
* the checkpoint trigger via RPC for source tasks

should be processed first, with the exception of yieldToDownstream, where the 
current mailbox priorities should be adhered to.

  was:
While looking through the code I noticed that `StreamTask` is processing 
unaligned checkpoints in a strange order/priority. The end result is that the 
unaligned checkpoint `Start Delay` time can be increased, and triggering 
checkpoints in `StreamTask` can be unnecessarily delayed by other mailbox 
actions in the system, like for example:
* processing time timers
* `AsyncWaitOperator` results
* ... 

An incoming UC barrier is treated as a priority event by the network stack (it 
will be polled from the input before anything else). This is what we want, but 
polling elements from the network stack has lower priority than processing 
enqueued mailbox actions.

Secondly, if an AC barrier times out to UC, that's done via a mailbox action, 
but this mailbox action is also not prioritised in any way, so other mailbox 
actions could be unnecessarily executed first. 

On top of that, there is a clash between separate concepts here:
# Mailbox priority (yieldToDownstream) - so in a sense the reverse of what we 
would like to have for triggering checkpoints, but it only kicks in for 
#yield() calls, where it's actually correct that an operator in the middle of 
execution can not yield to a checkpoint - it should only yield to downstream.
# Control mails in the mailbox executor - cancellation is done via that, and 
it bypasses the whole mailbox queue.
# Priority events in the network stack.

It's unfortunate that 1. and 3. have a naming clash, as the name "priority" is 
used for both things, and the highest-priority network event containing the UC 
barrier, when executed via the mailbox, actually has the lowest mailbox 
priority.

The control mails mechanism is a kind of priority mail executed out of order, 
but it doesn't generalise well for use in checkpointing.

This whole thing should be re-worked at some point. Ideally, what we would 
like to have is that:
* the mail to convert AC barriers to UC
* polling the UC barrier from the network input
* the checkpoint trigger via RPC for source tasks

should be processed first, with the exception of yieldToDownstream, where the 
current mailbox priorities should be adhered to.


> Weird priorities when processing unaligned checkpoints
> --
>
> Key: FLINK-35051
> URL: https://issues.apache.org/jira/browse/FLINK-35051
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / Network, Runtime / 
> Task
>Affects Versions: 1.17.2, 1.19.0, 1.18.1
>Reporter: Piotr Nowojski
>Priority: Major
>
> While looking through the code I noticed that `StreamTask` is processing 
> unaligned checkpoints in strange order/priority. The end result is that 
> unaligned checkpoint `Start Delay` /  triggering 

[jira] [Updated] (FLINK-35051) Weird priorities when processing unaligned checkpoints

2024-04-08 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-35051:
---
Description: 
While looking through the code I noticed that `StreamTask` is processing 
unaligned checkpoints in a strange order/priority. The end result is that the 
unaligned checkpoint `Start Delay` time can be increased, and triggering 
checkpoints in `StreamTask` can be unnecessarily delayed by other mailbox 
actions in the system, like for example:
* processing time timers
* `AsyncWaitOperator` results
* ... 

An incoming UC barrier is treated as a priority event by the network stack (it 
will be polled from the input before anything else). This is what we want, but 
polling elements from the network stack has lower priority than processing 
enqueued mailbox actions.

Secondly, if an AC barrier times out to UC, that's done via a mailbox action, 
but this mailbox action is also not prioritised in any way, so other mailbox 
actions could be unnecessarily executed first. 

On top of that, there is a clash between separate concepts here:
# Mailbox priority (yieldToDownstream) - so in a sense the reverse of what we 
would like to have for triggering checkpoints, but it only kicks in for 
#yield() calls, where it's actually correct that an operator in the middle of 
execution can not yield to a checkpoint - it should only yield to downstream.
# Control mails in the mailbox executor - cancellation is done via that, and 
it bypasses the whole mailbox queue.
# Priority events in the network stack.

It's unfortunate that 1. and 3. have a naming clash, as the name "priority" is 
used for both things, and the highest-priority network event containing the UC 
barrier, when executed via the mailbox, actually has the lowest mailbox 
priority.

The control mails mechanism is a kind of priority mail executed out of order, 
but it doesn't generalise well for use in checkpointing.

This whole thing should be re-worked at some point. Ideally, what we would 
like to have is that:
* the mail to convert AC barriers to UC
* polling the UC barrier from the network input
* the checkpoint trigger via RPC for source tasks

should be processed first, with the exception of yieldToDownstream, where the 
current mailbox priorities should be adhered to.

  was:
While looking through the code I noticed that `StreamTask` is processing 
unaligned checkpoints in a strange order/priority. The end result is that the 
unaligned checkpoint `Start Delay` time can be increased, and triggering 
checkpoints in `StreamTask` can be unnecessarily delayed by other mailbox 
actions in the system, like for example:
* processing time timers
* `AsyncWaitOperator` results
* ... 

An incoming UC barrier is treated as a priority event by the network stack (it 
will be polled from the input before anything else). This is what we want, but 
polling elements from the network stack has lower priority than processing 
enqueued mailbox actions.

Secondly, if an AC barrier times out to UC, that's done via a mailbox action, 
but this mailbox action is also not prioritised in any way, so other mailbox 
actions could be unnecessarily executed first. 

On top of that, there is a clash between separate concepts here:
# Mailbox priority (yieldToDownstream) - so in a sense the reverse of what we 
would like to have for triggering checkpoints, but it only kicks in for 
#yield() calls, where it's actually correct that an operator in the middle of 
execution can not yield to a checkpoint - it should only yield to downstream.
# Control mails in the mailbox executor - cancellation is done via that, and 
it bypasses the whole mailbox queue.
# Priority events in the network stack.

It's unfortunate that 1. and 3. have a naming clash, as the name "priority" is 
used for both things, and the highest-priority network event containing the UC 
barrier, when executed via the mailbox, actually has the lowest mailbox 
priority.

The control mails mechanism is a kind of priority mail executed out of order, 
but it doesn't generalise well for use in checkpointing.

This whole thing should be re-worked at some point. Ideally, what we would 
like to have is that:
* the mail to convert AC barriers to UC
* polling the UC barrier from the network input
* the checkpoint trigger via RPC for source tasks
should be processed first, with the exception of yieldToDownstream, where the 
current mailbox priorities should be adhered to.


> Weird priorities when processing unaligned checkpoints
> --
>
> Key: FLINK-35051
> URL: https://issues.apache.org/jira/browse/FLINK-35051
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / Network, Runtime / 
> Task
>Affects Versions: 1.17.2, 1.19.0, 1.18.1
>Reporter: Piotr Nowojski
>Priority: Major
>
> While looking through the code I noticed that `StreamTask` is processing 
> unaligned checkpoints in strange order/priority. The end result is that 
> unaligned checkpoint `Start 

[jira] [Created] (FLINK-35051) Weird priorities when processing unaligned checkpoints

2024-04-08 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-35051:
--

 Summary: Weird priorities when processing unaligned checkpoints
 Key: FLINK-35051
 URL: https://issues.apache.org/jira/browse/FLINK-35051
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing, Runtime / Network, Runtime / Task
Affects Versions: 1.18.1, 1.19.0, 1.17.2
Reporter: Piotr Nowojski


While looking through the code I noticed that `StreamTask` is processing 
unaligned checkpoints in a strange order/priority. The end result is that the 
unaligned checkpoint `Start Delay` time can be increased, and triggering 
checkpoints in `StreamTask` can be unnecessarily delayed by other mailbox 
actions in the system, like for example:
* processing time timers
* `AsyncWaitOperator` results
* ... 

An incoming UC barrier is treated as a priority event by the network stack (it 
will be polled from the input before anything else). This is what we want, but 
polling elements from the network stack has lower priority than processing 
enqueued mailbox actions.

Secondly, if an AC barrier times out to UC, that's done via a mailbox action, 
but this mailbox action is also not prioritised in any way, so other mailbox 
actions could be unnecessarily executed first. 

On top of that, there is a clash between separate concepts here:
# Mailbox priority (yieldToDownstream) - so in a sense the reverse of what we 
would like to have for triggering checkpoints, but it only kicks in for 
#yield() calls, where it's actually correct that an operator in the middle of 
execution can not yield to a checkpoint - it should only yield to downstream.
# Control mails in the mailbox executor - cancellation is done via that, and 
it bypasses the whole mailbox queue.
# Priority events in the network stack.

It's unfortunate that 1. and 3. have a naming clash, as the name "priority" is 
used for both things, and the highest-priority network event containing the UC 
barrier, when executed via the mailbox, actually has the lowest mailbox 
priority.

The control mails mechanism is a kind of priority mail executed out of order, 
but it doesn't generalise well for use in checkpointing.

This whole thing should be re-worked at some point. Ideally, what we would 
like to have is that:
* the mail to convert AC barriers to UC
* polling the UC barrier from the network input
* the checkpoint trigger via RPC for source tasks
should be processed first, with the exception of yieldToDownstream, where the 
current mailbox priorities should be adhered to.
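
A hedged sketch of that direction (made-up class, not Flink's actual mailbox
implementation): a queue that drains "urgent" mails, such as the AC-to-UC
timeout or a checkpoint-trigger RPC, before regular mails.

{code:java}
// Hedged sketch only: a two-level mailbox where urgent mails are always
// polled before regular ones, while yieldToDownstream would keep following
// the existing operator priorities.
import java.util.ArrayDeque;
import java.util.Deque;

class TwoLevelMailbox {
    private final Deque<Runnable> urgentMails = new ArrayDeque<>();
    private final Deque<Runnable> regularMails = new ArrayDeque<>();

    synchronized void putUrgent(Runnable mail) { urgentMails.addLast(mail); }

    synchronized void put(Runnable mail) { regularMails.addLast(mail); }

    /** Returns the next mail to run, preferring urgent ones; null if empty. */
    synchronized Runnable poll() {
        Runnable mail = urgentMails.pollFirst();
        return mail != null ? mail : regularMails.pollFirst();
    }
}
{code}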



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32957) Add current timer trigger lag to metrics

2024-04-02 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17833184#comment-17833184
 ] 

Piotr Nowojski commented on FLINK-32957:


{{mailboxLatencyMs}} shows basically the same thing AFAIK. That is a sampled 
time of how long things are waiting in the mailbox queue before being executed, 
and timers are fired via the mailbox.

> Add current timer trigger lag to metrics
> 
>
> Key: FLINK-32957
> URL: https://issues.apache.org/jira/browse/FLINK-32957
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics, Runtime / State Backends
>Reporter: Rui Xia
>Priority: Minor
>
> Timer trigger lag denotes the gap between the actual trigger timestamp and 
> the expected trigger timestamp (the timestamp registered with the 
> `TimeService`). This metric can help users find out whether there is a 
> backlog of timers. 
> A backlog of timers may affect downstream data processing. Users customize 
> the trigger logic, which may interact with downstream data processing. For 
> example, the trigger logic can inject some records into downstream operators; 
> a backlog of timers blocks that record injection. 
> On the other side, a backlog of timers makes jobs unstable. Timers are used 
> by window operators, which leverage a timer to remove the window state of a 
> triggered window. A backlog of timers blocks that data removal, and the state 
> size may grow unexpectedly large. The large state size affects the 
> performance of the state backend. In a cloud-native environment, a k8s pod is 
> prone to reaching the local disk limit due to large state files (RocksDB SST).
> Currently, it is hard for users to observe a backlog of timers. As far as I 
> know, a heap dump is the only way to learn about it. Thus, users cannot 
> notice the backlog of timers in time. FLINK-32954 
> (https://issues.apache.org/jira/browse/FLINK-32954) exposes the number of 
> heap timers, but is not suitable for RocksDB timers due to the performance 
> loss.
> Compared with FLINK-32954, the timer trigger lag is much more lightweight for 
> RocksDB timers. 
>  * Reason 1: Timer trigger lag does not affect timer registration. 
>  * Reason 2: The effect on timer triggering is limited. Timer registration is 
> a hot code path, while timer triggering is much colder. In general, the 
> trigger interval is tens of seconds, and the timer trigger code path is 
> invoked only every tens of seconds. Thus, the addition of the timer trigger 
> lag calculation has little performance overhead.
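
A hedged sketch of the proposed metric (hypothetical names, not an agreed API):
on each timer firing, the lag is simply the current time minus the timestamp
the timer was registered for, exposed for example as a gauge.

{code:java}
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

class TimerTriggerLagMetric {
    private volatile long lastTriggerLagMs;

    TimerTriggerLagMetric(MetricGroup metricGroup) {
        // hypothetical metric name; reports the lag observed at the last firing
        metricGroup.gauge("currentTimerTriggerLagMs", (Gauge<Long>) () -> lastTriggerLagMs);
    }

    /** Called from the timer-firing code path with the timer's registered timestamp. */
    void onTimerFired(long expectedTriggerTimestampMs) {
        lastTriggerLagMs = Math.max(0L, System.currentTimeMillis() - expectedTriggerTimestampMs);
    }
}
{code}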



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-26515) RetryingExecutorTest. testDiscardOnTimeout failed on azure

2024-03-26 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830957#comment-17830957
 ] 

Piotr Nowojski edited comment on FLINK-26515 at 3/26/24 2:31 PM:
-

occurrence of the same issue in a private build with the same stack traces as 
https://issues.apache.org/jira/browse/FLINK-26515?focusedCommentId=17760292=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17760292


was (Author: pnowojski):
occurrence of the same issue in a private build

> RetryingExecutorTest. testDiscardOnTimeout failed on azure
> --
>
> Key: FLINK-26515
> URL: https://issues.apache.org/jira/browse/FLINK-26515
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.14.3, 1.17.0, 1.16.1, 1.18.0, 1.19.0
>Reporter: Yun Gao
>Priority: Blocker
>  Labels: auto-deprioritized-major, pull-request-available, 
> test-stability
>
> {code:java}
> Mar 06 01:20:29 [ERROR] Tests run: 7, Failures: 1, Errors: 0, Skipped: 0, 
> Time elapsed: 1.941 s <<< FAILURE! - in 
> org.apache.flink.changelog.fs.RetryingExecutorTest
> Mar 06 01:20:29 [ERROR] testTimeout  Time elapsed: 1.934 s  <<< FAILURE!
> Mar 06 01:20:29 java.lang.AssertionError: expected:<500.0> but 
> was:<1922.869766>
> Mar 06 01:20:29   at org.junit.Assert.fail(Assert.java:89)
> Mar 06 01:20:29   at org.junit.Assert.failNotEquals(Assert.java:835)
> Mar 06 01:20:29   at org.junit.Assert.assertEquals(Assert.java:555)
> Mar 06 01:20:29   at org.junit.Assert.assertEquals(Assert.java:685)
> Mar 06 01:20:29   at 
> org.apache.flink.changelog.fs.RetryingExecutorTest.testTimeout(RetryingExecutorTest.java:145)
> Mar 06 01:20:29   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Mar 06 01:20:29   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Mar 06 01:20:29   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Mar 06 01:20:29   at java.lang.reflect.Method.invoke(Method.java:498)
> Mar 06 01:20:29   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Mar 06 01:20:29   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Mar 06 01:20:29   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Mar 06 01:20:29   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> Mar 06 01:20:29   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> Mar 06 01:20:29   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> Mar 06 01:20:29   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> Mar 06 01:20:29   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> Mar 06 01:20:29   at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
> Mar 06 01:20:29   at 
> org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:43)
> Mar 06 01:20:29   at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
> Mar 06 01:20:29   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> Mar 06 01:20:29   at 
> java.util.Iterator.forEachRemaining(Iterator.java:116)
> Mar 06 01:20:29   at 
> java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
> Mar 06 01:20:29   at 
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> Mar 06 01:20:29   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>  {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=32569=logs=f450c1a5-64b1-5955-e215-49cb1ad5ec88=cc452273-9efa-565d-9db8-ef62a38a0c10=22554



--

[jira] [Updated] (FLINK-26515) RetryingExecutorTest. testDiscardOnTimeout failed on azure

2024-03-26 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-26515:
---
Priority: Blocker  (was: Critical)

> RetryingExecutorTest. testDiscardOnTimeout failed on azure
> --
>
> Key: FLINK-26515
> URL: https://issues.apache.org/jira/browse/FLINK-26515
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.14.3, 1.17.0, 1.16.1, 1.18.0, 1.19.0
>Reporter: Yun Gao
>Priority: Blocker
>  Labels: auto-deprioritized-major, pull-request-available, 
> test-stability
>
> {code:java}
> Mar 06 01:20:29 [ERROR] Tests run: 7, Failures: 1, Errors: 0, Skipped: 0, 
> Time elapsed: 1.941 s <<< FAILURE! - in 
> org.apache.flink.changelog.fs.RetryingExecutorTest
> Mar 06 01:20:29 [ERROR] testTimeout  Time elapsed: 1.934 s  <<< FAILURE!
> Mar 06 01:20:29 java.lang.AssertionError: expected:<500.0> but 
> was:<1922.869766>
> Mar 06 01:20:29   at org.junit.Assert.fail(Assert.java:89)
> Mar 06 01:20:29   at org.junit.Assert.failNotEquals(Assert.java:835)
> Mar 06 01:20:29   at org.junit.Assert.assertEquals(Assert.java:555)
> Mar 06 01:20:29   at org.junit.Assert.assertEquals(Assert.java:685)
> Mar 06 01:20:29   at 
> org.apache.flink.changelog.fs.RetryingExecutorTest.testTimeout(RetryingExecutorTest.java:145)
> Mar 06 01:20:29   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Mar 06 01:20:29   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Mar 06 01:20:29   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Mar 06 01:20:29   at java.lang.reflect.Method.invoke(Method.java:498)
> Mar 06 01:20:29   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Mar 06 01:20:29   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Mar 06 01:20:29   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Mar 06 01:20:29   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> Mar 06 01:20:29   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> Mar 06 01:20:29   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> Mar 06 01:20:29   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> Mar 06 01:20:29   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> Mar 06 01:20:29   at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
> Mar 06 01:20:29   at 
> org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:43)
> Mar 06 01:20:29   at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
> Mar 06 01:20:29   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> Mar 06 01:20:29   at 
> java.util.Iterator.forEachRemaining(Iterator.java:116)
> Mar 06 01:20:29   at 
> java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
> Mar 06 01:20:29   at 
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> Mar 06 01:20:29   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>  {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=32569=logs=f450c1a5-64b1-5955-e215-49cb1ad5ec88=cc452273-9efa-565d-9db8-ef62a38a0c10=22554



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-26515) RetryingExecutorTest. testDiscardOnTimeout failed on azure

2024-03-26 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830957#comment-17830957
 ] 

Piotr Nowojski commented on FLINK-26515:


occurrence of the same issue in a private build

> RetryingExecutorTest. testDiscardOnTimeout failed on azure
> --
>
> Key: FLINK-26515
> URL: https://issues.apache.org/jira/browse/FLINK-26515
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.14.3, 1.17.0, 1.16.1, 1.18.0, 1.19.0
>Reporter: Yun Gao
>Priority: Critical
>  Labels: auto-deprioritized-major, pull-request-available, 
> test-stability
>
> {code:java}
> Mar 06 01:20:29 [ERROR] Tests run: 7, Failures: 1, Errors: 0, Skipped: 0, 
> Time elapsed: 1.941 s <<< FAILURE! - in 
> org.apache.flink.changelog.fs.RetryingExecutorTest
> Mar 06 01:20:29 [ERROR] testTimeout  Time elapsed: 1.934 s  <<< FAILURE!
> Mar 06 01:20:29 java.lang.AssertionError: expected:<500.0> but 
> was:<1922.869766>
> Mar 06 01:20:29   at org.junit.Assert.fail(Assert.java:89)
> Mar 06 01:20:29   at org.junit.Assert.failNotEquals(Assert.java:835)
> Mar 06 01:20:29   at org.junit.Assert.assertEquals(Assert.java:555)
> Mar 06 01:20:29   at org.junit.Assert.assertEquals(Assert.java:685)
> Mar 06 01:20:29   at 
> org.apache.flink.changelog.fs.RetryingExecutorTest.testTimeout(RetryingExecutorTest.java:145)
> Mar 06 01:20:29   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Mar 06 01:20:29   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Mar 06 01:20:29   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Mar 06 01:20:29   at java.lang.reflect.Method.invoke(Method.java:498)
> Mar 06 01:20:29   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Mar 06 01:20:29   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Mar 06 01:20:29   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Mar 06 01:20:29   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> Mar 06 01:20:29   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> Mar 06 01:20:29   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> Mar 06 01:20:29   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> Mar 06 01:20:29   at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> Mar 06 01:20:29   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> Mar 06 01:20:29   at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
> Mar 06 01:20:29   at 
> org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:43)
> Mar 06 01:20:29   at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
> Mar 06 01:20:29   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> Mar 06 01:20:29   at 
> java.util.Iterator.forEachRemaining(Iterator.java:116)
> Mar 06 01:20:29   at 
> java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
> Mar 06 01:20:29   at 
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> Mar 06 01:20:29   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>  {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=32569=logs=f450c1a5-64b1-5955-e215-49cb1ad5ec88=cc452273-9efa-565d-9db8-ef62a38a0c10=22554



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34913) ConcurrentModificationException SubTaskInitializationMetricsBuilder.addDurationMetric

2024-03-25 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-34913:
---
Fix Version/s: 2.0.0
   (was: 1.19.1)

> ConcurrentModificationException 
> SubTaskInitializationMetricsBuilder.addDurationMetric
> -
>
> Key: FLINK-34913
> URL: https://issues.apache.org/jira/browse/FLINK-34913
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics, Runtime / State Backends
>Affects Versions: 1.19.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> The following failures can occur during job's recovery when using clip & 
> ingest. This code path is currently not available, so the bug can not happen 
> for users.
> {noformat}
> java.util.ConcurrentModificationException
>   at java.base/java.util.HashMap.compute(HashMap.java:1230)
>   at 
> org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49)
>   at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.runAndReportDuration(RocksDBIncrementalRestoreOperation.java:988)
>   at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.lambda$createAsyncCompactionTask$2(RocksDBIncrementalRestoreOperation.java:291)
> {noformat}
> {noformat}
> java.util.ConcurrentModificationException
>   at java.base/java.util.HashMap.compute(HashMap.java:1230)
>   at 
> org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:794)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$3(StreamTask.java:744)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:744)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:704)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>   at java.base/java.lang.Thread.run(Thread.java:829)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34913) ConcurrentModificationException SubTaskInitializationMetricsBuilder.addDurationMetric

2024-03-25 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830419#comment-17830419
 ] 

Piotr Nowojski commented on FLINK-34913:


merged commit 1a5ca7f into apache:master

> ConcurrentModificationException 
> SubTaskInitializationMetricsBuilder.addDurationMetric
> -
>
> Key: FLINK-34913
> URL: https://issues.apache.org/jira/browse/FLINK-34913
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics, Runtime / State Backends
>Affects Versions: 1.19.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.1
>
>
> The following failures can occur during job's recovery when using clip & 
> ingest. This code path is currently not available, so the bug can not happen 
> for users.
> {noformat}
> java.util.ConcurrentModificationException
>   at java.base/java.util.HashMap.compute(HashMap.java:1230)
>   at 
> org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49)
>   at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.runAndReportDuration(RocksDBIncrementalRestoreOperation.java:988)
>   at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.lambda$createAsyncCompactionTask$2(RocksDBIncrementalRestoreOperation.java:291)
> {noformat}
> {noformat}
> java.util.ConcurrentModificationException
>   at java.base/java.util.HashMap.compute(HashMap.java:1230)
>   at 
> org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:794)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$3(StreamTask.java:744)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:744)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:704)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>   at java.base/java.lang.Thread.run(Thread.java:829)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34913) ConcurrentModificationException SubTaskInitializationMetricsBuilder.addDurationMetric

2024-03-25 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830419#comment-17830419
 ] 

Piotr Nowojski edited comment on FLINK-34913 at 3/25/24 10:01 AM:
--

merged commit 1a5ca7f into apache:master

The change was not backported to 1.19 as the code is disabled in that branch.


was (Author: pnowojski):
merged commit 1a5ca7f into apache:master

> ConcurrentModificationException 
> SubTaskInitializationMetricsBuilder.addDurationMetric
> -
>
> Key: FLINK-34913
> URL: https://issues.apache.org/jira/browse/FLINK-34913
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics, Runtime / State Backends
>Affects Versions: 1.19.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> The following failures can occur during job's recovery when using clip & 
> ingest. This code path is currently not available, so the bug can not happen 
> for users.
> {noformat}
> java.util.ConcurrentModificationException
>   at java.base/java.util.HashMap.compute(HashMap.java:1230)
>   at 
> org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49)
>   at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.runAndReportDuration(RocksDBIncrementalRestoreOperation.java:988)
>   at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.lambda$createAsyncCompactionTask$2(RocksDBIncrementalRestoreOperation.java:291)
> {noformat}
> {noformat}
> java.util.ConcurrentModificationException
>   at java.base/java.util.HashMap.compute(HashMap.java:1230)
>   at 
> org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:794)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$3(StreamTask.java:744)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:744)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:704)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>   at java.base/java.lang.Thread.run(Thread.java:829)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34913) ConcurrentModificationException SubTaskInitializationMetricsBuilder.addDurationMetric

2024-03-22 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-34913:
---
Description: 
The following failures can occur during job's recovery when using clip & 
ingest. This code path is currently not available, so the bug can not happen 
for users.

{noformat}
java.util.ConcurrentModificationException
at java.base/java.util.HashMap.compute(HashMap.java:1230)
at 
org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.runAndReportDuration(RocksDBIncrementalRestoreOperation.java:988)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.lambda$createAsyncCompactionTask$2(RocksDBIncrementalRestoreOperation.java:291)
{noformat}

{noformat}
java.util.ConcurrentModificationException
at java.base/java.util.HashMap.compute(HashMap.java:1230)
at 
org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:794)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$3(StreamTask.java:744)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:744)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:704)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.base/java.lang.Thread.run(Thread.java:829)
{noformat}


  was:
The following failures can occur during job's recovery when using clip & ingest

{noformat}
java.util.ConcurrentModificationException
at java.base/java.util.HashMap.compute(HashMap.java:1230)
at 
org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.runAndReportDuration(RocksDBIncrementalRestoreOperation.java:988)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.lambda$createAsyncCompactionTask$2(RocksDBIncrementalRestoreOperation.java:291)
{noformat}

{noformat}
java.util.ConcurrentModificationException
at java.base/java.util.HashMap.compute(HashMap.java:1230)
at 
org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:794)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$3(StreamTask.java:744)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:744)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:704)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.base/java.lang.Thread.run(Thread.java:829)
{noformat}



> ConcurrentModificationException 
> SubTaskInitializationMetricsBuilder.addDurationMetric
> -
>
> Key: FLINK-34913
> URL: https://issues.apache.org/jira/browse/FLINK-34913
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics, Runtime / State Backends
>Affects Versions: 1.19.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
> Fix For: 1.19.1
>
>
> The following failures can occur during job's recovery when using clip & 
> ingest. This code path is currently not available, so the bug can not happen 
> for users.
> {noformat}
> java.util.ConcurrentModificationException
>   at java.base/java.util.HashMap.compute(HashMap.java:1230)
>   at 
> 

[jira] [Assigned] (FLINK-34913) ConcurrentModificationException SubTaskInitializationMetricsBuilder.addDurationMetric

2024-03-22 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-34913:
--

Assignee: Piotr Nowojski

> ConcurrentModificationException 
> SubTaskInitializationMetricsBuilder.addDurationMetric
> -
>
> Key: FLINK-34913
> URL: https://issues.apache.org/jira/browse/FLINK-34913
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics, Runtime / State Backends
>Affects Versions: 1.19.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
> Fix For: 1.19.1
>
>
> The following failures can occur during job's recovery when using clip & 
> ingest
> {noformat}
> java.util.ConcurrentModificationException
>   at java.base/java.util.HashMap.compute(HashMap.java:1230)
>   at 
> org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49)
>   at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.runAndReportDuration(RocksDBIncrementalRestoreOperation.java:988)
>   at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.lambda$createAsyncCompactionTask$2(RocksDBIncrementalRestoreOperation.java:291)
>   at 
> java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1736)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>   at java.base/java.lang.Thread.run(Thread.java:829)
> {noformat}
> {noformat}
> java.util.ConcurrentModificationException
>   at java.base/java.util.HashMap.compute(HashMap.java:1230)
>   at 
> org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:794)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$3(StreamTask.java:744)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:744)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:704)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>   at java.base/java.lang.Thread.run(Thread.java:829)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34913) ConcurrentModificationException SubTaskInitializationMetricsBuilder.addDurationMetric

2024-03-22 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-34913:
---
Description: 
The following failures can occur during job's recovery when using clip & ingest

{noformat}
java.util.ConcurrentModificationException
at java.base/java.util.HashMap.compute(HashMap.java:1230)
at 
org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.runAndReportDuration(RocksDBIncrementalRestoreOperation.java:988)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.lambda$createAsyncCompactionTask$2(RocksDBIncrementalRestoreOperation.java:291)
{noformat}

{noformat}
java.util.ConcurrentModificationException
at java.base/java.util.HashMap.compute(HashMap.java:1230)
at 
org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:794)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$3(StreamTask.java:744)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:744)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:704)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.base/java.lang.Thread.run(Thread.java:829)
{noformat}


  was:
The following failures can occur during job's recovery when using clip & ingest

{noformat}
java.util.ConcurrentModificationException
at java.base/java.util.HashMap.compute(HashMap.java:1230)
at 
org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.runAndReportDuration(RocksDBIncrementalRestoreOperation.java:988)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.lambda$createAsyncCompactionTask$2(RocksDBIncrementalRestoreOperation.java:291)
at 
java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1736)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
{noformat}

{noformat}
java.util.ConcurrentModificationException
at java.base/java.util.HashMap.compute(HashMap.java:1230)
at 
org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:794)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$3(StreamTask.java:744)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:744)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:704)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.base/java.lang.Thread.run(Thread.java:829)
{noformat}



> ConcurrentModificationException 
> SubTaskInitializationMetricsBuilder.addDurationMetric
> -
>
> Key: FLINK-34913
> URL: https://issues.apache.org/jira/browse/FLINK-34913
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Metrics, Runtime / State Backends
>Affects Versions: 1.19.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
> Fix For: 1.19.1
>
>
> The following failures can occur 

[jira] [Created] (FLINK-34913) ConcurrentModificationException SubTaskInitializationMetricsBuilder.addDurationMetric

2024-03-22 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-34913:
--

 Summary: ConcurrentModificationException 
SubTaskInitializationMetricsBuilder.addDurationMetric
 Key: FLINK-34913
 URL: https://issues.apache.org/jira/browse/FLINK-34913
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Metrics, Runtime / State Backends
Affects Versions: 1.19.0
Reporter: Piotr Nowojski
 Fix For: 1.19.1


The following failures can occur during job's recovery when using clip & ingest

{noformat}
java.util.ConcurrentModificationException
at java.base/java.util.HashMap.compute(HashMap.java:1230)
at 
org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.runAndReportDuration(RocksDBIncrementalRestoreOperation.java:988)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.lambda$createAsyncCompactionTask$2(RocksDBIncrementalRestoreOperation.java:291)
at 
java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1736)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
{noformat}

{noformat}
java.util.ConcurrentModificationException
at java.base/java.util.HashMap.compute(HashMap.java:1230)
at 
org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder.addDurationMetric(SubTaskInitializationMetricsBuilder.java:49)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:794)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$3(StreamTask.java:744)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:744)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:704)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.base/java.lang.Thread.run(Thread.java:829)
{noformat}
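
For illustration only (this sketch is not from the Flink codebase, and the 
class and metric names in it are made up): {{HashMap.compute}} is not safe for 
concurrent callers, so the restore thread and the async compaction thread 
reporting durations into the same builder at the same time can fail exactly as 
in the stack traces above. A minimal Java sketch of the race, together with 
one possible mitigation (synchronizing on the shared map):

{code:java}
import java.util.HashMap;
import java.util.Map;

public class DurationMetricsRace {

    // Plain HashMap, as a stand-in for the builder's internal map; not thread-safe.
    private static final Map<String, Long> durationMetrics = new HashMap<>();

    // Unsafe: concurrent compute() calls may throw ConcurrentModificationException
    // (or silently corrupt the map).
    static void addDurationMetricUnsafe(String name, long value) {
        durationMetrics.compute(name, (k, v) -> v == null ? value : v + value);
    }

    // One possible mitigation: serialize access to the shared map.
    static void addDurationMetricSynchronized(String name, long value) {
        synchronized (durationMetrics) {
            durationMetrics.compute(name, (k, v) -> v == null ? value : v + value);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Runnable reporter = () -> {
            for (int i = 0; i < 100_000; i++) {
                addDurationMetricUnsafe("restoreDurationMs", 1L); // hypothetical metric name
            }
        };
        // Simulates the restore thread and the async compaction thread reporting concurrently.
        Thread restoreThread = new Thread(reporter);
        Thread compactionThread = new Thread(reporter);
        restoreThread.start();
        compactionThread.start();
        restoreThread.join();
        compactionThread.join();
    }
}
{code}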




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34696) GSRecoverableWriterCommitter is generating excessive data blobs

2024-03-15 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-34696:
---
Issue Type: Improvement  (was: Bug)

> GSRecoverableWriterCommitter is generating excessive data blobs
> ---
>
> Key: FLINK-34696
> URL: https://issues.apache.org/jira/browse/FLINK-34696
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Reporter: Simon-Shlomo Poil
>Priority: Major
>
> The `composeBlobs` method in 
> `org.apache.flink.fs.gs.writer.GSRecoverableWriterCommitter` is designed to 
> merge multiple small blobs into a single large blob using Google Cloud 
> Storage's compose method. This process is iterative, combining the result 
> from the previous iteration with 31 new blobs until all blobs are merged. 
> Upon completion of the composition, the method proceeds to remove the 
> temporary blobs.
> *Issue:*
> This methodology results in significant, unnecessary data storage consumption 
> during the blob composition process, incurring considerable costs due to 
> Google Cloud Storage pricing models.
> *Example to Illustrate the Problem:*
>  - Initial state: 64 blobs, each 1 GB in size (totaling 64 GB).
>  - After 1st step: 32 blobs are merged into a single blob, increasing total 
> storage to 96 GB (64 original + 32 GB new).
>  - After 2nd step: The newly created 32 GB blob is merged with 31 more blobs, 
> raising the total to 159 GB.
>  - After 3rd step: The final blob is merged, culminating in a total of 223 GB 
> to combine the original 64 GB of data. This results in an overhead of 159 GB.
> *Impact:*
> This inefficiency has a profound impact, especially at scale, where terabytes 
> of data can incur overheads in the petabyte range, leading to unexpectedly 
> high costs. Additionally, we have observed an increase in storage exceptions 
> thrown by the Google Storage library, potentially linked to this issue.
> *Suggested Solution:*
> To mitigate this problem, we propose modifying the `composeBlobs` method to 
> immediately delete source blobs once they have been successfully combined. 
> This change could significantly reduce data duplication and associated costs. 
> However, the implications for data recovery and integrity need careful 
> consideration to ensure that this optimization does not compromise the 
> ability to recover data in case of a failure during the composition process.
> *Steps to Reproduce:*
> 1. Initiate the blob composition process in an environment with a significant 
> number of blobs (e.g., 64 blobs of 1 GB each).
> 2. Observe the temporary increase in data storage as blobs are iteratively 
> combined.
> 3. Note the final amount of data storage used compared to the initial total 
> size of the blobs.
> *Expected Behavior:*
> The blob composition process should minimize unnecessary data storage use, 
> efficiently managing resources to combine blobs without generating excessive 
> temporary data overhead.
> *Actual Behavior:*
> The current implementation results in significant temporary increases in data 
> storage, leading to high costs and potential system instability due to 
> frequent storage exceptions.
>  
>  
>  
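
To make the suggested solution above concrete, here is a minimal sketch of the 
eager-cleanup variant. It is written against a simplified, hypothetical 
blob-store interface rather than the actual GSRecoverableWriterCommitter or 
the Google Cloud Storage client, it assumes the usual 32-component limit per 
compose call, and the recovery/integrity caveat from the description still 
applies:

{code:java}
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical view of the blob operations used by the sketch below. */
interface BlobStore {
    /** Composes the given source blobs into target and returns the target's name. */
    String compose(List<String> sources, String target);

    void delete(String blob);
}

public class ComposeWithEagerCleanup {

    private static final int MAX_COMPONENTS = 32; // compose limit per request

    /** Merges all blobs into finalTarget, deleting sources as soon as they are consumed. */
    static void composeBlobs(BlobStore store, List<String> blobs, String finalTarget) {
        List<String> remaining = new ArrayList<>(blobs);
        String intermediate = null;
        int round = 0;

        while (!remaining.isEmpty()) {
            List<String> batch = new ArrayList<>();
            if (intermediate != null) {
                batch.add(intermediate); // the previous result counts as one of the 32 components
            }
            while (batch.size() < MAX_COMPONENTS && !remaining.isEmpty()) {
                batch.add(remaining.remove(0));
            }

            boolean lastRound = remaining.isEmpty();
            String target = lastRound ? finalTarget : finalTarget + ".tmp-" + round++;
            String composed = store.compose(batch, target);

            // Eagerly delete the consumed sources (including the previous intermediate),
            // so duplicated data is removed right away instead of being retained until
            // all compose iterations have finished.
            for (String source : batch) {
                store.delete(source);
            }
            intermediate = lastRound ? null : composed;
        }
    }
}
{code}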



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-33816) SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain failed due async checkpoint triggering not being completed

2024-03-15 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-33816.
--
Fix Version/s: 2.0.0
 Assignee: jiabao.sun
   Resolution: Fixed

merged commit 5aebb04 into apache:master 

> SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain failed due 
> async checkpoint triggering not being completed 
> -
>
> Key: FLINK-33816
> URL: https://issues.apache.org/jira/browse/FLINK-33816
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.19.0
>Reporter: Matthias Pohl
>Assignee: jiabao.sun
>Priority: Major
>  Labels: github-actions, pull-request-available, test-stability
> Fix For: 2.0.0
>
> Attachments: screenshot-1.png
>
>
> [https://github.com/XComp/flink/actions/runs/7182604625/job/19559947894#step:12:9430]
> {code:java}
> rror: 14:39:01 14:39:01.930 [ERROR] Tests run: 16, Failures: 1, Errors: 0, 
> Skipped: 0, Time elapsed: 1.878 s <<< FAILURE! - in 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTaskTest
> 9426Error: 14:39:01 14:39:01.930 [ERROR] 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain
>   Time elapsed: 0.034 s  <<< FAILURE!
> 9427Dec 12 14:39:01 org.opentest4j.AssertionFailedError: 
> 9428Dec 12 14:39:01 
> 9429Dec 12 14:39:01 Expecting value to be true but was false
> 9430Dec 12 14:39:01   at 
> java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:62)
> 9431Dec 12 14:39:01   at 
> java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:502)
> 9432Dec 12 14:39:01   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain(SourceStreamTaskTest.java:710)
> 9433Dec 12 14:39:01   at 
> java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
> 9434Dec 12 14:39:01   at 
> java.base/java.lang.reflect.Method.invoke(Method.java:580)
> [...] {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33816) SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain failed due async checkpoint triggering not being completed

2024-03-15 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17827503#comment-17827503
 ] 

Piotr Nowojski edited comment on FLINK-33816 at 3/15/24 1:33 PM:
-

Thanks for the fix!

merged commit 5aebb04 into apache:master 


was (Author: pnowojski):
merged commit 5aebb04 into apache:master 

> SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain failed due 
> async checkpoint triggering not being completed 
> -
>
> Key: FLINK-33816
> URL: https://issues.apache.org/jira/browse/FLINK-33816
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.19.0
>Reporter: Matthias Pohl
>Assignee: jiabao.sun
>Priority: Major
>  Labels: github-actions, pull-request-available, test-stability
> Fix For: 2.0.0
>
> Attachments: screenshot-1.png
>
>
> [https://github.com/XComp/flink/actions/runs/7182604625/job/19559947894#step:12:9430]
> {code:java}
> rror: 14:39:01 14:39:01.930 [ERROR] Tests run: 16, Failures: 1, Errors: 0, 
> Skipped: 0, Time elapsed: 1.878 s <<< FAILURE! - in 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTaskTest
> 9426Error: 14:39:01 14:39:01.930 [ERROR] 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain
>   Time elapsed: 0.034 s  <<< FAILURE!
> 9427Dec 12 14:39:01 org.opentest4j.AssertionFailedError: 
> 9428Dec 12 14:39:01 
> 9429Dec 12 14:39:01 Expecting value to be true but was false
> 9430Dec 12 14:39:01   at 
> java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:62)
> 9431Dec 12 14:39:01   at 
> java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:502)
> 9432Dec 12 14:39:01   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTaskTest.testTriggeringStopWithSavepointWithDrain(SourceStreamTaskTest.java:710)
> 9433Dec 12 14:39:01   at 
> java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
> 9434Dec 12 14:39:01   at 
> java.base/java.lang.reflect.Method.invoke(Method.java:580)
> [...] {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-26990) BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime failed

2024-03-13 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17826779#comment-17826779
 ] 

Piotr Nowojski commented on FLINK-26990:


I've just encountered the same issue in an internal build (thus I cannot 
provide a URL, but the error message is exactly the same)


> BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime
>  failed
> ---
>
> Key: FLINK-26990
> URL: https://issues.apache.org/jira/browse/FLINK-26990
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.2
>Reporter: Matthias Pohl
>Priority: Minor
>  Labels: auto-deprioritized-major, test-stability
>
> [This 
> build|https://dev.azure.com/mapohl/flink/_build/results?buildId=914=logs=f3dc9b18-b77a-55c1-591e-264c46fe44d1=2d3cd81e-1c37-5c31-0ee4-f5d5cdb9324d=25899]
>  failed due to unexpected behavior in 
> {{BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime}}:
> {code}
> Apr 01 11:42:06 [ERROR] 
> org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime
>   Time elapsed: 0.11 s  <<< FAILURE!
> Apr 01 11:42:06 java.lang.AssertionError: 
> Apr 01 11:42:06 
> Apr 01 11:42:06 Expected size: 6 but was: 4 in:
> Apr 01 11:42:06 [Record @ (undef) : 
> +I(c1,0,1969-12-31T23:59:55,1970-01-01T00:00:05),
> Apr 01 11:42:06 Record @ (undef) : 
> +I(c1,0,1970-01-01T00:00,1970-01-01T00:00:10),
> Apr 01 11:42:06 Record @ (undef) : 
> +I(c1,1,1970-01-01T00:00:05,1970-01-01T00:00:15),
> Apr 01 11:42:06 Record @ (undef) : 
> +I(c1,2,1970-01-01T00:00:10,1970-01-01T00:00:20)]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-26990) BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime failed

2024-03-13 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-26990:
---
Priority: Major  (was: Minor)

> BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime
>  failed
> ---
>
> Key: FLINK-26990
> URL: https://issues.apache.org/jira/browse/FLINK-26990
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.2, 1.18.1
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: auto-deprioritized-major, test-stability
>
> [This 
> build|https://dev.azure.com/mapohl/flink/_build/results?buildId=914=logs=f3dc9b18-b77a-55c1-591e-264c46fe44d1=2d3cd81e-1c37-5c31-0ee4-f5d5cdb9324d=25899]
>  failed due to unexpected behavior in 
> {{BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime}}:
> {code}
> Apr 01 11:42:06 [ERROR] 
> org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime
>   Time elapsed: 0.11 s  <<< FAILURE!
> Apr 01 11:42:06 java.lang.AssertionError: 
> Apr 01 11:42:06 
> Apr 01 11:42:06 Expected size: 6 but was: 4 in:
> Apr 01 11:42:06 [Record @ (undef) : 
> +I(c1,0,1969-12-31T23:59:55,1970-01-01T00:00:05),
> Apr 01 11:42:06 Record @ (undef) : 
> +I(c1,0,1970-01-01T00:00,1970-01-01T00:00:10),
> Apr 01 11:42:06 Record @ (undef) : 
> +I(c1,1,1970-01-01T00:00:05,1970-01-01T00:00:15),
> Apr 01 11:42:06 Record @ (undef) : 
> +I(c1,2,1970-01-01T00:00:10,1970-01-01T00:00:20)]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-26990) BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime failed

2024-03-13 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-26990:
---
Affects Version/s: 1.18.1

> BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime
>  failed
> ---
>
> Key: FLINK-26990
> URL: https://issues.apache.org/jira/browse/FLINK-26990
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.16.2, 1.18.1
>Reporter: Matthias Pohl
>Priority: Minor
>  Labels: auto-deprioritized-major, test-stability
>
> [This 
> build|https://dev.azure.com/mapohl/flink/_build/results?buildId=914=logs=f3dc9b18-b77a-55c1-591e-264c46fe44d1=2d3cd81e-1c37-5c31-0ee4-f5d5cdb9324d=25899]
>  failed due to unexpected behavior in 
> {{BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime}}:
> {code}
> Apr 01 11:42:06 [ERROR] 
> org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupWindowAggregateFunctionOperatorTest.testFinishBundleTriggeredByTime
>   Time elapsed: 0.11 s  <<< FAILURE!
> Apr 01 11:42:06 java.lang.AssertionError: 
> Apr 01 11:42:06 
> Apr 01 11:42:06 Expected size: 6 but was: 4 in:
> Apr 01 11:42:06 [Record @ (undef) : 
> +I(c1,0,1969-12-31T23:59:55,1970-01-01T00:00:05),
> Apr 01 11:42:06 Record @ (undef) : 
> +I(c1,0,1970-01-01T00:00,1970-01-01T00:00:10),
> Apr 01 11:42:06 Record @ (undef) : 
> +I(c1,1,1970-01-01T00:00:05,1970-01-01T00:00:15),
> Apr 01 11:42:06 Record @ (undef) : 
> +I(c1,2,1970-01-01T00:00:10,1970-01-01T00:00:20)]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29114) TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with result mismatch

2024-03-13 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-29114:
---
Priority: Blocker  (was: Major)

> TableSourceITCase#testTableHintWithLogicalTableScanReuse sometimes fails with 
> result mismatch 
> --
>
> Key: FLINK-29114
> URL: https://issues.apache.org/jira/browse/FLINK-29114
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner, Tests
>Affects Versions: 1.15.0, 1.19.0, 1.20.0
>Reporter: Sergey Nuyanzin
>Assignee: Jane Chan
>Priority: Blocker
>  Labels: auto-deprioritized-major, pull-request-available, 
> test-stability
> Fix For: 1.20.0
>
> Attachments: FLINK-29114.log, image-2024-02-27-15-23-49-494.png, 
> image-2024-02-27-15-26-07-657.png, image-2024-02-27-15-32-48-317.png
>
>
> It could be reproduced locally by repeating tests. Usually about 100 
> iterations are enough to have several failed tests
> {noformat}
> [ERROR] Tests run: 13, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 
> 1.664 s <<< FAILURE! - in 
> org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase
> [ERROR] 
> org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase.testTableHintWithLogicalTableScanReuse
>   Time elapsed: 0.108 s  <<< FAILURE!
> java.lang.AssertionError: expected: 3,2,Hello world, 3,2,Hello world, 3,2,Hello world)> but was: 2,2,Hello, 2,2,Hello, 3,2,Hello world, 3,2,Hello world)>
>     at org.junit.Assert.fail(Assert.java:89)
>     at org.junit.Assert.failNotEquals(Assert.java:835)
>     at org.junit.Assert.assertEquals(Assert.java:120)
>     at org.junit.Assert.assertEquals(Assert.java:146)
>     at 
> org.apache.flink.table.planner.runtime.batch.sql.TableSourceITCase.testTableHintWithLogicalTableScanReuse(TableSourceITCase.scala:428)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>     at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>     at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>     at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>     at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
>     at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>     at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>     at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>     at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
>     at 
> org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
>     at 
> org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
>     at 
> org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
>     at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
>     at 
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
>     at 
> 

[jira] [Commented] (FLINK-33856) Add metrics to monitor the interaction performance between task and external storage system in the process of checkpoint making

2024-02-25 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820619#comment-17820619
 ] 

Piotr Nowojski commented on FLINK-33856:


Hey [~hejufang001]! The new process hasn't been documented yet, but it will 
be basically the same; the only difference is that in the initial discussion 
stage FLIPs should be published not on the wiki but via Google Docs (with 
public read access and comments disabled). Before/after the vote a committer 
will copy-paste it to the wiki. So if you are still willing to work on this, 
feel free to start the discussion thread using a Google Document :)

> Add metrics to monitor the interaction performance between task and external 
> storage system in the process of checkpoint making
> ---
>
> Key: FLINK-33856
> URL: https://issues.apache.org/jira/browse/FLINK-33856
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.18.0
>Reporter: Jufang He
>Assignee: Jufang He
>Priority: Major
>  Labels: pull-request-available
>
> When Flink makes a checkpoint, the performance of the interaction with the 
> external file system has a great impact on the overall checkpoint duration. 
> Therefore, the bottleneck is easy to spot if performance metrics are added 
> where the task interacts with the external file storage system. These 
> include: the file write rate, the latency of writing a file, and the latency 
> of closing a file.
> Adding the above metrics on the Flink side has the following advantages: it 
> is convenient for comparing the end-to-end checkpoint duration of different 
> tasks, and there is no need to distinguish between types of external storage 
> systems, since the metrics can be collected in one place, the 
> FsCheckpointStreamFactory.
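
As a rough illustration of that idea (a sketch only, not the actual 
FsCheckpointStreamFactory change; the wrapper and metric names are invented), 
the checkpoint output stream could be wrapped so that the write rate, write 
latency, and close latency are measured in one place, independently of the 
concrete file system:

{code:java}
import java.io.IOException;
import java.io.OutputStream;
import java.util.function.BiConsumer;

/** Wraps a checkpoint output stream and reports write/close latency to a callback. */
public class MeasuringOutputStream extends OutputStream {

    private final OutputStream delegate;
    private final BiConsumer<String, Long> reporter; // (metricName, value) sink, e.g. a histogram

    public MeasuringOutputStream(OutputStream delegate, BiConsumer<String, Long> reporter) {
        this.delegate = delegate;
        this.reporter = reporter;
    }

    @Override
    public void write(int b) throws IOException {
        long start = System.nanoTime();
        delegate.write(b);
        reporter.accept("checkpointFileWriteLatencyNanos", System.nanoTime() - start);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        long start = System.nanoTime();
        delegate.write(b, off, len);
        reporter.accept("checkpointFileWriteLatencyNanos", System.nanoTime() - start);
        reporter.accept("checkpointFileWrittenBytes", (long) len); // allows deriving the write rate
    }

    @Override
    public void close() throws IOException {
        long start = System.nanoTime();
        delegate.close();
        reporter.accept("checkpointFileCloseLatencyNanos", System.nanoTime() - start);
    }
}
{code}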



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34430) Akka frame size exceeded with many ByteStreamStateHandle being used

2024-02-20 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818838#comment-17818838
 ] 

Piotr Nowojski commented on FLINK-34430:


[~roman] is already working on that under the FLINK-26050 ticket. However, the 
akka frame size is still an issue. The 10 MB limit is dangerously small, 
especially when the {{ByteStreamStateHandle}} size limit is bumped to 1 MB 
(which is a pretty reasonable value, all things considered). In that case the 
limit would be exceeded with only 10+ files.
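
Back-of-envelope check of that claim (purely illustrative; the 10 MB frame 
size and the 1 MB per-handle limit are taken from the comment above, all other 
overhead is ignored):

{code:java}
public class AkkaFrameSizeBudget {
    public static void main(String[] args) {
        long akkaFrameSizeBytes = 10L * 1024 * 1024;    // 10 MB frame size limit
        long inlineHandleLimitBytes = 1L * 1024 * 1024; // assumed ByteStreamStateHandle limit
        // Each inline handle ships its payload inside the RPC message itself, so roughly:
        long maxInlineHandles = akkaFrameSizeBytes / inlineHandleLimitBytes;
        System.out.println("~" + maxInlineHandles
                + " fully-sized inline handles already fill one RPC message.");
    }
}
{code}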

> Akka frame size exceeded with many ByteStreamStateHandle being used
> ---
>
> Key: FLINK-34430
> URL: https://issues.apache.org/jira/browse/FLINK-34430
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.3, 1.17.2, 1.19.0, 1.18.1
>Reporter: Piotr Nowojski
>Assignee: Chesnay Schepler
>Priority: Major
>
> The following error can happen
> {noformat}
> Discarding oversized payload sent to 
> Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size 
> 10485760 bytes, actual size of encoded class 
> org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes.
>   
> error.stack_trace
> akka.remote.OversizedPayloadException: Discarding oversized payload sent to 
> Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size 
> 10485760 bytes, actual size of encoded class 
> org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes.
> {noformat}
> when https://issues.apache.org/jira/browse/FLINK-26050 is causing large 
> amount of small sst files to be created and never deleted. If those files are 
> small enough to be handled by {{ByteStreamStateHandle}} akka frame size can 
> be exceeded.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34386) Add RocksDB bloom filter metrics

2024-02-20 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818837#comment-17818837
 ] 

Piotr Nowojski edited comment on FLINK-34386 at 2/20/24 2:42 PM:
-

merged commit 890a995 into apache:master now

Thanks [~hejufang001]!


was (Author: pnowojski):
merged commit 890a995 into apache:master now

> Add RocksDB bloom filter metrics
> 
>
> Key: FLINK-34386
> URL: https://issues.apache.org/jira/browse/FLINK-34386
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Jufang He
>Assignee: Jufang He
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> In our production environment, with the RocksDB bloom filter enabled, the 
> performance benefit for task state reads is obvious. However, there are no 
> usage metrics for the bloom filter. If these metrics were reported via a 
> metrics reporter, it would be easy to monitor the effectiveness of the bloom 
> filter optimization.
> These metrics are available from RocksDB Statistics:
> BLOOM_FILTER_USEFUL: times bloom filter has avoided file reads.
> BLOOM_FILTER_FULL_POSITIVE: times bloom FullFilter has not avoided the reads.
> BLOOM_FILTER_FULL_TRUE_POSITIVE: times bloom FullFilter has not avoided the 
> reads and data actually exist.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-34386) Add RocksDB bloom filter metrics

2024-02-20 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-34386.
--
Fix Version/s: 1.20.0
   Resolution: Fixed

merged commit 890a995 into apache:master now

> Add RocksDB bloom filter metrics
> 
>
> Key: FLINK-34386
> URL: https://issues.apache.org/jira/browse/FLINK-34386
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Jufang He
>Assignee: Jufang He
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> In our production environment, with the RocksDB bloom filter enabled, the 
> performance benefit for task state reads is obvious. However, there are no 
> usage metrics for the bloom filter. If these metrics were reported via a 
> metrics reporter, it would be easy to monitor the effectiveness of the bloom 
> filter optimization.
> These metrics are available from RocksDB Statistics:
> BLOOM_FILTER_USEFUL: times bloom filter has avoided file reads.
> BLOOM_FILTER_FULL_POSITIVE: times bloom FullFilter has not avoided the reads.
> BLOOM_FILTER_FULL_TRUE_POSITIVE: times bloom FullFilter has not avoided the 
> reads and data actually exist.
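
For reference, a minimal sketch of reading these tickers through the RocksDB 
Java API (RocksJava); this is not the Flink integration itself, it assumes a 
RocksJava version that exposes these TickerType constants, and the path and 
keys are placeholders. A bloom filter also has to be configured on the table 
format for the counters to become non-zero.

{code:java}
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Statistics;
import org.rocksdb.TickerType;

public class BloomFilterStatsExample {

    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Statistics statistics = new Statistics();
                Options options = new Options()
                        .setCreateIfMissing(true)
                        .setStatistics(statistics); // statistics must be enabled explicitly
                RocksDB db = RocksDB.open(options, "/tmp/bloom-stats-example")) {

            db.put("key".getBytes(), "value".getBytes());
            db.get("key".getBytes());

            // The tickers mentioned in the ticket, read from RocksDB Statistics.
            System.out.println("BLOOM_FILTER_USEFUL = "
                    + statistics.getTickerCount(TickerType.BLOOM_FILTER_USEFUL));
            System.out.println("BLOOM_FILTER_FULL_POSITIVE = "
                    + statistics.getTickerCount(TickerType.BLOOM_FILTER_FULL_POSITIVE));
            System.out.println("BLOOM_FILTER_FULL_TRUE_POSITIVE = "
                    + statistics.getTickerCount(TickerType.BLOOM_FILTER_FULL_TRUE_POSITIVE));
        }
    }
}
{code}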



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-34386) Add RocksDB bloom filter metrics

2024-02-20 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-34386:
--

Assignee: Jufang He

> Add RocksDB bloom filter metrics
> 
>
> Key: FLINK-34386
> URL: https://issues.apache.org/jira/browse/FLINK-34386
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Jufang He
>Assignee: Jufang He
>Priority: Major
>  Labels: pull-request-available
>
> In our production environment, with the RocksDB bloom filter enabled, the 
> performance benefit for task state reads is obvious. However, there are no 
> usage metrics for the bloom filter. If these metrics were reported via a 
> metrics reporter, it would be easy to monitor the effectiveness of the bloom 
> filter optimization.
> These metrics are available from RocksDB Statistics:
> BLOOM_FILTER_USEFUL: times bloom filter has avoided file reads.
> BLOOM_FILTER_FULL_POSITIVE: times bloom FullFilter has not avoided the reads.
> BLOOM_FILTER_FULL_TRUE_POSITIVE: times bloom FullFilter has not avoided the 
> reads and data actually exist.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34424) BoundedBlockingSubpartitionWriteReadTest#testRead10ConsumersConcurrent times out

2024-02-14 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817251#comment-17817251
 ] 

Piotr Nowojski edited comment on FLINK-34424 at 2/14/24 8:39 AM:
-

:D

[~mapohl] I haven't made any modifications to this area of the code for a very 
long time, and unfortunately I don't currently have much time to investigate 
this kind of issue. I would suggest pinging someone else, maybe the authors of 
the most recent change to this test file: FLINK-33743

[~yunfengzhou][~tanyuxin]



was (Author: pnowojski):
:D

[~mapohl] I haven't made any modifications to this area of code since a very 
long time and I unfortunately recently don't have much time to investigate this 
kind of issues. I would suggest to ping someone else. Maybe authors of the most 
recent change to this test file: FLINK-33743


> BoundedBlockingSubpartitionWriteReadTest#testRead10ConsumersConcurrent times 
> out
> 
>
> Key: FLINK-34424
> URL: https://issues.apache.org/jira/browse/FLINK-34424
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.19.0, 1.20.0
>Reporter: Matthias Pohl
>Priority: Critical
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57446=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=9151
> {code}
> Feb 11 13:55:29 "ForkJoinPool-50-worker-25" #414 daemon prio=5 os_prio=0 
> tid=0x7f19503af800 nid=0x284c in Object.wait() [0x7f191b6db000]
> Feb 11 13:55:29java.lang.Thread.State: WAITING (on object monitor)
> Feb 11 13:55:29   at java.lang.Object.wait(Native Method)
> Feb 11 13:55:29   at java.lang.Thread.join(Thread.java:1252)
> Feb 11 13:55:29   - locked <0xe2e019a8> (a 
> org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionWriteReadTest$LongReader)
> Feb 11 13:55:29   at 
> org.apache.flink.core.testutils.CheckedThread.trySync(CheckedThread.java:104)
> Feb 11 13:55:29   at 
> org.apache.flink.core.testutils.CheckedThread.sync(CheckedThread.java:92)
> Feb 11 13:55:29   at 
> org.apache.flink.core.testutils.CheckedThread.sync(CheckedThread.java:81)
> Feb 11 13:55:29   at 
> org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionWriteReadTest.testRead10ConsumersConcurrent(BoundedBlockingSubpartitionWriteReadTest.java:177)
> Feb 11 13:55:29   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> [...]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34424) BoundedBlockingSubpartitionWriteReadTest#testRead10ConsumersConcurrent times out

2024-02-14 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817251#comment-17817251
 ] 

Piotr Nowojski commented on FLINK-34424:


:D

[~mapohl] I haven't made any modifications to this area of the code in a very long 
time, and unfortunately I don't have much time these days to investigate this kind 
of issue. I would suggest pinging someone else, maybe the authors of the most 
recent change to this test file: FLINK-33743


> BoundedBlockingSubpartitionWriteReadTest#testRead10ConsumersConcurrent times 
> out
> 
>
> Key: FLINK-34424
> URL: https://issues.apache.org/jira/browse/FLINK-34424
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.19.0, 1.20.0
>Reporter: Matthias Pohl
>Priority: Critical
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57446=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=9151
> {code}
> Feb 11 13:55:29 "ForkJoinPool-50-worker-25" #414 daemon prio=5 os_prio=0 
> tid=0x7f19503af800 nid=0x284c in Object.wait() [0x7f191b6db000]
> Feb 11 13:55:29java.lang.Thread.State: WAITING (on object monitor)
> Feb 11 13:55:29   at java.lang.Object.wait(Native Method)
> Feb 11 13:55:29   at java.lang.Thread.join(Thread.java:1252)
> Feb 11 13:55:29   - locked <0xe2e019a8> (a 
> org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionWriteReadTest$LongReader)
> Feb 11 13:55:29   at 
> org.apache.flink.core.testutils.CheckedThread.trySync(CheckedThread.java:104)
> Feb 11 13:55:29   at 
> org.apache.flink.core.testutils.CheckedThread.sync(CheckedThread.java:92)
> Feb 11 13:55:29   at 
> org.apache.flink.core.testutils.CheckedThread.sync(CheckedThread.java:81)
> Feb 11 13:55:29   at 
> org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionWriteReadTest.testRead10ConsumersConcurrent(BoundedBlockingSubpartitionWriteReadTest.java:177)
> Feb 11 13:55:29   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> [...]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34430) Akka frame size exceeded with many ByteStreamStateHandle being used

2024-02-12 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-34430:
---
Description: 
The following error can happen
{noformat}
Discarding oversized payload sent to 
Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size 
10485760 bytes, actual size of encoded class 
org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes.

error.stack_trace
akka.remote.OversizedPayloadException: Discarding oversized payload sent to 
Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size 
10485760 bytes, actual size of encoded class 
org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes.
{noformat}
when https://issues.apache.org/jira/browse/FLINK-26050 causes a large number of 
small sst files to be created and never deleted. If those files are small enough 
to be handled by {{ByteStreamStateHandle}}, the akka frame size can be exceeded.


  was:
The following error can happen
{noformat}
Discarding oversized payload sent to 
Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size 
10485760 bytes, actual size of encoded class 
org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes.

error.stack_trace
akka.remote.OversizedPayloadException: Discarding oversized payload sent to 
Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size 
10485760 bytes, actual size of encoded class 
org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes.
{noformat}
when https://issues.apache.org/jira/browse/FLINK-26050 is causing large amount 
of small sst files to be created and never deleted. If those files are small 
enough to be 



> Akka frame size exceeded with many ByteStreamStateHandle being used
> ---
>
> Key: FLINK-34430
> URL: https://issues.apache.org/jira/browse/FLINK-34430
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.3, 1.17.2, 1.19.0, 1.18.1
>Reporter: Piotr Nowojski
>Assignee: Chesnay Schepler
>Priority: Major
>
> The following error can happen
> {noformat}
> Discarding oversized payload sent to 
> Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size 
> 10485760 bytes, actual size of encoded class 
> org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes.
>   
> error.stack_trace
> akka.remote.OversizedPayloadException: Discarding oversized payload sent to 
> Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size 
> 10485760 bytes, actual size of encoded class 
> org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes.
> {noformat}
> when https://issues.apache.org/jira/browse/FLINK-26050 causes a large number of 
> small sst files to be created and never deleted. If those files are small enough 
> to be handled by {{ByteStreamStateHandle}}, the akka frame size can be exceeded.
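
Until the root cause is addressed, two configuration knobs are relevant here. The 
sketch below only shows where they would be set; the values are illustrative, not 
recommendations:
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FrameSizeWorkaroundSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Larger RPC frame, so that a big batch of state handles still fits into one message.
        conf.setString("akka.framesize", "20971520b");
        // Threshold below which checkpoint state is kept inline as a ByteStreamStateHandle;
        // 0 forces even tiny state files to be written out instead of being inlined.
        conf.setString("state.storage.fs.memory-threshold", "0");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(1, conf);
        // ... build and execute the job ...
    }
}
{code}
Lowering the memory threshold trades RPC payload size for many additional small 
checkpoint files, so it only papers over the FLINK-26050 behaviour rather than 
fixing it.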



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34430) Akka frame size exceeded with many ByteStreamStateHandle being used

2024-02-12 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-34430:
---
Description: 
The following error can happen
{noformat}
Discarding oversized payload sent to 
Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size 
10485760 bytes, actual size of encoded class 
org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes.

error.stack_trace
akka.remote.OversizedPayloadException: Discarding oversized payload sent to 
Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size 
10485760 bytes, actual size of encoded class 
org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes.
{noformat}
when https://issues.apache.org/jira/browse/FLINK-26050 is causing large amount 
of small sst files to be created and never deleted. If those files are small 
enough to be 


  was:
The following error can happen
{noformat}
Discarding oversized payload sent to 
Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size 
10485760 bytes, actual size of encoded class 
org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes.

error.stack_trace
akka.remote.OversizedPayloadException: Discarding oversized payload sent to 
Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size 
10485760 bytes, actual size of encoded class 
org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes.
{noformat}



> Akka frame size exceeded with many ByteStreamStateHandle being used
> ---
>
> Key: FLINK-34430
> URL: https://issues.apache.org/jira/browse/FLINK-34430
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.3, 1.17.2, 1.19.0, 1.18.1
>Reporter: Piotr Nowojski
>Assignee: Chesnay Schepler
>Priority: Major
>
> The following error can happen
> {noformat}
> Discarding oversized payload sent to 
> Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size 
> 10485760 bytes, actual size of encoded class 
> org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes.
>   
> error.stack_trace
> akka.remote.OversizedPayloadException: Discarding oversized payload sent to 
> Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size 
> 10485760 bytes, actual size of encoded class 
> org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes.
> {noformat}
> when https://issues.apache.org/jira/browse/FLINK-26050 is causing large 
> amount of small sst files to be created and never deleted. If those files are 
> small enough to be 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34430) Akka frame size exceeded with many ByteStreamStateHandle being used

2024-02-12 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-34430:
---
Description: 
The following error can happen
{noformat}
Discarding oversized payload sent to 
Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size 
10485760 bytes, actual size of encoded class 
org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes.

error.stack_trace
akka.remote.OversizedPayloadException: Discarding oversized payload sent to 
Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size 
10485760 bytes, actual size of encoded class 
org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes.
{noformat}


> Akka frame size exceeded with many ByteStreamStateHandle being used
> ---
>
> Key: FLINK-34430
> URL: https://issues.apache.org/jira/browse/FLINK-34430
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.3, 1.17.2, 1.19.0, 1.18.1
>Reporter: Piotr Nowojski
>Assignee: Chesnay Schepler
>Priority: Major
>
> The following error can happen
> {noformat}
> Discarding oversized payload sent to 
> Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size 
> 10485760 bytes, actual size of encoded class 
> org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes.
>   
> error.stack_trace
> akka.remote.OversizedPayloadException: Discarding oversized payload sent to 
> Actor[akka.tcp://flink@/user/rpc/taskmanager_0#-]: max allowed size 
> 10485760 bytes, actual size of encoded class 
> org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 11212046 bytes.
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34430) Akka frame size exceeded with many ByteStreamStateHandle being used

2024-02-12 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816597#comment-17816597
 ] 

Piotr Nowojski commented on FLINK-34430:


Akka frame size can be exceeded as a result of RocksDB creating too many small 
files.

> Akka frame size exceeded with many ByteStreamStateHandle being used
> ---
>
> Key: FLINK-34430
> URL: https://issues.apache.org/jira/browse/FLINK-34430
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.16.3, 1.17.2, 1.19.0, 1.18.1
>Reporter: Piotr Nowojski
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34430) Akka frame size exceeded with many ByteStreamStateHandle being used

2024-02-12 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-34430:
--

 Summary: Akka frame size exceeded with many ByteStreamStateHandle 
being used
 Key: FLINK-34430
 URL: https://issues.apache.org/jira/browse/FLINK-34430
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.18.1, 1.17.2, 1.16.3, 1.19.0
Reporter: Piotr Nowojski






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-26050) Too many small sst files in rocksdb state backend when using time window created in ascending order

2024-02-12 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816595#comment-17816595
 ] 

Piotr Nowojski commented on FLINK-26050:


I have encountered the same situation, but with event time windows, and AFAIK the 
problem is not only with the state of the timers but also with the state of the 
windows. In the case I've seen, checkpointing was configured to run once every 
minute with windows of 10 minutes in length, and there were hundreds of files 
created every minute, plus a smaller fraction of files created every ten minutes.

On top of that, this can also cause other problems besides too many open files. In 
our case the Akka frame limit was exceeded: each of those files was small enough to 
fit into a byte stream state handle, and akka crashed trying to send ~3000 of those 
handles from the JM to the TM.
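
Not a fix for this ticket, but as a possible mitigation for the symptom: a custom 
{{RocksDBOptionsFactory}} can ask RocksDB to periodically rewrite files that are 
otherwise never picked for compaction. This is a sketch only; whether the option is 
available depends on the RocksDB version bundled with the Flink release in use, and 
the extra compaction I/O may or may not be acceptable:
{code:java}
import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;

import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

import java.util.Collection;

/** Sketch: nudge RocksDB into eventually compacting files that never overlap with others. */
public class SmallSstMitigationOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(
            DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // Files older than this become eligible for compaction even if they do not overlap
        // with any other file (the trivial-move case described below). The value is
        // arbitrary and has to be tuned per workload.
        return currentOptions.setPeriodicCompactionSeconds(6L * 60 * 60);
    }
}
{code}
The factory can be plugged in via {{RocksDBStateBackend#setRocksDBOptions(...)}} (or 
the corresponding option of the state backend in use).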

> Too many small sst files in rocksdb state backend when using time window 
> created in ascending order
> ---
>
> Key: FLINK-26050
> URL: https://issues.apache.org/jira/browse/FLINK-26050
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.10.2, 1.14.3
>Reporter: shen
>Priority: Major
> Attachments: image-2022-02-09-21-22-13-920.png, 
> image-2022-02-11-10-32-14-956.png, image-2022-02-11-10-36-46-630.png, 
> image-2022-02-14-13-04-52-325.png
>
>
> When using processing or event time windows, in some workloads a lot of small sst 
> files (several KB) accumulate in the rocksdb local directory, which may cause a 
> "Too many files" error.
> Using the rocksdb tool ldb to inspect the content of these sst files shows:
>  * the column family of these small sst files is "processing_window-timers".
>  * most sst files are in level-1.
>  * records in the sst files are almost all kTypeDeletion.
>  * the creation times of the sst files correspond to the checkpoint interval.
> These small sst files seem to be generated when a flink checkpoint is triggered. 
> Although all of their content consists of delete tags, they are not compacted and 
> deleted by rocksdb compaction, because they do not intersect with each other 
> (rocksdb [compaction trivial 
> move|https://github.com/facebook/rocksdb/wiki/Compaction-Trivial-Move]). And there 
> seems to be no chance to delete them, because they are small and do not intersect 
> with other sst files.
>  
> I will attach a simple program to reproduce the problem.
>  
> Timers in processing time windows are generated in strictly ascending order (both 
> puts and deletes). So if the job's workload happens to generate level-0 sst files 
> that do not intersect with each other (for example: the processing window size is 
> much smaller than the checkpoint interval, and no window content crosses the 
> checkpoint interval, or no new data arrives in a window crossing the checkpoint 
> interval), many small sst files will be generated until the job is restored from a 
> savepoint or incremental checkpointing is disabled. 
>  
> A similar problem may exist when users use timers in operators with the same 
> workload.
>  
> Code to reproduce the problem:
> {code:java}
> package org.apache.flink.jira;
> import lombok.extern.slf4j.Slf4j;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.configuration.RestOptions;
> import org.apache.flink.configuration.TaskManagerOptions;
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import 
> org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
> import 
> org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.util.Collector;
> import java.util.Collections;
> import java.util.List;
> import java.util.Random;
> @Slf4j
> public class StreamApp  {
>   public static void main(String[] args) throws Exception {
> Configuration config = new Configuration();
> config.set(RestOptions.ADDRESS, "127.0.0.1");
> config.set(RestOptions.PORT, 10086);
> config.set(TaskManagerOptions.NUM_TASK_SLOTS, 6);
> new 
> StreamApp().configureApp(StreamExecutionEnvironment.createLocalEnvironment(1, 
> config));
>   }
>   public void configureApp(StreamExecutionEnvironment env) throws Exception {
> env.enableCheckpointing(2); // 20sec
> RocksDBStateBackend rocksDBStateBackend =
> new 
> 

[jira] [Updated] (FLINK-26050) Too many small sst files in rocksdb state backend when using time window created in ascending order

2024-02-12 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-26050:
---
Description: 
When using processing or event time windows, in some workloads a lot of small sst 
files (several KB) accumulate in the rocksdb local directory, which may cause a 
"Too many files" error.

Using the rocksdb tool ldb to inspect the content of these sst files shows:
 * the column family of these small sst files is "processing_window-timers".
 * most sst files are in level-1.
 * records in the sst files are almost all kTypeDeletion.
 * the creation times of the sst files correspond to the checkpoint interval.

These small sst files seem to be generated when a flink checkpoint is triggered. 
Although all of their content consists of delete tags, they are not compacted and 
deleted by rocksdb compaction, because they do not intersect with each other 
(rocksdb [compaction trivial 
move|https://github.com/facebook/rocksdb/wiki/Compaction-Trivial-Move]). And there 
seems to be no chance to delete them, because they are small and do not intersect 
with other sst files.

 

I will attach a simple program to reproduce the problem.

 

Timers in processing time windows are generated in strictly ascending order (both 
puts and deletes). So if the job's workload happens to generate level-0 sst files 
that do not intersect with each other (for example: the processing window size is 
much smaller than the checkpoint interval, and no window content crosses the 
checkpoint interval, or no new data arrives in a window crossing the checkpoint 
interval), many small sst files will be generated until the job is restored from a 
savepoint or incremental checkpointing is disabled. 

 

A similar problem may exist when users use timers in operators with the same 
workload.

 

Code to reproduce the problem:
{code:java}
package org.apache.flink.jira;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import 
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.Collections;
import java.util.List;
import java.util.Random;

@Slf4j
public class StreamApp  {
  public static void main(String[] args) throws Exception {
Configuration config = new Configuration();
config.set(RestOptions.ADDRESS, "127.0.0.1");
config.set(RestOptions.PORT, 10086);
config.set(TaskManagerOptions.NUM_TASK_SLOTS, 6);
new 
StreamApp().configureApp(StreamExecutionEnvironment.createLocalEnvironment(1, 
config));
  }

  public void configureApp(StreamExecutionEnvironment env) throws Exception {

env.enableCheckpointing(2); // 20sec

RocksDBStateBackend rocksDBStateBackend =
new 
RocksDBStateBackend("file:///Users/shenjiaqi/Workspace/jira/flink-51/checkpoints/",
 true); // need to be reconfigured

rocksDBStateBackend.setDbStoragePath("/Users/shenjiaqi/Workspace/jira/flink-51/flink/rocksdb_local_db");
 // need to be reconfigured

env.setStateBackend(rocksDBStateBackend);
env.getCheckpointConfig().setCheckpointTimeout(10);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

env.getConfig().setTaskCancellationInterval(1);

for (int i = 0; i < 1; ++i) {
  createOnePipeline(env);
}

env.execute("StreamApp");
  }


  private void createOnePipeline(StreamExecutionEnvironment env) {
// data source is configured so that little window cross checkpoint interval
DataStreamSource<String> stream = env.addSource(new Generator(1, 3000, 
3600));

stream.keyBy(x -> x)
// make sure window size less than checkpoint interval. though 100ms is 
too small, I think increase this value can still reproduce the problem with 
longer time.
.window(TumblingProcessingTimeWindows.of(Time.milliseconds(100)))
.process(new ProcessWindowFunction<String, String, String, TimeWindow>() {
  @Override
  public void process(String s, Context context,
  Iterable<String> elements, Collector<String> out) {
for (String ele: elements) {
  out.collect(ele);
}
  }
}).print();
  }

  public static final class Generator 

[jira] [Updated] (FLINK-26050) Too many small sst files in rocksdb state backend when using time window created in ascending order

2024-02-12 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-26050:
---
Summary: Too many small sst files in rocksdb state backend when using time 
window created in ascending order  (was: Too many small sst files in rocksdb 
state backend when using processing time window)

> Too many small sst files in rocksdb state backend when using time window 
> created in ascending order
> ---
>
> Key: FLINK-26050
> URL: https://issues.apache.org/jira/browse/FLINK-26050
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.10.2, 1.14.3
>Reporter: shen
>Priority: Major
> Attachments: image-2022-02-09-21-22-13-920.png, 
> image-2022-02-11-10-32-14-956.png, image-2022-02-11-10-36-46-630.png, 
> image-2022-02-14-13-04-52-325.png
>
>
> When using processing time windows, in some workloads a lot of small sst files 
> (several KB) accumulate in the rocksdb local directory, which may cause a "Too 
> many files" error.
> Using the rocksdb tool ldb to inspect the content of these sst files shows:
>  * the column family of these small sst files is "processing_window-timers".
>  * most sst files are in level-1.
>  * records in the sst files are almost all kTypeDeletion.
>  * the creation times of the sst files correspond to the checkpoint interval.
> These small sst files seem to be generated when a flink checkpoint is triggered. 
> Although all of their content consists of delete tags, they are not compacted and 
> deleted by rocksdb compaction, because they do not intersect with each other 
> (rocksdb [compaction trivial 
> move|https://github.com/facebook/rocksdb/wiki/Compaction-Trivial-Move]). And there 
> seems to be no chance to delete them, because they are small and do not intersect 
> with other sst files.
>  
> I will attach a simple program to reproduce the problem.
>  
> Timers in processing time windows are generated in strictly ascending order (both 
> puts and deletes). So if the job's workload happens to generate level-0 sst files 
> that do not intersect with each other (for example: the processing window size is 
> much smaller than the checkpoint interval, and no window content crosses the 
> checkpoint interval, or no new data arrives in a window crossing the checkpoint 
> interval), many small sst files will be generated until the job is restored from a 
> savepoint or incremental checkpointing is disabled. 
>  
> A similar problem may exist when users use timers in operators with the same 
> workload.
>  
> Code to reproduce the problem:
> {code:java}
> package org.apache.flink.jira;
> import lombok.extern.slf4j.Slf4j;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.configuration.RestOptions;
> import org.apache.flink.configuration.TaskManagerOptions;
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import 
> org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
> import 
> org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.util.Collector;
> import java.util.Collections;
> import java.util.List;
> import java.util.Random;
> @Slf4j
> public class StreamApp  {
>   public static void main(String[] args) throws Exception {
> Configuration config = new Configuration();
> config.set(RestOptions.ADDRESS, "127.0.0.1");
> config.set(RestOptions.PORT, 10086);
> config.set(TaskManagerOptions.NUM_TASK_SLOTS, 6);
> new 
> StreamApp().configureApp(StreamExecutionEnvironment.createLocalEnvironment(1, 
> config));
>   }
>   public void configureApp(StreamExecutionEnvironment env) throws Exception {
> env.enableCheckpointing(2); // 20sec
> RocksDBStateBackend rocksDBStateBackend =
> new 
> RocksDBStateBackend("file:///Users/shenjiaqi/Workspace/jira/flink-51/checkpoints/",
>  true); // need to be reconfigured
> 
> rocksDBStateBackend.setDbStoragePath("/Users/shenjiaqi/Workspace/jira/flink-51/flink/rocksdb_local_db");
>  // need to be reconfigured
> env.setStateBackend(rocksDBStateBackend);
> env.getCheckpointConfig().setCheckpointTimeout(10);
> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
> env.setParallelism(1);
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> 

[jira] [Commented] (FLINK-34289) Release Testing Instructions: Verify FLINK-33695 Introduce TraceReporter and use it to create checkpointing and recovery traces

2024-02-08 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17815766#comment-17815766
 ] 

Piotr Nowojski commented on FLINK-34289:


That definitely would prevent my confusion :) 

> Release Testing Instructions: Verify FLINK-33695 Introduce TraceReporter and 
> use it to create checkpointing and recovery traces 
> 
>
> Key: FLINK-34289
> URL: https://issues.apache.org/jira/browse/FLINK-34289
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Metrics
>Affects Versions: 1.19.0
>Reporter: lincoln lee
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.19.0
>
> Attachments: Screenshot 2024-02-08 at 07.22.19.png, screenshot-1.png, 
> screenshot-2.png
>
>
> This ticket covers testing three related features: FLINK-33695, FLINK-33735 
> and FLINK-33696.
> Instructions:
> #  Configure Flink to use 
> [Slf4jTraceReporter|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/trace_reporters/#slf4j]
>  and with enabled *INFO* level logging (can be to console or to a file, 
> doesn't matter).
> # Start a streaming job with enabled checkpointing.
> # Let it run for a couple of checkpoints.
> # Verify presence of a single *JobInitialization* [1] trace logged just after 
> job start up.
> # Verify presence of a couple of *Checkpoint* [1] traces logged after each 
> successful or failed checkpoint.
> [1] 
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/traces/#checkpointing-and-initialization



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34403) VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM

2024-02-08 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17815685#comment-17815685
 ] 

Piotr Nowojski commented on FLINK-34403:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57399=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461

> VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM
> -
>
> Key: FLINK-34403
> URL: https://issues.apache.org/jira/browse/FLINK-34403
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.20.0
>Reporter: Benchao Li
>Priority: Major
>  Labels: test-stability
>
> After FLINK-33611 merged, the misc test on GHA cannot pass due to out of 
> memory error, throwing following exceptions:
> {code:java}
> Error: 05:43:21 05:43:21.768 [ERROR] Tests run: 1, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 40.98 s <<< FAILURE! -- in 
> org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest
> Error: 05:43:21 05:43:21.773 [ERROR] 
> org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest.testSimple -- Time 
> elapsed: 40.97 s <<< ERROR!
> Feb 07 05:43:21 org.apache.flink.util.FlinkRuntimeException: Error in 
> serialization.
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:327)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:162)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:1007)
> Feb 07 05:43:21   at 
> org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:56)
> Feb 07 05:43:21   at 
> org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:45)
> Feb 07 05:43:21   at 
> org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:61)
> Feb 07 05:43:21   at 
> org.apache.flink.client.deployment.executors.LocalExecutor.getJobGraph(LocalExecutor.java:104)
> Feb 07 05:43:21   at 
> org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:81)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2440)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2421)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.datastream.DataStream.executeAndCollectWithClient(DataStream.java:1495)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1382)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1367)
> Feb 07 05:43:21   at 
> org.apache.flink.formats.protobuf.ProtobufTestHelper.validateRow(ProtobufTestHelper.java:66)
> Feb 07 05:43:21   at 
> org.apache.flink.formats.protobuf.ProtobufTestHelper.rowToPbBytes(ProtobufTestHelper.java:89)
> Feb 07 05:43:21   at 
> org.apache.flink.formats.protobuf.ProtobufTestHelper.rowToPbBytes(ProtobufTestHelper.java:76)
> Feb 07 05:43:21   at 
> org.apache.flink.formats.protobuf.ProtobufTestHelper.rowToPbBytes(ProtobufTestHelper.java:71)
> Feb 07 05:43:21   at 
> org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest.testSimple(VeryBigPbRowToProtoTest.java:37)
> Feb 07 05:43:21   at java.lang.reflect.Method.invoke(Method.java:498)
> Feb 07 05:43:21 Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalArgumentException: Self-suppression not permitted
> Feb 07 05:43:21   at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> Feb 07 05:43:21   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:323)
> Feb 07 05:43:21   ... 18 more
> Feb 07 05:43:21 Caused by: java.lang.IllegalArgumentException: 
> Self-suppression not permitted
> Feb 07 05:43:21   at 
> java.lang.Throwable.addSuppressed(Throwable.java:1072)
> Feb 07 05:43:21   at 
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:556)
> Feb 07 05:43:21   at 
> org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:486)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.graph.StreamConfig.lambda$triggerSerializationAndReturnFuture$0(StreamConfig.java:182)
> Feb 

[jira] [Updated] (FLINK-34403) VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM

2024-02-08 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-34403:
---
Priority: Critical  (was: Major)

> VeryBigPbProtoToRowTest#testSimple cannot pass due to OOM
> -
>
> Key: FLINK-34403
> URL: https://issues.apache.org/jira/browse/FLINK-34403
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.20.0
>Reporter: Benchao Li
>Priority: Critical
>  Labels: test-stability
>
> After FLINK-33611 merged, the misc test on GHA cannot pass due to out of 
> memory error, throwing following exceptions:
> {code:java}
> Error: 05:43:21 05:43:21.768 [ERROR] Tests run: 1, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 40.98 s <<< FAILURE! -- in 
> org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest
> Error: 05:43:21 05:43:21.773 [ERROR] 
> org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest.testSimple -- Time 
> elapsed: 40.97 s <<< ERROR!
> Feb 07 05:43:21 org.apache.flink.util.FlinkRuntimeException: Error in 
> serialization.
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:327)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:162)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:1007)
> Feb 07 05:43:21   at 
> org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:56)
> Feb 07 05:43:21   at 
> org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:45)
> Feb 07 05:43:21   at 
> org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:61)
> Feb 07 05:43:21   at 
> org.apache.flink.client.deployment.executors.LocalExecutor.getJobGraph(LocalExecutor.java:104)
> Feb 07 05:43:21   at 
> org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:81)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2440)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2421)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.datastream.DataStream.executeAndCollectWithClient(DataStream.java:1495)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1382)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1367)
> Feb 07 05:43:21   at 
> org.apache.flink.formats.protobuf.ProtobufTestHelper.validateRow(ProtobufTestHelper.java:66)
> Feb 07 05:43:21   at 
> org.apache.flink.formats.protobuf.ProtobufTestHelper.rowToPbBytes(ProtobufTestHelper.java:89)
> Feb 07 05:43:21   at 
> org.apache.flink.formats.protobuf.ProtobufTestHelper.rowToPbBytes(ProtobufTestHelper.java:76)
> Feb 07 05:43:21   at 
> org.apache.flink.formats.protobuf.ProtobufTestHelper.rowToPbBytes(ProtobufTestHelper.java:71)
> Feb 07 05:43:21   at 
> org.apache.flink.formats.protobuf.VeryBigPbRowToProtoTest.testSimple(VeryBigPbRowToProtoTest.java:37)
> Feb 07 05:43:21   at java.lang.reflect.Method.invoke(Method.java:498)
> Feb 07 05:43:21 Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalArgumentException: Self-suppression not permitted
> Feb 07 05:43:21   at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> Feb 07 05:43:21   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:323)
> Feb 07 05:43:21   ... 18 more
> Feb 07 05:43:21 Caused by: java.lang.IllegalArgumentException: 
> Self-suppression not permitted
> Feb 07 05:43:21   at 
> java.lang.Throwable.addSuppressed(Throwable.java:1072)
> Feb 07 05:43:21   at 
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:556)
> Feb 07 05:43:21   at 
> org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:486)
> Feb 07 05:43:21   at 
> org.apache.flink.streaming.api.graph.StreamConfig.lambda$triggerSerializationAndReturnFuture$0(StreamConfig.java:182)
> Feb 07 05:43:21   at 
> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
> Feb 07 05:43:21   at 
> 

[jira] [Comment Edited] (FLINK-34289) Release Testing Instructions: Verify FLINK-33695 Introduce TraceReporter and use it to create checkpointing and recovery traces

2024-02-07 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17815532#comment-17815532
 ] 

Piotr Nowojski edited comment on FLINK-34289 at 2/8/24 6:33 AM:


 !Screenshot 2024-02-08 at 07.22.19.png|width=600! 

This part was confusing to me. My thought process was:
# Ok, the first box says I should create a *new* ticket.
# But following the arrow, I should *remove* "Instructions". That slightly 
suggested renaming the ticket.
# The truly confusing part: the box on the right shows an arrow from "X 
Instructions: Verify" to "X: Verify". I first interpreted it as a subtask issue, 
supporting the "create new ticket" interpretation. But I couldn't create a 
sub-task. So maybe that arrow is supposed to represent the action of renaming that 
ticket?
# I checked that in FLINK-34285 someone else had already renamed the ticket by 
removing "Instructions", which further supported my interpretation that I should 
rename it as well.

Rephrasing the instruction to clarify the meaning of {{remove "Instructions"}}, and 
labeling the arrow between "Instructions" and non-"Instructions" tickets to state, 
for example, that it represents a linked ticket, would have helped me avoid this 
mistake.


was (Author: pnowojski):
 !Screenshot 2024-02-08 at 07.22.19.png! 

This part was confusing to me. My thought process was:
# Ok, the first box says I should create a *new* ticket.
# But following the arrow, I should *remove* "Instructions". That slightly 
suggested renaming the ticket.
# The truly confusing part: the box on the right shows an arrow from "X 
Instructions: Verify" to "X: Verify". I first interpreted it as a subtask issue, 
supporting the "create new ticket" interpretation. But I couldn't create a 
sub-task. So maybe that arrow is supposed to represent the action of renaming that 
ticket?
# I checked that in FLINK-34285 someone else had already renamed the ticket by 
removing "Instructions", which further supported my interpretation that I should 
rename it as well.

Rephrasing the instruction to clarify the meaning of {{remove "Instructions"}}, and 
labeling the arrow between "Instructions" and non-"Instructions" tickets to state, 
for example, that it represents a linked ticket, would have helped me avoid this 
mistake.

> Release Testing Instructions: Verify FLINK-33695 Introduce TraceReporter and 
> use it to create checkpointing and recovery traces 
> 
>
> Key: FLINK-34289
> URL: https://issues.apache.org/jira/browse/FLINK-34289
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Metrics
>Affects Versions: 1.19.0
>Reporter: lincoln lee
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.19.0
>
> Attachments: Screenshot 2024-02-08 at 07.22.19.png, screenshot-1.png
>
>
> This ticket covers testing three related features: FLINK-33695, FLINK-33735 
> and FLINK-33696.
> Instructions:
> #  Configure Flink to use 
> [Slf4jTraceReporter|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/trace_reporters/#slf4j]
>  and with enabled *INFO* level logging (can be to console or to a file, 
> doesn't matter).
> # Start a streaming job with enabled checkpointing.
> # Let it run for a couple of checkpoints.
> # Verify presence of a single *JobInitialization* [1] trace logged just after 
> job start up.
> # Verify presence of a couple of *Checkpoint* [1] traces logged after each 
> successful or failed checkpoint.
> [1] 
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/traces/#checkpointing-and-initialization



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34289) Release Testing Instructions: Verify FLINK-33695 Introduce TraceReporter and use it to create checkpointing and recovery traces

2024-02-07 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17815532#comment-17815532
 ] 

Piotr Nowojski commented on FLINK-34289:


 !Screenshot 2024-02-08 at 07.22.19.png! 

This part was confusing to me. My thought process was:
# Ok, the first box says I should create a *new* ticket.
# But following the arrow, I should *remove* "Instructions". That slightly 
suggested renaming the ticket.
# The truly confusing part: the box on the right shows an arrow from "X 
Instructions: Verify" to "X: Verify". I first interpreted it as a subtask issue, 
supporting the "create new ticket" interpretation. But I couldn't create a 
sub-task. So maybe that arrow is supposed to represent the action of renaming that 
ticket?
# I checked that in FLINK-34285 someone else had already renamed the ticket by 
removing "Instructions", which further supported my interpretation that I should 
rename it as well.

Rephrasing the instruction to clarify the meaning of {{remove "Instructions"}}, and 
labeling the arrow between "Instructions" and non-"Instructions" tickets to state, 
for example, that it represents a linked ticket, would have helped me avoid this 
mistake.

> Release Testing Instructions: Verify FLINK-33695 Introduce TraceReporter and 
> use it to create checkpointing and recovery traces 
> 
>
> Key: FLINK-34289
> URL: https://issues.apache.org/jira/browse/FLINK-34289
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Metrics
>Affects Versions: 1.19.0
>Reporter: lincoln lee
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.19.0
>
> Attachments: Screenshot 2024-02-08 at 07.22.19.png, screenshot-1.png
>
>
> This ticket covers testing three related features: FLINK-33695, FLINK-33735 
> and FLINK-33696.
> Instructions:
> #  Configure Flink to use 
> [Slf4jTraceReporter|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/trace_reporters/#slf4j]
>  and with enabled *INFO* level logging (can be to console or to a file, 
> doesn't matter).
> # Start a streaming job with enabled checkpointing.
> # Let it run for a couple of checkpoints.
> # Verify presence of a single *JobInitialization* [1] trace logged just after 
> job start up.
> # Verify presence of a couple of *Checkpoint* [1] traces logged after each 
> successful or failed checkpoint.
> [1] 
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/traces/#checkpointing-and-initialization



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34289) Release Testing Instructions: Verify FLINK-33695 Introduce TraceReporter and use it to create checkpointing and recovery traces

2024-02-07 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-34289:
---
Attachment: Screenshot 2024-02-08 at 07.22.19.png

> Release Testing Instructions: Verify FLINK-33695 Introduce TraceReporter and 
> use it to create checkpointing and recovery traces 
> 
>
> Key: FLINK-34289
> URL: https://issues.apache.org/jira/browse/FLINK-34289
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Metrics
>Affects Versions: 1.19.0
>Reporter: lincoln lee
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.19.0
>
> Attachments: Screenshot 2024-02-08 at 07.22.19.png, screenshot-1.png
>
>
> This ticket covers testing three related features: FLINK-33695, FLINK-33735 
> and FLINK-33696.
> Instructions:
> #  Configure Flink to use 
> [Slf4jTraceReporter|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/trace_reporters/#slf4j]
>  and with enabled *INFO* level logging (can be to console or to a file, 
> doesn't matter).
> # Start a streaming job with enabled checkpointing.
> # Let it run for a couple of checkpoints.
> # Verify presence of a single *JobInitialization* [1] trace logged just after 
> job start up.
> # Verify presence of a couple of *Checkpoint* [1] traces logged after each 
> successful or failed checkpoint.
> [1] 
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/traces/#checkpointing-and-initialization



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34289) Release Testing Instructions: Verify FLINK-33695 Introduce TraceReporter and use it to create checkpointing and recovery traces

2024-02-06 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814807#comment-17814807
 ] 

Piotr Nowojski commented on FLINK-34289:


I wasn't complaining, just explaining myself :) Sorry for misunderstanding what 
I should have done!

> Release Testing Instructions: Verify FLINK-33695 Introduce TraceReporter and 
> use it to create checkpointing and recovery traces 
> 
>
> Key: FLINK-34289
> URL: https://issues.apache.org/jira/browse/FLINK-34289
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Metrics
>Affects Versions: 1.19.0
>Reporter: lincoln lee
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.19.0
>
> Attachments: screenshot-1.png
>
>
> This ticket covers testing three related features: FLINK-33695, FLINK-33735 
> and FLINK-33696.
> Instructions:
> #  Configure Flink to use 
> [Slf4jTraceReporter|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/trace_reporters/#slf4j]
>  and with enabled *INFO* level logging (can be to console or to a file, 
> doesn't matter).
> # Start a streaming job with enabled checkpointing.
> # Let it run for a couple of checkpoints.
> # Verify presence of a single *JobInitialization* [1] trace logged just after 
> job start up.
> # Verify presence of a couple of *Checkpoint* [1] traces logged after each 
> successful or failed checkpoint.
> [1] 
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/traces/#checkpointing-and-initialization



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34289) Release Testing Instructions: Verify FLINK-33695 Introduce TraceReporter and use it to create checkpointing and recovery traces

2024-02-06 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814787#comment-17814787
 ] 

Piotr Nowojski commented on FLINK-34289:


Thanks for cleaning this up. Indeed, I was confused about what to do and decided to 
follow an example from someone else.

Anyway, after giving it a second thought, I think there is no need to cross-team 
test this feature. I've closed FLINK-34387.

> Release Testing Instructions: Verify FLINK-33695 Introduce TraceReporter and 
> use it to create checkpointing and recovery traces 
> 
>
> Key: FLINK-34289
> URL: https://issues.apache.org/jira/browse/FLINK-34289
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Metrics
>Affects Versions: 1.19.0
>Reporter: lincoln lee
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.19.0
>
> Attachments: screenshot-1.png
>
>
> This ticket covers testing three related features: FLINK-33695, FLINK-33735 
> and FLINK-33696.
> Instructions:
> #  Configure Flink to use 
> [Slf4jTraceReporter|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/trace_reporters/#slf4j]
>  and with enabled *INFO* level logging (can be to console or to a file, 
> doesn't matter).
> # Start a streaming job with enabled checkpointing.
> # Let it run for a couple of checkpoints.
> # Verify presence of a single *JobInitialization* [1] trace logged just after 
> job start up.
> # Verify presence of a couple of *Checkpoint* [1] traces logged after each 
> successful or failed checkpoint.
> [1] 
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/traces/#checkpointing-and-initialization



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-34387) Release Testing: Verify FLINK-33695 Introduce TraceReporter and use it to create checkpointing and recovery traces

2024-02-06 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34387?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-34387.
--
Resolution: Won't Do

After giving it a second thought, I'm closing this ticket, as there is no strong 
need to manually cross-test this feature.

> Release Testing: Verify FLINK-33695 Introduce TraceReporter and use it to 
> create checkpointing and recovery traces 
> ---
>
> Key: FLINK-34387
> URL: https://issues.apache.org/jira/browse/FLINK-34387
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Metrics
>Affects Versions: 1.19.0
>Reporter: Piotr Nowojski
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.19.0
>
>
> This ticket covers testing three related features: FLINK-33695, FLINK-33735 
> and FLINK-33696.
> Instructions:
> #  Configure Flink to use 
> [Slf4jTraceReporter|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/trace_reporters/#slf4j]
>  and with enabled *INFO* level logging (can be to console or to a file, 
> doesn't matter).
> # Start a streaming job with enabled checkpointing.
> # Let it run for a couple of checkpoints.
> # Verify presence of a single *JobInitialization* [1] trace logged just after 
> job start up.
> # Verify presence of a couple of *Checkpoint* [1] traces logged after each 
> successful or failed checkpoint.
> [1] 
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/traces/#checkpointing-and-initialization



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-34289) Release Testing: Verify FLINK-33695 Introduce TraceReporter and use it to create checkpointing and recovery traces

2024-02-05 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-34289:
--

Assignee: (was: Piotr Nowojski)

> Release Testing: Verify FLINK-33695 Introduce TraceReporter and use it to 
> create checkpointing and recovery traces 
> ---
>
> Key: FLINK-34289
> URL: https://issues.apache.org/jira/browse/FLINK-34289
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Metrics
>Affects Versions: 1.19.0
>Reporter: lincoln lee
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.19.0
>
> Attachments: screenshot-1.png
>
>
> This ticket covers testing three related features: FLINK-33695, FLINK-33735 
> and FLINK-33696.
> Instructions:
> #  Configure Flink to use 
> [Slf4jTraceReporter|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/trace_reporters/#slf4j]
>  and with enabled *INFO* level logging (can be to console or to a file, 
> doesn't matter).
> # Start a streaming job with enabled checkpointing.
> # Let it run for a couple of checkpoints.
> # Verify presence of a single *JobInitialization* [1] trace logged just after 
> job start up.
> # Verify presence of a couple of *Checkpoint* [1] traces logged after each 
> successful or failed checkpoint.
> [1] 
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/traces/#checkpointing-and-initialization



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-34290) Release Testing Instructions: Verify FLINK-33696 Add OpenTelemetryTraceReporter and OpenTelemetryMetricReporter

2024-02-05 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-34290.
--
Resolution: Won't Do

Covered by https://issues.apache.org/jira/browse/FLINK-34289

> Release Testing Instructions: Verify FLINK-33696 Add 
> OpenTelemetryTraceReporter and OpenTelemetryMetricReporter
> ---
>
> Key: FLINK-34290
> URL: https://issues.apache.org/jira/browse/FLINK-34290
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Metrics
>Affects Versions: 1.19.0
>Reporter: lincoln lee
>Assignee: Piotr Nowojski
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34289) Release Testing: Verify FLINK-33695 Introduce TraceReporter and use it to create checkpointing and recovery traces

2024-02-05 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-34289:
---
Summary: Release Testing: Verify FLINK-33695 Introduce TraceReporter and 
use it to create checkpointing and recovery traces   (was: Release Testing 
Instructions: Verify FLINK-33695 Introduce TraceReporter and use it to create 
checkpointing and recovery traces )

> Release Testing: Verify FLINK-33695 Introduce TraceReporter and use it to 
> create checkpointing and recovery traces 
> ---
>
> Key: FLINK-34289
> URL: https://issues.apache.org/jira/browse/FLINK-34289
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Metrics
>Affects Versions: 1.19.0
>Reporter: lincoln lee
>Assignee: Piotr Nowojski
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.19.0
>
> Attachments: screenshot-1.png
>
>
> This ticket covers testing three related features: FLINK-33695, FLINK-33735 
> and FLINK-33696.
> Instructions:
> #  Configure Flink to use 
> [Slf4jTraceReporter|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/trace_reporters/#slf4j]
>  and with enabled *INFO* level logging (can be to console or to a file, 
> doesn't matter).
> # Start a streaming job with enabled checkpointing.
> # Let it run for a couple of checkpoints.
> # Verify presence of a single *JobInitialization* [1] trace logged just after 
> job start up.
> # Verify presence of a couple of *Checkpoint* [1] traces logged after each 
> successful or failed checkpoint.
> [1] 
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/traces/#checkpointing-and-initialization



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34289) Release Testing Instructions: Verify FLINK-33695 Introduce TraceReporter and use it to create checkpointing and recovery traces

2024-02-05 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-34289:
---
Description: 
This ticket covers testing three related features: FLINK-33695, FLINK-33735 and 
FLINK-33696.

Instructions:
#  Configure Flink to use 
[Slf4jTraceReporter|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/trace_reporters/#slf4j]
 and with enabled *INFO* level logging (can be to console or to a file, doesn't 
matter).
# Start a streaming job with enabled checkpointing.
# Let it run for a couple of checkpoints.
# Verify presence of a single *JobInitialization* [1] trace logged just after 
job start up.
# Verify presence of a couple of *Checkpoint* [1] traces logged after each 
successful or failed checkpoint.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/traces/#checkpointing-and-initialization

> Release Testing Instructions: Verify FLINK-33695 Introduce TraceReporter and 
> use it to create checkpointing and recovery traces 
> 
>
> Key: FLINK-34289
> URL: https://issues.apache.org/jira/browse/FLINK-34289
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Metrics
>Affects Versions: 1.19.0
>Reporter: lincoln lee
>Assignee: Piotr Nowojski
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.19.0
>
> Attachments: screenshot-1.png
>
>
> This ticket covers testing three related features: FLINK-33695, FLINK-33735 
> and FLINK-33696.
> Instructions:
> #  Configure Flink to use 
> [Slf4jTraceReporter|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/trace_reporters/#slf4j]
>  and with enabled *INFO* level logging (can be to console or to a file, 
> doesn't matter).
> # Start a streaming job with enabled checkpointing.
> # Let it run for a couple of checkpoints.
> # Verify presence of a single *JobInitialization* [1] trace logged just after 
> job start up.
> # Verify presence of a couple of *Checkpoint* [1] traces logged after each 
> successful or failed checkpoint.
> [1] 
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/traces/#checkpointing-and-initialization



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34289) Release Testing Instructions: Verify FLINK-33695 Introduce TraceReporter and use it to create checkpointing and recovery traces

2024-02-05 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814439#comment-17814439
 ] 

Piotr Nowojski commented on FLINK-34289:


Thanks! I will create a single ticket for release testing instructions to cover 
all 3 FLIPs.

> Release Testing Instructions: Verify FLINK-33695 Introduce TraceReporter and 
> use it to create checkpointing and recovery traces 
> 
>
> Key: FLINK-34289
> URL: https://issues.apache.org/jira/browse/FLINK-34289
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Metrics
>Affects Versions: 1.19.0
>Reporter: lincoln lee
>Assignee: Piotr Nowojski
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.19.0
>
> Attachments: screenshot-1.png
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33819) Support setting CompressType in RocksDBStateBackend

2024-02-02 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17813740#comment-17813740
 ] 

Piotr Nowojski commented on FLINK-33819:


Thanks [~mayuehappy].

{quote}
merged 4f7725aa into master which is not blocked by the performance result.

I also think it's worthy discussing and we could find a better default 
behavious in the next version.
{quote}

+1


Yes

> Support setting CompressType in RocksDBStateBackend
> ---
>
> Key: FLINK-33819
> URL: https://issues.apache.org/jira/browse/FLINK-33819
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Yue Ma
>Assignee: Yue Ma
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
> Attachments: image-2023-12-14-11-32-32-968.png, 
> image-2023-12-14-11-35-22-306.png
>
>
> Currently, RocksDBStateBackend does not support setting the compression 
> level, and Snappy is used for compression by default. But we have some 
> scenarios where compression will use a lot of CPU resources. Turning off 
> compression can significantly reduce CPU overhead. So we may need to support 
> a parameter for users to set the CompressType of Rocksdb.
>   !image-2023-12-14-11-35-22-306.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33819) Support setting CompressType in RocksDBStateBackend

2024-02-02 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17813740#comment-17813740
 ] 

Piotr Nowojski edited comment on FLINK-33819 at 2/2/24 3:52 PM:


Thanks [~mayuehappy].

{quote}
merged 4f7725aa into master which is not blocked by the performance result.

I also think it's worthy discussing and we could find a better default 
behavious in the next version.
{quote}

+1


was (Author: pnowojski):
Thanks [~mayuehappy].

{quote}
merged 4f7725aa into master which is not blocked by the performance result.

I also think it's worthy discussing and we could find a better default 
behavious in the next version.
{quote}

+1


Yes

> Support setting CompressType in RocksDBStateBackend
> ---
>
> Key: FLINK-33819
> URL: https://issues.apache.org/jira/browse/FLINK-33819
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Yue Ma
>Assignee: Yue Ma
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
> Attachments: image-2023-12-14-11-32-32-968.png, 
> image-2023-12-14-11-35-22-306.png
>
>
> Currently, RocksDBStateBackend does not support setting the compression 
> level, and Snappy is used for compression by default. But we have some 
> scenarios where compression will use a lot of CPU resources. Turning off 
> compression can significantly reduce CPU overhead. So we may need to support 
> a parameter for users to set the CompressType of Rocksdb.
>   !image-2023-12-14-11-35-22-306.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (FLINK-33696) FLIP-385: Add OpenTelemetryTraceReporter and OpenTelemetryMetricReporter

2024-02-02 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reopened FLINK-33696:


No, it's not yet done. I still have to open a PR adding the OpenTelemetry reporters.

7db2ecad added only the slf4j reporter.

> FLIP-385: Add OpenTelemetryTraceReporter and OpenTelemetryMetricReporter
> 
>
> Key: FLINK-33696
> URL: https://issues.apache.org/jira/browse/FLINK-33696
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Metrics
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
> Fix For: 1.19.0
>
>
> h1. Motivation
> [FLIP-384|https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces]
>  is adding TraceReporter interface. However with 
> [FLIP-384|https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces]
>  alone, Log4jTraceReporter would be the only available implementation of 
> TraceReporter interface, which is not very helpful.
> In this FLIP I’m proposing to contribute both MetricExporter and 
> TraceReporter implementation using OpenTelemetry.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34134) Add tracing for restored state size and locations

2024-01-18 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-34134:
---
Fix Version/s: 1.19.0

> Add tracing for restored state size and locations
> -
>
> Key: FLINK-34134
> URL: https://issues.apache.org/jira/browse/FLINK-34134
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> We can add tracing during the restore that reports the state size that was 
> restored by location(s). This is particularly interesting for a mixed 
> recovery with some local and some remote state.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-27756) Fix intermittently failing test in AsyncSinkWriterTest.checkLoggedSendTimesAreWithinBounds

2024-01-16 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-27756.
--
Resolution: Fixed

Thanks [~chalixar] for fixing this!

merged commit d92ab39 into apache:master
2b6c656ff31 into release-1.18
ca62b0070cf into release-1.17

> Fix intermittently failing test in 
> AsyncSinkWriterTest.checkLoggedSendTimesAreWithinBounds
> --
>
> Key: FLINK-27756
> URL: https://issues.apache.org/jira/browse/FLINK-27756
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.17.0, 1.19.0, 1.18.1
>Reporter: Ahmed Hamdy
>Assignee: Ahmed Hamdy
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.19.0, 1.17.3, 1.18.2
>
>
> h2. Motivation
>  - One of the integration tests ({{checkLoggedSendTimesAreWithinBounds}}) of 
> {{AsyncSinkWriterTest}} has been reported to fail intermittently on build 
> pipeline causing blocking of new changes.
>  - Reporting build is [linked 
> |https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=36009=logs=aa18c3f6-13b8-5f58-86bb-c1cffb239496=502fb6c0-30a2-5e49-c5c2-a00fa3acb203]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-27756) Fix intermittently failing test in AsyncSinkWriterTest.checkLoggedSendTimesAreWithinBounds

2024-01-16 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-27756:
---
Fix Version/s: 1.17.3
   1.18.2

> Fix intermittently failing test in 
> AsyncSinkWriterTest.checkLoggedSendTimesAreWithinBounds
> --
>
> Key: FLINK-27756
> URL: https://issues.apache.org/jira/browse/FLINK-27756
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.17.0, 1.19.0
>Reporter: Ahmed Hamdy
>Assignee: Ahmed Hamdy
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.19.0, 1.17.3, 1.18.2
>
>
> h2. Motivation
>  - One of the integration tests ({{checkLoggedSendTimesAreWithinBounds}}) of 
> {{AsyncSinkWriterTest}} has been reported to fail intermittently on build 
> pipeline causing blocking of new changes.
>  - Reporting build is [linked 
> |https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=36009=logs=aa18c3f6-13b8-5f58-86bb-c1cffb239496=502fb6c0-30a2-5e49-c5c2-a00fa3acb203]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-27756) Fix intermittently failing test in AsyncSinkWriterTest.checkLoggedSendTimesAreWithinBounds

2024-01-16 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-27756:
---
Affects Version/s: 1.18.1

> Fix intermittently failing test in 
> AsyncSinkWriterTest.checkLoggedSendTimesAreWithinBounds
> --
>
> Key: FLINK-27756
> URL: https://issues.apache.org/jira/browse/FLINK-27756
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.17.0, 1.19.0, 1.18.1
>Reporter: Ahmed Hamdy
>Assignee: Ahmed Hamdy
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.19.0, 1.17.3, 1.18.2
>
>
> h2. Motivation
>  - One of the integration tests ({{checkLoggedSendTimesAreWithinBounds}}) of 
> {{AsyncSinkWriterTest}} has been reported to fail intermittently on build 
> pipeline causing blocking of new changes.
>  - Reporting build is [linked 
> |https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=36009=logs=aa18c3f6-13b8-5f58-86bb-c1cffb239496=502fb6c0-30a2-5e49-c5c2-a00fa3acb203]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-27756) Fix intermittently failing test in AsyncSinkWriterTest.checkLoggedSendTimesAreWithinBounds

2024-01-15 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-27756:
---
Priority: Blocker  (was: Critical)

> Fix intermittently failing test in 
> AsyncSinkWriterTest.checkLoggedSendTimesAreWithinBounds
> --
>
> Key: FLINK-27756
> URL: https://issues.apache.org/jira/browse/FLINK-27756
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.17.0, 1.19.0
>Reporter: Ahmed Hamdy
>Assignee: Ahmed Hamdy
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.19.0
>
>
> h2. Motivation
>  - One of the integration tests ({{checkLoggedSendTimesAreWithinBounds}}) of 
> {{AsyncSinkWriterTest}} has been reported to fail intermittently on build 
> pipeline causing blocking of new changes.
>  - Reporting build is [linked 
> |https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=36009=logs=aa18c3f6-13b8-5f58-86bb-c1cffb239496=502fb6c0-30a2-5e49-c5c2-a00fa3acb203]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33338) Bump FRocksDB version

2024-01-15 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806860#comment-17806860
 ] 

Piotr Nowojski commented on FLINK-33338:


[~roman], let's make sure that this latest bug fix is also included in our 
FRocksDB release:
https://issues.apache.org/jira/browse/FLINK-33337?focusedCommentId=17805364=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17805364

> Bump FRocksDB version
> -
>
> Key: FLINK-33338
> URL: https://issues.apache.org/jira/browse/FLINK-33338
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Piotr Nowojski
>Assignee: Roman Khachatryan
>Priority: Major
>
> We need to bump RocksDB in order to be able to use new IngestDB and ClipDB 
> commands.
> If some of the required changes haven't been merged to Facebook/RocksDB, we 
> should cherry-pick and include them in our FRocksDB fork.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33337) Expose IngestDB and ClipDB in the official RocksDB API

2024-01-15 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806861#comment-17806861
 ] 

Piotr Nowojski commented on FLINK-33337:


Thanks [~mayuehappy] for the update!

> Expose IngestDB and ClipDB in the official RocksDB API
> --
>
> Key: FLINK-33337
> URL: https://issues.apache.org/jira/browse/FLINK-33337
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Piotr Nowojski
>Assignee: Yue Ma
>Priority: Major
> Attachments: image-2024-01-11-12-03-14-308.png
>
>
> Remaining open PRs:
> None :)
> Already merged PRs:
> https://github.com/facebook/rocksdb/pull/11646
> https://github.com/facebook/rocksdb/pull/11868
> https://github.com/facebook/rocksdb/pull/11811
> https://github.com/facebook/rocksdb/pull/11381
> https://github.com/facebook/rocksdb/pull/11379
> https://github.com/facebook/rocksdb/pull/11378



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33819) Support setting CompressType in RocksDBStateBackend

2024-01-15 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806804#comment-17806804
 ] 

Piotr Nowojski edited comment on FLINK-33819 at 1/15/24 1:09 PM:
-

Thanks for drafting the proposal [~mayuehappy]

+1 for making this option configurable. 

It would be great if someone could test out [~masteryhx]'s suggestion of 
configuring no compression for the L0 and L1 levels, while keeping the lower 
levels compressed:
* By how much would the checkpoint size grow?
* What about performance?

It sounds like this could actually be the default behaviour for us? It would be 
great to test it out, but that's not a blocker from my side for implementing 
this ticket.
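
For whoever picks this up, a rough sketch of how the L0/L1-uncompressed experiment 
could be wired into a job via a custom {{RocksDBOptionsFactory}} is shown below. It 
assumes the two-method options-factory interface from flink-statebackend-rocksdb and 
the RocksDB JNI {{setCompressionPerLevel}} / {{CompressionType}} API; it only 
illustrates the experiment, not the configuration option proposed in this ticket.

{code:java}
import java.util.Arrays;
import java.util.Collection;

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;

/** Experiment only: leaves L0/L1 uncompressed and keeps Snappy for lower levels. */
public class MixedCompressionOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(
            DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // One entry per level (RocksDB's default num_levels is 7).
        return currentOptions.setCompressionPerLevel(
                Arrays.asList(
                        CompressionType.NO_COMPRESSION,       // L0
                        CompressionType.NO_COMPRESSION,       // L1
                        CompressionType.SNAPPY_COMPRESSION,   // L2
                        CompressionType.SNAPPY_COMPRESSION,   // L3
                        CompressionType.SNAPPY_COMPRESSION,   // L4
                        CompressionType.SNAPPY_COMPRESSION,   // L5
                        CompressionType.SNAPPY_COMPRESSION)); // L6
    }

    /** How the factory would be attached to the backend in a test job. */
    public static EmbeddedRocksDBStateBackend experimentalBackend() {
        EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
        backend.setRocksDBOptions(new MixedCompressionOptionsFactory());
        return backend;
    }
}
{code}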


was (Author: pnowojski):
Thanks for drafting the proposal [~mayuehappy]

+1 for making this option configurable. 

> Support setting CompressType in RocksDBStateBackend
> ---
>
> Key: FLINK-33819
> URL: https://issues.apache.org/jira/browse/FLINK-33819
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Yue Ma
>Assignee: Yue Ma
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
> Attachments: image-2023-12-14-11-32-32-968.png, 
> image-2023-12-14-11-35-22-306.png
>
>
> Currently, RocksDBStateBackend does not support setting the compression 
> level, and Snappy is used for compression by default. But we have some 
> scenarios where compression will use a lot of CPU resources. Turning off 
> compression can significantly reduce CPU overhead. So we may need to support 
> a parameter for users to set the CompressType of Rocksdb.
>   !image-2023-12-14-11-35-22-306.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33819) Support setting CompressType in RocksDBStateBackend

2024-01-15 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806804#comment-17806804
 ] 

Piotr Nowojski commented on FLINK-33819:


+1 for making this option configurable. 

> Support setting CompressType in RocksDBStateBackend
> ---
>
> Key: FLINK-33819
> URL: https://issues.apache.org/jira/browse/FLINK-33819
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Yue Ma
>Assignee: Yue Ma
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
> Attachments: image-2023-12-14-11-32-32-968.png, 
> image-2023-12-14-11-35-22-306.png
>
>
> Currently, RocksDBStateBackend does not support setting the compression 
> level, and Snappy is used for compression by default. But we have some 
> scenarios where compression will use a lot of CPU resources. Turning off 
> compression can significantly reduce CPU overhead. So we may need to support 
> a parameter for users to set the CompressType of Rocksdb.
>   !image-2023-12-14-11-35-22-306.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33819) Support setting CompressType in RocksDBStateBackend

2024-01-15 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806804#comment-17806804
 ] 

Piotr Nowojski edited comment on FLINK-33819 at 1/15/24 1:00 PM:
-

Thanks for drafting the proposal [~mayuehappy]

+1 for making this option configurable. 


was (Author: pnowojski):
+1 for making this option configurable. 

> Support setting CompressType in RocksDBStateBackend
> ---
>
> Key: FLINK-33819
> URL: https://issues.apache.org/jira/browse/FLINK-33819
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Yue Ma
>Assignee: Yue Ma
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
> Attachments: image-2023-12-14-11-32-32-968.png, 
> image-2023-12-14-11-35-22-306.png
>
>
> Currently, RocksDBStateBackend does not support setting the compression 
> level, and Snappy is used for compression by default. But we have some 
> scenarios where compression will use a lot of CPU resources. Turning off 
> compression can significantly reduce CPU overhead. So we may need to support 
> a parameter for users to set the CompressType of Rocksdb.
>   !image-2023-12-14-11-35-22-306.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34063) When snapshot compression is enabled, rescaling of a source operator leads to some splits getting lost

2024-01-12 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805934#comment-17805934
 ] 

Piotr Nowojski edited comment on FLINK-34063 at 1/12/24 8:32 AM:
-

[~dmvk] could this issue have been discovered by our ITCases if snapshot 
compression had been enabled in them? Regardless of that, it is probably a good 
idea to enable it via our configuration randomisation framework: 
{{org.apache.flink.streaming.util.TestStreamEnvironment#randomizeConfiguration}}.
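
For anyone reproducing this outside of the randomised ITCases, a minimal sketch of a 
job with snapshot compression switched on programmatically is shown below (the 
{{ExecutionConfig}} toggle; wiring the same flag through {{randomizeConfiguration}} 
would be the analogous test-side change). The rescaling steps from the ticket 
description still apply on top of this.

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SnapshotCompressionRepro {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // The toggle under which the problem described in this ticket shows up.
        env.getConfig().setUseSnapshotCompression(true);
        env.enableCheckpointing(1_000);

        // Any split-based source works; rescale it between checkpoints to reproduce.
        env.fromSequence(0, Long.MAX_VALUE).print();

        env.execute("snapshot-compression-rescale-repro");
    }
}
{code}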


was (Author: pnowojski):
Could this issue be discovered by our ITCases if snapshot compression had been 
enabled in them? Regardless of that, it is probably a good idea to enable it 
using our configuration randomisation framework: 
{{org.apache.flink.streaming.util.TestStreamEnvironment#randomizeConfiguration}}.

> When snapshot compression is enabled, rescaling of a source operator leads to 
> some splits getting lost
> --
>
> Key: FLINK-34063
> URL: https://issues.apache.org/jira/browse/FLINK-34063
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.18.0
> Environment: Can be reproduced in any environment. The most important 
> thing is to enable snapshot compression.
>Reporter: Ivan Burmistrov
>Assignee: David Morávek
>Priority: Blocker
> Attachments: image-2024-01-11-16-27-09-066.png, 
> image-2024-01-11-16-30-47-466.png
>
>
> h2. Backstory
> We've been experimenting with Autoscaling on the Flink 1.18 and faced a 
> pretty nasty bug. 
> The symptoms on our production system were as following. After a while after 
> deploying a job with autoscaler it started accumulating Kafka lag, and this 
> could only be observed via external lag measurement - from inside Flink 
> (measured by
> {{_KafkaSourceReader_KafkaConsumer_records_lag_max_}} metric) the lag was OK:
> !image-2024-01-11-16-27-09-066.png|width=887,height=263!
> After some digging, it turned out that the job has lost some Kafka partitions 
> - i.e. it stopped consuming from them, “forgot” about their existence. That’s 
> why from the Flink’s perspective everything was fine - the lag was growing on 
> the partitions Flink no longer knew about.
> This was visible on a metric called “Assigned partitions” 
> (KafkaSourceReader_KafkaConsumer_assigned_partitions):
> !image-2024-01-11-16-30-47-466.png|width=1046,height=254!
> We see on the chart that the job used to know about 20 partitions, and then 
> this number got dropped to 16.
> This drop has been quickly connected to the job’s scaling events. Or, more 
> precisely, to the scaling of the source operator - with almost 100% 
> probability any scaling of the source operator led to partitions loss.
> h2. Investigation
> We've conducted the investigation. We use the latest Kubernetes operator and 
> deploy jobs with Native Kubernetes.
> The reproducing scenario we used for investigation:
>  * Launch a job with source operator parallelism = 4, enable DEBUG logging
>  * Wait until it takes the first checkpoint
>  * Scale-up the source operator to say 5 (no need to wait for autoscaling, it 
> can be done via Flink UI)
>  * Wait until the new checkpoint is taken
>  * Scale-down the source operator to 3
> These simple actions with almost 100% probability led to some partitions get 
> lost.
> After that we've downloaded all the logs and inspected them. Noticed these 
> strange records in logs:
> {code:java}
> {"timestamp":1704415753166,"is_logging_enabled":"false","logger_id":"org.apache.flink.streaming.api.operators.AbstractStreamOperator","log_level":"INFO","message":"Restoring
>  state for 4 split(s) to reader.","service_name":"data-beaver"} 
> {"timestamp":1704415753166,"is_logging_enabled":"false","logger_id":"org.apache.flink.connector.base.source.reader.SourceReaderBase","log_level":"INFO","message":"Adding
>  split(s) to reader: 
> [
> [Partition: eventmesh-video-play-v1-6, StartingOffset: 1964306414, 
> StoppingOffset: -9223372036854775808], 
> [Partition: eventmesh-video-play-v1-19, StartingOffset: 1963002538, 
> StoppingOffset: -9223372036854775808], 
> [Partition: eventmesh-video-play-v1-6, StartingOffset: 1964306414, 
> StoppingOffset: -9223372036854775808], 
> [Partition: eventmesh-video-play-v1-19, StartingOffset: 1963002538, 
> StoppingOffset: -9223372036854775808]]", "service_name":"data-beaver"}{code}
> We see that some task being restored with 4 splits, however actual splits 
> have duplicates - we see that in reality 2 unique partitions have been added 
> ({_}eventmesh-video-play-v1-6{_} and {_}eventmesh-video-play-v1-19{_}).
> Digging into the code and the logs a bit more, log lines like this started 
> looking suspicious:
>  
> {code:java}
> 

[jira] [Commented] (FLINK-34063) When snapshot compression is enabled, rescaling of a source operator leads to some splits getting lost

2024-01-12 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805934#comment-17805934
 ] 

Piotr Nowojski commented on FLINK-34063:


Could this issue be discovered by our ITCases if snapshot compression had been 
enabled in them? Regardless of that, it is probably a good idea to enable it 
using our configuration randomisation framework: 
{{org.apache.flink.streaming.util.TestStreamEnvironment#randomizeConfiguration}}.

> When snapshot compression is enabled, rescaling of a source operator leads to 
> some splits getting lost
> --
>
> Key: FLINK-34063
> URL: https://issues.apache.org/jira/browse/FLINK-34063
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.18.0
> Environment: Can be reproduced in any environment. The most important 
> thing is to enable snapshot compression.
>Reporter: Ivan Burmistrov
>Assignee: David Morávek
>Priority: Blocker
> Attachments: image-2024-01-11-16-27-09-066.png, 
> image-2024-01-11-16-30-47-466.png
>
>
> h2. Backstory
> We've been experimenting with Autoscaling on the Flink 1.18 and faced a 
> pretty nasty bug. 
> The symptoms on our production system were as following. After a while after 
> deploying a job with autoscaler it started accumulating Kafka lag, and this 
> could only be observed via external lag measurement - from inside Flink 
> (measured by
> {{_KafkaSourceReader_KafkaConsumer_records_lag_max_}} metric) the lag was OK:
> !image-2024-01-11-16-27-09-066.png|width=887,height=263!
> After some digging, it turned out that the job has lost some Kafka partitions 
> - i.e. it stopped consuming from them, “forgot” about their existence. That’s 
> why from the Flink’s perspective everything was fine - the lag was growing on 
> the partitions Flink no longer knew about.
> This was visible on a metric called “Assigned partitions” 
> (KafkaSourceReader_KafkaConsumer_assigned_partitions):
> !image-2024-01-11-16-30-47-466.png|width=1046,height=254!
> We see on the chart that the job used to know about 20 partitions, and then 
> this number got dropped to 16.
> This drop has been quickly connected to the job’s scaling events. Or, more 
> precisely, to the scaling of the source operator - with almost 100% 
> probability any scaling of the source operator led to partitions loss.
> h2. Investigation
> We've conducted the investigation. We use the latest Kubernetes operator and 
> deploy jobs with Native Kubernetes.
> The reproducing scenario we used for investigation:
>  * Launch a job with source operator parallelism = 4, enable DEBUG logging
>  * Wait until it takes the first checkpoint
>  * Scale-up the source operator to say 5 (no need to wait for autoscaling, it 
> can be done via Flink UI)
>  * Wait until the new checkpoint is taken
>  * Scale-down the source operator to 3
> These simple actions with almost 100% probability led to some partitions get 
> lost.
> After that we've downloaded all the logs and inspected them. Noticed these 
> strange records in logs:
> {code:java}
> {"timestamp":1704415753166,"is_logging_enabled":"false","logger_id":"org.apache.flink.streaming.api.operators.AbstractStreamOperator","log_level":"INFO","message":"Restoring
>  state for 4 split(s) to reader.","service_name":"data-beaver"} 
> {"timestamp":1704415753166,"is_logging_enabled":"false","logger_id":"org.apache.flink.connector.base.source.reader.SourceReaderBase","log_level":"INFO","message":"Adding
>  split(s) to reader: 
> [
> [Partition: eventmesh-video-play-v1-6, StartingOffset: 1964306414, 
> StoppingOffset: -9223372036854775808], 
> [Partition: eventmesh-video-play-v1-19, StartingOffset: 1963002538, 
> StoppingOffset: -9223372036854775808], 
> [Partition: eventmesh-video-play-v1-6, StartingOffset: 1964306414, 
> StoppingOffset: -9223372036854775808], 
> [Partition: eventmesh-video-play-v1-19, StartingOffset: 1963002538, 
> StoppingOffset: -9223372036854775808]]", "service_name":"data-beaver"}{code}
> We see that some task being restored with 4 splits, however actual splits 
> have duplicates - we see that in reality 2 unique partitions have been added 
> ({_}eventmesh-video-play-v1-6{_} and {_}eventmesh-video-play-v1-19{_}).
> Digging into the code and the logs a bit more, log lines like this started 
> looking suspicious:
>  
> {code:java}
> {"timestamp":1704415753165,"is_logging_enabled":"false","logger_id":"org.apache.flink.runtime.state.TaskStateManagerImpl","log_level":"DEBUG",
>  "message":"Operator 156a1ebbc1936f7d4558c8070b35ba93 has remote state 
> SubtaskState{operatorStateFromBackend=StateObjectCollection{ 
> [OperatorStateHandle{stateNameToPartitionOffsets={SourceReaderState=StateMetaInfo{offsets=[244,
>  244], distributionMode=SPLIT_DISTRIBUTE}}, 
> 

[jira] [Closed] (FLINK-33697) FLIP-386: Support adding custom metrics in Recovery Spans

2024-01-12 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-33697.
--
Release Note: 
A breaking change has been introduced to the `StateBackend` interface. This is 
relevant only to users who implement their own custom state backends.

Newly added methods 
`org.apache.flink.runtime.state.StateBackend#createKeyedStateBackend(KeyedStateBackendParameters
 parameters)` and 
`org.apache.flink.runtime.state.StateBackend#createOperatorStateBackend(OperatorStateBackendParameters
 parameters)` have replaced previous versions of the `createKeyedStateBackend` 
and `createOperatorStateBackend` methods. The new `parameters` POJO classes 
contain as fields all of the arguments that were passed directly to those 
methods.  
  Resolution: Fixed

Merged to master as: b660b06cb70^..e6556fa898d
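
For authors of custom state backends, the shape of the migration looks roughly like 
the sketch below. It uses simplified stand-in interfaces rather than the real Flink 
classes ({{KeyedStateBackendParameters}} here is only a placeholder mirroring the 
release note above), so treat it purely as an illustration of moving from positional 
arguments to a single parameters POJO.

{code:java}
// Simplified stand-ins, NOT the real Flink interfaces -- they only illustrate
// moving from a long positional argument list to a single parameters object.
interface KeyedStateBackendParameters<K> {
    String getOperatorIdentifier();
    // ... every argument that used to be passed positionally lives here as a getter
}

interface OldStyleStateBackend {
    Object createKeyedStateBackend(
            Object environment, Object jobId, String operatorIdentifier /*, ... */);
}

interface NewStyleStateBackend {
    <K> Object createKeyedStateBackend(KeyedStateBackendParameters<K> parameters);
}

/** A custom backend now reads its inputs from the parameters POJO. */
class MyCustomStateBackend implements NewStyleStateBackend {
    @Override
    public <K> Object createKeyedStateBackend(KeyedStateBackendParameters<K> parameters) {
        String operatorIdentifier = parameters.getOperatorIdentifier();
        // ... build and return the keyed state backend exactly as before
        return operatorIdentifier;
    }
}
{code}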

> FLIP-386: Support adding custom metrics in Recovery Spans
> -
>
> Key: FLINK-33697
> URL: https://issues.apache.org/jira/browse/FLINK-33697
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Metrics, Runtime / State Backends
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>
> h1. Motivation
> [FLIP-386|https://cwiki.apache.org/confluence/x/VAuZE] is building on top of 
> [FLIP-384|https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces].
>  The intention here is to add a capability for state backends to attach 
> custom attributes during recovery to recovery spans. For example 
> RocksDBIncrementalRestoreOperation could report both remote download time and 
> time to actually clip/ingest the RocksDB instances after rescaling.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33856) Add metrics to monitor the interaction performance between task and external storage system in the process of checkpoint making

2024-01-11 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805739#comment-17805739
 ] 

Piotr Nowojski commented on FLINK-33856:


In that case [~hejufang001] it would be great if you started another FLIP for 
adding per-subtask spans. If you decide to do so, please ping me on Apache 
Flink Slack or via a Jira ticket so I don't miss it :)

> Add metrics to monitor the interaction performance between task and external 
> storage system in the process of checkpoint making
> ---
>
> Key: FLINK-33856
> URL: https://issues.apache.org/jira/browse/FLINK-33856
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.18.0
>Reporter: Jufang He
>Assignee: Jufang He
>Priority: Major
>  Labels: pull-request-available
>
> When Flink makes a checkpoint, the interaction performance with the external 
> file system has a great impact on the overall checkpoint duration. Therefore, 
> bottlenecks become easy to spot by adding performance indicators where the 
> task interacts with the external file storage system. These include: the file 
> write rate, the latency to write the file, and the latency to close the file.
> Adding the above metrics on the Flink side has the following advantages: it is 
> convenient for comparing the E2E checkpoint duration of different tasks, and 
> it does not need to distinguish the type of external storage system, since it 
> can be unified in the FsCheckpointStreamFactory.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-33775) Report JobInitialization traces

2024-01-05 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-33775.
--
Fix Version/s: 1.19.0
   Resolution: Fixed

merged commit 793a66b into apache:master

> Report JobInitialization traces
> ---
>
> Key: FLINK-33775
> URL: https://issues.apache.org/jira/browse/FLINK-33775
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-33695) FLIP-384: Introduce TraceReporter and use it to create checkpointing and recovery traces

2024-01-05 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-33695.
--
Resolution: Fixed

> FLIP-384: Introduce TraceReporter and use it to create checkpointing and 
> recovery traces
> 
>
> Key: FLINK-33695
> URL: https://issues.apache.org/jira/browse/FLINK-33695
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Checkpointing, Runtime / Metrics
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
> Fix For: 1.19.0
>
>
> https://cwiki.apache.org/confluence/x/TguZE
> *Motivation*
> Currently Flink has limited observability of the checkpoint and recovery 
> processes.
> For checkpointing Flink has a very detailed overview in the Flink WebUI, 
> which works great in many use cases, however it’s problematic if one is 
> operating multiple Flink clusters, or if the cluster/JM dies. Additionally 
> there are a couple of metrics (like lastCheckpointDuration or 
> lastCheckpointSize), however those metrics have a couple of issues:
> * They are reported and refreshed periodically, depending on the 
> MetricReporter settings, which doesn’t take into account checkpointing 
> frequency.
> ** If checkpointing interval > metric reporting interval, we would be 
> reporting the same values multiple times.
> ** If checkpointing interval < metric reporting interval, we would be 
> randomly dropping metrics for some of the checkpoints.
> For recovery we are missing even the most basic metrics and Flink WebUI 
> support. Also, given that recovery is even less frequent than checkpointing, 
> plain recovery metrics would suffer even more from unnecessarily reporting 
> the same values.
> In this FLIP I’m proposing to add support for reporting traces/spans 
> (example: Traces) and use this mechanism to report checkpointing and recovery 
> traces. I hope in the future traces will also prove useful in other areas of 
> Flink like job submission, job state changes, etc. Moreover, as the API to 
> report traces will be added to the MetricGroup, users will also be able to 
> access this API.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33897) Allow triggering unaligned checkpoint via CLI

2024-01-05 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803535#comment-17803535
 ] 

Piotr Nowojski commented on FLINK-33897:


By real-world motivation, I meant: is this really an issue that someone has 
complained about? If not, and this is just a theoretical possibility that comes 
from your observation when implementing FLINK-6755 ("it could be implemented, 
someone might find it useful"), I would put it aside for the time being. 
Honestly, I doubt many users would use this feature. In most cases just 
cancelling the job and restarting with a new configuration would be faster than 
someone first trying to find out from the docs/user mailing list/Stack Overflow 
that they can actually trigger an unaligned checkpoint from the CLI. This would 
only be useful to a handful of power users, and those should already know that 
it's better to use unaligned checkpoints from the get-go.

{quote}
I'm not very familiar with this part so if you think this is a big change, I 
won't insist on doing it.
{quote}
Adding a new BarrierHandlerState is maybe not a very big change per se, but it 
will visibly increase the complexity of the code for anyone who needs to 
read/understand it.

{quote}
I do agree we could enable timeout for aligned cp by default, which greatly 
reduce this case
{quote}
Let me start the dev mailing list discussion about that.
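
For reference, the combination discussed here -- unaligned checkpoints enabled 
together with an aligned checkpoint timeout, so that a checkpoint only switches to 
the unaligned mode under back-pressure -- can already be configured 
programmatically. A minimal sketch using the {{CheckpointConfig}} API (please 
double-check the exact method names against the docs of your Flink version):

{code:java}
import java.time.Duration;

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnalignedWithTimeoutExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        // Allow unaligned checkpoints...
        checkpointConfig.enableUnalignedCheckpoints(true);
        // ...but start aligned, and only switch to the unaligned mode once barrier
        // alignment has taken longer than this timeout (i.e. under back-pressure).
        checkpointConfig.setAlignedCheckpointTimeout(Duration.ofSeconds(30));

        env.fromSequence(0, Long.MAX_VALUE).print();
        env.execute("unaligned-checkpoints-with-alignment-timeout");
    }
}
{code}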

> Allow triggering unaligned checkpoint via CLI
> -
>
> Key: FLINK-33897
> URL: https://issues.apache.org/jira/browse/FLINK-33897
> Project: Flink
>  Issue Type: New Feature
>  Components: Command Line Client, Runtime / Checkpointing
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>
> After FLINK-6755, users can trigger a checkpoint through the CLI. However, I 
> noticed there would be value in supporting triggering it in an unaligned way, 
> since the job may encounter high back-pressure and an aligned checkpoint 
> would fail.
>  
> I suggest we provide an option '-unaligned' in CLI to support that.
>  
> Similar option would also be useful for REST api



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33856) Add metrics to monitor the interaction performance between task and external storage system in the process of checkpoint making

2024-01-05 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803478#comment-17803478
 ] 

Piotr Nowojski edited comment on FLINK-33856 at 1/5/24 9:58 AM:


{quote}
Maybe a new flip that supports task-level trace reporter can builded ?  I’m 
willing to participate in the development.
{quote}
Please again check the FLIP-384 discussions. I was highlighting there a couple 
of difficulties:
{quote}
However, if we would like to create true distributed traces, with spans
reported from many different
components, potentially both on JM and TM, the problem is a bit deeper. The
issue in that case is how
to actually fill out `parrent_id` and `trace_id`? Passing some context
entity as a java object would be
unfeasible. That would require too many changes in too many places. I think
the only realistic way
to do it, would be to have a deterministic generator of `parten_id` and
`trace_id` values.

For example we could create the parent trace/span of the checkpoint on JM,
and set those ids to
something like: `jobId#attemptId#checkpointId`. Each subtask then could
re-generate those ids
and subtasks' checkpoint span would have an id of
`jobId#attemptId#checkpointId#subTaskId`.
Note that this is just an example, as most likely distributed spans for
checkpointing do not make
sense, as we can generate them much easier on the JM anyway.
{quote}
https://lists.apache.org/thread/7lql5f5q1np68fw1wc9trq3d9l2ox8f4

At the same time:
{quote}
 I am worried that a large amount of data aggregation to JM may have 
performance problems.
{quote}
I wouldn't worry about that too much. This data is already aggregated on the JM 
from all of the TMs via {{CheckpointMetricsBuilder}} and {{CheckpointMetrics}}. 
Besides, it's just a single RPC from subtask -> JM per checkpoint. If that 
becomes a problem, we would have problems in many different areas as well (for 
example {{notifyCheckpointCompleted}} is a very similar call but the other 
direction).

Also AFAIR there are/were different ideas how to solve this potential 
bottleneck in a more generic way (having multiple job coordinators in the 
cluster to spread the load).

[~hejufang001] I would suggest that both of you chat offline about the scope of 
the changes in [~fanrui]'s FLIP and/or the eventual division of work. I'm not 
sure if [~fanrui] plans to add per-task/subtask spans for checkpoints and/or 
recovery.
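
To make the quoted idea more concrete, below is a toy sketch of deterministically 
deriving trace/span ids from {{jobId}}, {{attemptId}}, {{checkpointId}} and 
{{subtaskId}}. The helper class and the 128/64-bit hex layout are purely 
hypothetical, not an existing Flink or OpenTelemetry API; the only point is that 
the JM and every subtask can regenerate the same ids without passing any context 
object around.

{code:java}
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Toy generator: the JM and every subtask can recompute the same ids locally. */
public final class DeterministicSpanIds {

    /** 128-bit (32 hex chars) trace id shared by all spans of one checkpoint. */
    public static String traceId(String jobId, int attemptId, long checkpointId) {
        return hash(jobId + "#" + attemptId + "#" + checkpointId, 32);
    }

    /** 64-bit (16 hex chars) span id for a single subtask's contribution. */
    public static String subtaskSpanId(
            String jobId, int attemptId, long checkpointId, int subtaskId) {
        return hash(jobId + "#" + attemptId + "#" + checkpointId + "#" + subtaskId, 16);
    }

    private static String hash(String input, int hexChars) {
        try {
            byte[] digest =
                    MessageDigest.getInstance("SHA-256")
                            .digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.substring(0, hexChars);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    private DeterministicSpanIds() {}
}
{code}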


was (Author: pnowojski):
{quote}
Maybe a new flip that supports task-level trace reporter can builded ?  I’m 
willing to participate in the development.
{quote}
Please again check the FLIP-384 discussions. I was highlighting there a couple 
of difficulties:
{quote}
However, if we would like to create true distributed traces, with spans
reported from many different
components, potentially both on JM and TM, the problem is a bit deeper. The
issue in that case is how
to actually fill out `parrent_id` and `trace_id`? Passing some context
entity as a java object would be
unfeasible. That would require too many changes in too many places. I think
the only realistic way
to do it, would be to have a deterministic generator of `parten_id` and
`trace_id` values.

For example we could create the parent trace/span of the checkpoint on JM,
and set those ids to
something like: `jobId#attemptId#checkpointId`. Each subtask then could
re-generate those ids
and subtasks' checkpoint span would have an id of
`jobId#attemptId#checkpointId#subTaskId`.
Note that this is just an example, as most likely distributed spans for
checkpointing do not make
sense, as we can generate them much easier on the JM anyway.
{quote}
https://lists.apache.org/thread/7lql5f5q1np68fw1wc9trq3d9l2ox8f4

At the same time:
{quote}
 I am worried that a large amount of data aggregation to JM may have 
performance problems.
{quote}
I wouldn't worry about that too much. This data is already aggregated on the JM 
from all of the TMs via {{CheckpointMetricsBuilder}} and {{CheckpointMetrics}}. 
Besides, it's just a single RPC from subtask -> JM per checkpoint. If that 
becomes a problem, we would have problems in many different areas as well (for 
example {{notifyCheckpointCompleted}} is a very similar call but the other 
direction).

Also AFAIR there are/were different ideas how to solve this potential 
bottleneck in a more generic way (having multiple job coordinators in the 
cluster to spread the load).



> Add metrics to monitor the interaction performance between task and external 
> storage system in the process of checkpoint making
> ---
>
> Key: FLINK-33856
> URL: https://issues.apache.org/jira/browse/FLINK-33856
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 
