[jira] [Updated] (FLINK-32828) Kafka partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint

2024-05-15 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-32828:
---
Priority: Critical  (was: Major)

> Kafka partition aware watermark not handled correctly shortly after job start 
> up from checkpoint or savepoint
> -
>
> Key: FLINK-32828
> URL: https://issues.apache.org/jira/browse/FLINK-32828
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / Kafka
>Affects Versions: 1.17.1
> Environment: Affected environments:
>  * Local MiniCluster + Confluent Kafka run in Docker
>  ** See attached files
>  * Flink job run in Kubernetes using Flink Operator 1.15 + 3-node Kafka 
> cluster run in a Kubernetes cluster
>Reporter: Grzegorz Liter
>Assignee: Piotr Nowojski
>Priority: Critical
> Attachments: docker-compose.yml, test-job.java
>
>
> When using KafkaSource with partition-aware watermarks, watermarks are 
> emitted even when only one partition has received events, just after job 
> startup from a savepoint/checkpoint. Once the other partitions have received 
> some events, the behaviour is correct again and the watermark is emitted as 
> the minimum watermark across all partitions.
>  
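> For context, per-partition (partition-aware) watermarking means the source 
> tracks a watermark per Kafka partition and forwards the minimum of them; a 
> partition that has not yet received any events stays at {{Long.MIN_VALUE}}. 
> An illustrative sketch of those min semantics (not Flink's actual internal 
> classes):
> {code:java}
> import java.util.Map;
> 
> // Illustrative only: Flink combines per-partition (per-split) watermarks
> // internally; this just spells out the expected "minimum of all" semantics.
> final class MinWatermarkCombiner {
>     /** Partitions with no events yet stay at Long.MIN_VALUE. */
>     static long combinedWatermark(Map<Integer, Long> perPartitionWatermarks) {
>         return perPartitionWatermarks.values().stream()
>                 .mapToLong(Long::longValue)
>                 .min()
>                 .orElse(Long.MIN_VALUE);
>     }
> }
> {code}
>  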
> Steps to reproduce:
>  # Set up a Kafka cluster with a topic that has 2 or more partitions (see 
> attached docker-compose.yml):
>  ## {{./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
> test-2 --partitions 4}}
>  # Create a job that (see attached `test-job.java`; a sketch follows this 
> list):
>  ## uses a KafkaSource with 
> `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L))`
>  ## has parallelism lower than the number of partitions
>  ## stores checkpoints/savepoints
>  # Start the job
>  # Send events only on a single partition:
>  ## {{./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
> test-2 --property "parse.key=true" --property "key.separator=:"}}
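>  
> A minimal sketch of the kind of job described in step 2, with assumed names, 
> group id, and addresses (the actual job is in the attached test-job.java):
> {code:java}
> import java.time.Duration;
> 
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.connector.kafka.source.KafkaSource;
> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> 
> public class TestJobSketch {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(2);           // lower than the 4 partitions of test-2
>         env.enableCheckpointing(10_000); // so there is a checkpoint to restore from
> 
>         KafkaSource<String> source = KafkaSource.<String>builder()
>                 .setBootstrapServers("localhost:9092")
>                 .setTopics("test-2")
>                 .setGroupId("test-group") // assumed group id
>                 .setStartingOffsets(OffsetsInitializer.latest())
>                 .setValueOnlyDeserializer(new SimpleStringSchema())
>                 .build();
> 
>         env.fromSource(
>                 source,
>                 // per-partition watermarks, 10 s bounded out-of-orderness
>                 WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(10L)),
>                 "kafka-source")
>            .print(); // the attached job logs timestamp + watermark in a sink instead
> 
>         env.execute("FLINK-32828 repro sketch");
>     }
> }
> {code}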
>  
> {{14:51:19,883 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:18.849Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:32,484 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:31.475Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:51:35,914 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:34.909Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> Expected: Watermark does not progress. Actual: Watermark does not progress.
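>  
> For reference, the logged watermark {{-292275055-05-16T16:47:04.192Z}} is 
> simply {{Long.MIN_VALUE}} epoch milliseconds rendered as an {{Instant}}, 
> i.e. the "no watermark yet" value:
> {code:java}
> import java.time.Instant;
> 
> // Prints -292275055-05-16T16:47:04.192Z, the value seen in the log lines
> // above: Long.MIN_VALUE milliseconds, i.e. an uninitialized watermark.
> public class MinWatermarkValue {
>     public static void main(String[] args) {
>         System.out.println(Instant.ofEpochMilli(Long.MIN_VALUE));
>     }
> }
> {code}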
> 5. Stop the job
> 6. Start the job from the last checkpoint/savepoint
> 7. Send events only on a single partition
> {{14:53:41,693 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:40.662Z, watermark 
> -292275055-05-16T16:47:04.192Z}}
> {{14:53:46,088 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:45.078Z, watermark 
> 2023-08-10T12:53:30.661Z}}
> {{14:53:49,520 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:48.511Z, watermark 
> 2023-08-10T12:53:35.077Z}}
> Expected: Watermark does not progress. {color:#ff0000}*Actual: Watermark has 
> progressed*{color}
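>  
> For reference, {{forBoundedOutOfOrderness}} emits watermarks of 
> {{maxTimestamp - outOfOrderness - 1 ms}}; the values logged after the 
> restore match that formula applied to the single active partition alone, 
> which is how the watermark can progress here. A small worked check, 
> assuming that formula:
> {code:java}
> import java.time.Duration;
> import java.time.Instant;
> 
> // The watermark logged at 14:53:46 (2023-08-10T12:53:30.661Z) is exactly the
> // previous event's timestamp minus the 10 s bound minus 1 ms.
> public class WatermarkArithmetic {
>     public static void main(String[] args) {
>         long maxTimestamp = Instant.parse("2023-08-10T12:53:40.662Z").toEpochMilli();
>         long watermark = maxTimestamp - Duration.ofSeconds(10).toMillis() - 1;
>         System.out.println(Instant.ofEpochMilli(watermark)); // 2023-08-10T12:53:30.661Z
>     }
> }
> {code}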
>  
> To add a bit more context:
> 8. Send events on the other partitions, then send events only on a single 
> partition again
> {{14:54:55,112 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/0: 2 -> a, timestamp 2023-08-10T12:54:54.104Z, watermark 
> 2023-08-10T12:53:38.510Z}}
> {{14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/1: 4 -> a, timestamp 2023-08-10T12:54:56.665Z, watermark 
> 2023-08-10T12:53:38.510Z}}
> {{14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/2: 5 -> a, timestamp 2023-08-10T12:54:57.554Z, watermark 
> 2023-08-10T12:53:38.510Z}}
> {{14:55:12,821 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:11.814Z, watermark 
> 2023-08-10T12:54:44.103Z}}
> {{14:55:16,099 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:15.091Z, watermark 
> 2023-08-10T12:54:44.103Z}}
> {{14:55:19,122 WARN  com.example.TestJob6$InputSink2                   [] - 
> == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:18.114Z, watermark 
> 2023-08-10T12:54:44.103Z}}
> Expected: Watermark should progress a bit and then should not progress when 
> receiving events only on a single partition.
> Actual: As expected.
>  
> This behaviour also shows up as a burst of late events just after startup, 
> and then no more late events once the job operates normally.
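>  
> A hedged sketch of how that late-event burst can be observed (a hypothetical 
> helper, not part of the attached job): an element is late when its timestamp 
> is already behind the current watermark.
> {code:java}
> import org.apache.flink.streaming.api.functions.ProcessFunction;
> import org.apache.flink.util.Collector;
> 
> // Hypothetical detector: logs elements whose timestamp is behind the
> // current watermark, i.e. the "late events" described above.
> public class LateEventLogger extends ProcessFunction<String, String> {
>     @Override
>     public void processElement(String value, Context ctx, Collector<String> out) {
>         Long ts = ctx.timestamp(); // may be null if no timestamps are assigned
>         long wm = ctx.timerService().currentWatermark();
>         if (ts != null && ts < wm) {
>             System.out.printf("late event: ts=%d < watermark=%d (%s)%n", ts, wm, value);
>         }
>         out.collect(value);
>     }
> }
> {code}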

[jira] [Updated] (FLINK-32828) Kafka partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint

2024-05-15 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-32828:
---
Affects Version/s: 1.18.1, 1.19.0


[jira] [Updated] (FLINK-32828) Kafka partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint

2023-08-11 Thread Grzegorz Liter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grzegorz Liter updated FLINK-32828:
---
Description: 

[jira] [Updated] (FLINK-32828) Kafka partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint

2023-08-10 Thread Grzegorz Liter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grzegorz Liter updated FLINK-32828:
---
Description: 

[jira] [Updated] (FLINK-32828) Kafka partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint

2023-08-10 Thread Grzegorz Liter (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grzegorz Liter updated FLINK-32828:
---
Summary: Kafka partition aware watermark not handled correctly shortly 
after job start up from checkpoint or savepoint  (was: Kafka partition aware 
watermark not handled correctly shortly after job start up)
