[jira] [Commented] (FLINK-28060) Kafka Commit on checkpointing fails repeatedly after a broker restart

2022-08-09 Thread Luke Chen (Jira)


[ https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17577679#comment-17577679 ]

Luke Chen commented on FLINK-28060:
---

[~mason6345] , yes, Kafka v3.1.1 assumes that a consumer poll ran before the 
commitAsync call. In Kafka v3.2.1 this assumption is removed, so even if 
commitAsync is called earlier than poll, it works correctly.

So, for your question:

1. However, unusually, step 4 will fail if the test doesn't invoke poll 
regardless of the kafka clients version. I think it is possible for Flink to 
have a race condition where commitAsync is executed before poll (short 
checkpoint interval causing commitAsync before poll if topic partitions take 
long to assign). Is this behavior intended?

--> Yes, Kafka should not assume whether poll or commitAsync is called first. 
That's a bug, fixed in Kafka v3.2.1.

2. Otherwise, do you have any recommendations for reproducing the issue in a CI 
environment where we do not assume poll() is invoked?

--> I think that's fine. I also ran the reproducer in this ticket with Kafka 
v3.2.1 and investigated the Kafka client logs, and I confirmed it works 
correctly even when commitAsync is called earlier than poll.

 

Thanks.

> Kafka Commit on checkpointing fails repeatedly after a broker restart
> -
>
> Key: FLINK-28060
> URL: https://issues.apache.org/jira/browse/FLINK-28060
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / Kafka
>Affects Versions: 1.15.0
> Environment: Reproduced on MacOS and Linux.
> Using java 8, Flink 1.15.0, Kafka 2.8.1.
>Reporter: Christian Lorenz
>Priority: Major
>  Labels: pull-request-available
> Attachments: flink-kafka-testjob.zip
>
>
> When Kafka offset committing is enabled and done on Flink's checkpointing, an 
> error might occur if a Kafka broker is shut down that is the leader of the 
> relevant partition in Kafka's internal __consumer_offsets topic.
> This is expected behaviour. But once the broker is started up again, the 
> next checkpoint issued by Flink should commit the meanwhile-processed offsets 
> back to Kafka. In Flink 1.15.0 this no longer always seems to happen, and 
> offset committing stays broken. A warning like the following 
> will be logged on each checkpoint:
> {code}
> [info] 14:33:13.684 WARN  [Source Data Fetcher for Source: input-kafka-source 
> -> Sink: output-stdout-sink (1/1)#1] o.a.f.c.k.s.reader.KafkaSourceReader - 
> Failed to commit consumer offsets for checkpoint 35
> [info] org.apache.kafka.clients.consumer.RetriableCommitFailedException: 
> Offset commit failed with a retriable exception. You should retry committing 
> the latest consumed offsets.
> [info] Caused by: 
> org.apache.kafka.common.errors.CoordinatorNotAvailableException: The 
> coordinator is not available.
> {code}
> To reproduce this I've attached a small Flink job program. To execute it, 
> Java 8, Scala, sbt, and docker / docker-compose are required. Also see readme.md 
> for more details.
> The job can be run with `sbt run`; the Kafka cluster is started by 
> `docker-compose up`. If the Kafka brokers are then restarted gracefully, 
> e.g. `docker-compose stop kafka1` and `docker-compose start kafka1`, with 
> kafka2 and kafka3 afterwards, this warning will occur and no offsets will be 
> committed to Kafka.
> This is not reproducible in Flink 1.14.4.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28060) Kafka Commit on checkpointing fails repeatedly after a broker restart

2022-08-09 Thread Mason Chen (Jira)


[ https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17577675#comment-17577675 ]

Mason Chen commented on FLINK-28060:


[~showuon] Earlier, I attempted to write an integration test with 
testcontainers to reproduce the issue by following the steps in the ticket 
details. I compared kafka-clients versions 2.8.1 and 3.1.1.

My test is as follows:
 # Start Kafka container and create topic
 # Create reader and assign the topics to underlying consumer
 # Fetch and invoke poll()
 # Call commitAsync()
 # Restart Kafka container 
 # Call commitAsync()
 # Call commitAsync()

For 2.8.1, the commitAsync calls in both steps 6 and 7 fail. For 3.1.1, there 
is only one failure, at step 6.

3.1.1 seems to be the desired behavior. Some concerns:
 # However, unusually, step 4 will fail if the test doesn't invoke poll. I 
think it is possible for Flink to have a race condition where commitAsync is 
executed before poll (short checkpoint interval causing commitAsync before poll 
if topic partitions take long to assign). Is this behavior intended?
 # Otherwise, do you have any recommendations for reproducing the issue in a CI 
environment where we do not assume poll() is invoked?
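The suspected race can be sketched as a toy model (this is an illustration only, not the real Kafka client; the class and method names here are hypothetical): before the KAFKA-13840 fix, the consumer effectively only discovered its group coordinator during poll(), so a commitAsync() issued first failed with a retriable error.

```python
class ToyConsumer:
    """Toy model of the poll/commitAsync ordering assumption (not real Kafka)."""

    def __init__(self, fix_applied: bool):
        # fix_applied=True models the v3.2.1 behaviour, False models v3.1.1
        self.fix_applied = fix_applied
        self.coordinator_known = False

    def poll(self):
        # poll() always (re)discovers the group coordinator as a side effect
        self.coordinator_known = True

    def commit_async(self) -> str:
        if not self.coordinator_known:
            if self.fix_applied:
                # modeled v3.2.1 behaviour: the commit path triggers
                # coordinator discovery itself, so ordering no longer matters
                self.coordinator_known = True
            else:
                # modeled pre-fix behaviour: commit silently depends on a
                # prior poll() having located the coordinator
                return "CoordinatorNotAvailableException"
        return "committed"


# commitAsync before poll: fails in the pre-fix model, succeeds post-fix
old = ToyConsumer(fix_applied=False)
new = ToyConsumer(fix_applied=True)
print(old.commit_async())  # CoordinatorNotAvailableException
print(new.commit_async())  # committed
```

In this model, the Flink scenario (a short checkpoint interval firing commitAsync before partition assignment completes and poll runs) only misbehaves when fix_applied is False, matching the observed difference between client versions.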



[jira] [Commented] (FLINK-28060) Kafka Commit on checkpointing fails repeatedly after a broker restart

2022-08-09 Thread Chesnay Schepler (Jira)


[ https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17577368#comment-17577368 ]

Chesnay Schepler commented on FLINK-28060:
--

I ran CI with Kafka 3.2.1 and it passed without requiring any other changes.
https://dev.azure.com/chesnay/flink/_build/results?buildId=2944=results

We could think about including it in 1.16.0, although I'm not too fond of 
bumping dependencies so close to the feature freeze. (To be fair, though, we 
would have a few weeks to observe it.)



[jira] [Commented] (FLINK-28060) Kafka Commit on checkpointing fails repeatedly after a broker restart

2022-08-04 Thread Luke Chen (Jira)


[ https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17575190#comment-17575190 ]

Luke Chen commented on FLINK-28060:
---

Could we try testing with Kafka v3.2.1? As mentioned in this comment: 
https://issues.apache.org/jira/browse/KAFKA-13840?focusedCommentId=17575186=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17575186
 , I believe this issue should be fixed in Kafka v3.2.1. Thanks.



[jira] [Commented] (FLINK-28060) Kafka Commit on checkpointing fails repeatedly after a broker restart

2022-08-02 Thread Kyle R Stehbens (Jira)


[ https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17574319#comment-17574319 ]

Kyle R Stehbens commented on FLINK-28060:
-

Hi, we were experiencing this issue and downgraded our Kafka client to 2.5.1. 
There were changes to how the group coordinator reconnect logic works in 2.6.0 
and later that seem to have broken recovery of the group coordinator in all 
Kafka client versions from 2.6.0 (inclusive) onward.

Downgrading to v2.5.1 of the Kafka library is confirmed working for us.
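For reference, such a downgrade can be pinned in an sbt build roughly as follows (a build-configuration sketch, not taken from the attached reproducer; it assumes the standard org.apache.kafka:kafka-clients coordinates, so verify against your own dependency tree):

```scala
// build.sbt — force the transitive kafka-clients dependency to 2.5.1
// (workaround sketch; coordinates assume the standard Apache Kafka artifact)
dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "2.5.1"
```

Maven users would achieve the same effect with a `<dependencyManagement>` entry pinning the same artifact.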



[jira] [Commented] (FLINK-28060) Kafka Commit on checkpointing fails repeatedly after a broker restart

2022-07-27 Thread Qingsheng Ren (Jira)


[ https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17572197#comment-17572197 ]

Qingsheng Ren commented on FLINK-28060:
---

[~danderson] The ticket KAFKA-13840 is still pending validation. There are few 
things we can do on our side, but I'll try to find some time to validate the 
patch in Kafka and push it forward.



[jira] [Commented] (FLINK-28060) Kafka Commit on checkpointing fails repeatedly after a broker restart

2022-07-26 Thread David Anderson (Jira)


[ https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17571628#comment-17571628 ]

David Anderson commented on FLINK-28060:


[~martijnvisser] [~renqs] [~mason6345] Are we any closer to understanding this 
issue? Do we know how to fix it, either for a 1.15.2 release, or for 1.16.0?



[jira] [Commented] (FLINK-28060) Kafka Commit on checkpointing fails repeatedly after a broker restart

2022-06-22 Thread Martijn Visser (Jira)


[ https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557289#comment-17557289 ]

Martijn Visser commented on FLINK-28060:


I'm not sure that we have a test for this behaviour. It would actually be nice 
if we could get one in. 



[jira] [Commented] (FLINK-28060) Kafka Commit on checkpointing fails repeatedly after a broker restart

2022-06-21 Thread Mason Chen (Jira)


[ https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557005#comment-17557005 ]

Mason Chen commented on FLINK-28060:


[~renqs] [~martijnvisser] were we able to reproduce this issue in a Flink 
CI/unit test? I think we need to closely monitor 
[KAFKA-13840](https://issues.apache.org/jira/browse/KAFKA-13840); I'm not sure 
that bumping to 3.1.1 really fixed the issue.



[jira] [Commented] (FLINK-28060) Kafka Commit on checkpointing fails repeatedly after a broker restart

2022-06-20 Thread Christian Lorenz (Jira)


[ https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17556320#comment-17556320 ]

Christian Lorenz commented on FLINK-28060:
--

I think the according issue in Kafka is 
https://issues.apache.org/jira/browse/KAFKA-13840.



[jira] [Commented] (FLINK-28060) Kafka Commit on checkpointing fails repeatedly after a broker restart

2022-06-18 Thread Martijn Visser (Jira)


[ https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17555896#comment-17555896 ]

Martijn Visser commented on FLINK-28060:


Fixed in master: 189f88485d75821fe285e61bbf6623e88aec24d3



[jira] [Commented] (FLINK-28060) Kafka Commit on checkpointing fails repeatedly after a broker restart

2022-06-17 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1758#comment-1758
 ] 

Martijn Visser commented on FLINK-28060:


+1 for this approach



[jira] [Commented] (FLINK-28060) Kafka Commit on checkpointing fails repeatedly after a broker restart

2022-06-17 Thread David Anderson (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1727#comment-1727
 ] 

David Anderson commented on FLINK-28060:


It doesn't seem wise to upgrade the Kafka client from 2.8.1 to 3.1.1 for Flink 
1.15.1. That's a big change to make this close to our release, and we shouldn't 
delay this release any further. So here's a proposal:
 * We bump the Kafka Clients to 3.1.1 in master now.
 * We don't try to fix FLINK-28060 for 1.15.1.
 * We create the Flink 1.15.1 release straight away, noting that there's a 
known issue with Kafka (FLINK-28060).
 * We reach out to the Kafka community to see if they're willing to create a 
2.8.2 release with this patch.
 * In parallel, we merge the Kafka Clients bump to 3.1.1 after release 1.15.1 
is done, to see how it behaves on CI for the next few weeks, and plan a 
quick Flink 1.15.2 release (most likely something like a month later).



[jira] [Commented] (FLINK-28060) Kafka Commit on checkpointing fails repeatedly after a broker restart

2022-06-16 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17555303#comment-17555303
 ] 

Martijn Visser commented on FLINK-28060:


I've opened a PR against {{master}} to bump Kafka Clients there. 

I've also opened a draft PR against {{release-1.15}} - I'm not sure we're yet 
OK with bumping Kafka Clients in a patch release, but I wanted to verify if CI 
works for this one too. Given the limited change and the arguments brought 
forward by [~renqs] on possible other bugs that we might re-introduce when we 
downgrade, I would probably +1 to bump Kafka Clients for Flink 1.15.1



[jira] [Commented] (FLINK-28060) Kafka Commit on checkpointing fails repeatedly after a broker restart

2022-06-16 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17555271#comment-17555271
 ] 

Martijn Visser commented on FLINK-28060:


In my private CI run the upgrade to 3.1.1 was successful (the failing tests are 
unrelated test instabilities), see 
https://dev.azure.com/martijn0323/Flink/_build/results?buildId=2650=results

No interface changes were required. The only thing that would be good to verify 
is whether the newer version has any change with regards to a minimum required 
version of the Kafka protocol or broker. I looked for it, but based on what I 
could find I think this is not the case.



[jira] [Commented] (FLINK-28060) Kafka Commit on checkpointing fails repeatedly after a broker restart

2022-06-16 Thread Christian Lorenz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17555061#comment-17555061
 ] 

Christian Lorenz commented on FLINK-28060:
--

[~mason6345] I did not observe the commit error vanishing. To work around it 
we currently restart our taskmanagers gracefully; I assume this implicitly 
resets the broken Kafka consumer state.



[jira] [Commented] (FLINK-28060) Kafka Commit on checkpointing fails repeatedly after a broker restart

2022-06-16 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17555058#comment-17555058
 ] 

Martijn Visser commented on FLINK-28060:


{{FlinkKafkaConsumer}} has not been removed, but it has been deprecated since 
1.14.0 via FLINK-24055. 



[jira] [Commented] (FLINK-28060) Kafka Commit on checkpointing fails repeatedly after a broker restart

2022-06-16 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17555057#comment-17555057
 ] 

Mason Chen commented on FLINK-28060:


+1 on [~peter.schrott]'s assessment: we also need this for metrics, and for 
cases where Flink state is lost or where we do a migration in which it is 
operationally easier to throw away Flink state.

In addition, Flink hasn't removed support for FlinkKafkaConsumer, right? The 
group offsets are essential for the migration process to the FLIP-27 Kafka 
Source, since users will have operational issues moving without committed 
offsets in Kafka.

[~Christian.Lorenz77] I can look at the reproduction code next week. Does the 
commit eventually succeed, e.g. after the 5th checkpoint, nth checkpoint, etc.?



[jira] [Commented] (FLINK-28060) Kafka Commit on checkpointing fails repeatedly after a broker restart

2022-06-16 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17554952#comment-17554952
 ] 

Martijn Visser commented on FLINK-28060:


I'm waiting for CI to complete a run upgrading to 3.1.1; that's looking good as 
well, without needing to change any interface/implementation details. 



[jira] [Commented] (FLINK-28060) Kafka Commit on checkpointing fails repeatedly after a broker restart

2022-06-16 Thread Qingsheng Ren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17554949#comment-17554949
 ] 

Qingsheng Ren commented on FLINK-28060:
---

Personally I prefer to upgrade the Kafka client. Downgrading it could 
introduce new bugs (like the one fixed in KAFKA-10793), and I think the 
backward compatibility of Kafka is trustworthy based on our previous 
experience. On the consumer side we don't use any hacks in the Kafka source and 
depend purely on public APIs, so I think it should be fine; for the producer, however, we use 
[reflection|https://github.com/apache/flink/blob/c9a706b8388b324a37da43298e37074d0a452a34/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java#L331-L343]
 which is concerning. I can try bumping the version and see if it works. 

Any ideas from [~becket_qin] ?
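The reflection concern can be illustrated with a small hypothetical sketch (Python for brevity; the class and field names are invented and this is not Flink's FlinkKafkaInternalProducer): code that reaches into a dependency's private internals keeps looking valid, but fails only at runtime once a version bump renames the field.

```python
# Hypothetical illustration, NOT Flink code: why reflective access to a
# dependency's private internals is fragile across version upgrades.

class ProducerV1:
    """Stands in for the 'old' client's producer internals."""
    def __init__(self):
        self._transaction_id = "txn-1"   # private field name in "v1"

class ProducerV2:
    """Same data, but the private field was renamed in 'v2'."""
    def __init__(self):
        self._txn_id = "txn-1"

def read_txn_by_reflection(producer):
    # Mirrors reflective Field access: hard-coded to the "v1" field name,
    # so it breaks at runtime against "v2" rather than at build time.
    return getattr(producer, "_transaction_id")
```

This is why soaking the client bump on CI for a while, as proposed above, catches more than a successful compile does.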



[jira] [Commented] (FLINK-28060) Kafka Commit on checkpointing fails repeatedly after a broker restart

2022-06-16 Thread Chesnay Schepler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17554946#comment-17554946
 ] 

Chesnay Schepler commented on FLINK-28060:
--

Indeed going back to 2.4.1 doesn't seem feasible; going back to 2.6.1 at least 
doesn't produce compile errors; waiting on CI.



[jira] [Commented] (FLINK-28060) Kafka Commit on checkpointing fails repeatedly after a broker restart

2022-06-16 Thread Christian Lorenz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17554943#comment-17554943
 ] 

Christian Lorenz commented on FLINK-28060:
--

Does someone know if the Kafka team is aware that this issue exists in 2.8.1? I 
think Flink might be one of the most prominent use cases of topic#assign 
where this occurs, but this will certainly affect other users as well. The 
most natural fix would be to upgrade to kafka-client 2.8.2 once 
available. 



[jira] [Commented] (FLINK-28060) Kafka Commit on checkpointing fails repeatedly after a broker restart

2022-06-16 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17554941#comment-17554941
 ] 

Martijn Visser commented on FLINK-28060:


I have sincere doubts about reverting FLINK-24765, because more changes have 
happened since then (like FLINK-25573, FLINK-27487, FLINK-27480, and I'm sure 
I'm missing some more).

So that probably leaves us with either a downgrade or an upgrade of the Kafka 
Clients dependency?
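The two alternatives under discussion (rolling the client back to 2.6.1, which predates the KAFKA-10793 regression, vs. moving forward to a 3.x release that contains the KAFKA-13563 fix) can both be tried out locally in a user's own job via a dependency override. A hypothetical build.sbt fragment, assuming an sbt-based job like the attached reproducer; this changes only the user's build, not Flink itself:

```scala
// Hypothetical build.sbt override: force a specific kafka-clients version in
// a user's own Flink job, to test behaviour against a different client.
// 3.2.1 contains the KAFKA-13563 fix; 2.6.1 predates the KAFKA-10793 regression.
dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "3.2.1"
```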



[jira] [Commented] (FLINK-28060) Kafka Commit on checkpointing fails repeatedly after a broker restart

2022-06-16 Thread David Anderson (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17554935#comment-17554935
 ] 

David Anderson commented on FLINK-28060:


Along the lines of what Chesnay mentioned, what would be the impact of either 
reverting FLINK-24765, or rolling back the Kafka client dependency to something 
like 2.6.1 (i.e., something newer than 2.4.1 but before the Kafka bug 
introduced by KAFKA-10793 in 2.6.2 and 2.7.1)?

 



[jira] [Commented] (FLINK-28060) Kafka Commit on checkpointing fails repeatedly after a broker restart

2022-06-15 Thread Peter Schrott (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17554609#comment-17554609
 ] 

Peter Schrott commented on FLINK-28060:
---

From a user's perspective I see two issues here:

1) Monitoring / alerting: we use consumer offsets / consumer lag to 
monitor potential issues with the Flink job, also in terms of performance, i.e. 
the lag getting too large.

2) Starting offsets: the Flink Kafka connector offers the feature of resuming from 
committed offsets [1]. This feature would be obsolete if offsets are not 
committed to Kafka (sure, one can always use savepoints when restarting a job).

 

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#starting-offset
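The committed-offsets resume feature mentioned in (2) is configured on the 1.15 KafkaSource builder roughly as below. A minimal sketch based on the documented DataStream Kafka connector API; the broker address, topic and group names are placeholders:

```java
// Sketch: a KafkaSource that resumes from offsets committed to Kafka,
// falling back to EARLIEST when no committed offset exists. This is the
// feature that degrades when checkpoint-time commits silently stop working.
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class CommittedOffsetsExample {
    public static KafkaSource<String> build() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")   // placeholder
                .setTopics("input-topic")                // placeholder
                .setGroupId("my-group")                  // placeholder
                .setStartingOffsets(
                        OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }
}
```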



[jira] [Commented] (FLINK-28060) Kafka Commit on checkpointing fails repeatedly after a broker restart

2022-06-15 Thread Chesnay Schepler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17554597#comment-17554597
 ] 

Chesnay Schepler commented on FLINK-28060:
--

Could someone clarify what this issue means in practice for users? Could we just 
_not_ commit the offsets?

I'm quite wary of doing a major version upgrade, independent of whether we have 
to adjust our code or not.
Alternatives include reverting FLINK-24765, or going back to 2.6.1 (i.e., some 
version that doesn't have the Kafka bug).



[jira] [Commented] (FLINK-28060) Kafka Commit on checkpointing fails repeatedly after a broker restart

2022-06-15 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17554530#comment-17554530
 ] 

Martijn Visser commented on FLINK-28060:


I think it depends on the impact of bumping the Kafka Clients dependency. If 
it's "just" a dependency update without modifying the actual source code, I 
would probably be OK with it. If it requires refactoring of the interface, then 
I would say we can only fix this in Flink 1.16.0. I'll set up a test CI run to 
see what happens.

Looking at the release notes, I do hope that the main reason for going to a new 
major version was the deprecation of Java 8 and the deprecation of Scala 2.12. 
If that's it, I do hope impact will be low.



[jira] [Commented] (FLINK-28060) Kafka Commit on checkpointing fails repeatedly after a broker restart

2022-06-15 Thread David Anderson (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17554498#comment-17554498
 ] 

David Anderson commented on FLINK-28060:


I'd like to include a fix for this in 1.15.1, but I'm not sure about bumping up 
the Kafka client to 3.x. What are you thinking? [~martijnvisser] [~renqs] 



[jira] [Commented] (FLINK-28060) Kafka Commit on checkpointing fails repeatedly after a broker restart

2022-06-15 Thread Qingsheng Ren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17554457#comment-17554457
 ] 

Qingsheng Ren commented on FLINK-28060:
---

I did some investigation and strongly suspect that this is caused by 
KAFKA-13563. We upgraded the Kafka client from 2.4.1 to 2.8.1 in Flink 1.15, 
and the bug was introduced in 2.6.2 by KAFKA-10793 :(

The patch for KAFKA-13563 was applied only to Kafka 3.1 and 3.2, so we would 
have to bump the version as well, or urge the Kafka folks to cherry-pick the 
patch back onto 2.x. 

WDYT? [~martijnvisser] 



[jira] [Commented] (FLINK-28060) Kafka Commit on checkpointing fails repeatedly after a broker restart

2022-06-14 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17554222#comment-17554222
 ] 

Martijn Visser commented on FLINK-28060:


[~renqs] This seems related to FLINK-27962 - should we raise the priority to at 
least Critical, potentially Blocker? This looks like a regression. I linked 
the two tickets together because they both have value; we should probably 
close one of them in the end.
