[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offets to Kafka asy...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2559 Closing this for #2574 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offets to Kafka asy...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2559 Actually, just discovered that the problem is different all together. While the KafkaConsumer is polling for new data (with a timeout), it holds the consumer lock. If no data comes in Kafka, the lock is not released before the poll timeout is over. During that time, neither a "commitSync" nor "commitAsync" call can be fired off. The `notifyCheckpointComplete` method hence blocks until the poll timeout is over and the lock is released. We can fix this by making sure that the consumer is "woken up" to release the lock, and by making sure the lock acquisition is fair, so the committer will get it next. For the sake of releasing the lock fast in the committer method, it should still be an asynchronous commit. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offets to Kafka asy...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2559 @tzulitai Thanks for thorough review! I don't understand the problem why the `commitSpecificOffsetsToKafka` method is designed to commit synchronously. The `FlinkKafkaConsumerBase` has the pending checkpoints (I think that is what you refer to). It removes the HashMap of "offsets to commit" from the `pendingCheckpoints` Map synchronously, before even calling the fetcher to commit. After that, it looks to me like it does not make a difference how that Map "offsets to commit" is used (sync or async)... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offets to Kafka asy...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2559 Seems like currently only the 0.8 Kafka connector have tests related to offset committing (in `Kafka08ITCase`). My two cents for testing this for now is that a IT test for correct offset committing back to Kafka in the 0.9 connector is sufficient (can take a look at `Kafka08ITCase#testOffsetInZookeeper`, but replacing `ZookeeperOffsetHandler` with the new `KafkaConsumer` methods). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offets to Kafka asy...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2559 @StephanEwen On a second look, I think the `commitSpecificOffsetsToKafka` method was designed to commit synchronously in the first place. `AbstractFetcher` holds a Map of all current pending offsets for committing by checkpointID, and on every `notifyCheckpointComplete` the offsets are removed from the Map before `commitSpecificOffsetsToKafka` is called. So, for async committing, I think we need to remove cleaning up the offsets in `AbstractFetcher#notifyCheckpointComplete()` and instead clean them up in a new separate callback handle method in `AbstractFetcher`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offets to Kafka asy...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2559 Thanks @tzulitai for looking at this. I will leave the offset then as it is (fixed via followup) and The Kafka 0.8 connector needs a similar change. This here is encountered by a user, so I wanted to get the 0.9 fix in faster. Will do a follow-up for Kafka 0.8. Will also correct the issue tag ;-) I have no good idea how to test this, though, so any thoughts there are welcome! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offets to Kafka asy...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2559 @StephanEwen I think you've tagged the wrong Github ID for Robert ;) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offets to Kafka asy...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2559 Btw, just curious, does 0.8 Kafka connector have the same issue with sync committing? I haven't looked into the code for this, but just wondering if we need a ticket for 0.8 too. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offets to Kafka asy...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/2559 Just had a look at the API of `commitAsync`, and it seems like the committed offsets back to Kafka through this API (likewise for `commitSync`) need to be `lastProcessedMessageOffset + 1` ([https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitAsync(java.util.Map,%20org.apache.kafka.clients.consumer.OffsetCommitCallback)](https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitAsync(java.util.Map,%20org.apache.kafka.clients.consumer.OffsetCommitCallback))). This mainly effects that when starting from group offsets in Kafka, `FlinkKafkaConsumer09` currently starts from the wrong offset. There's a separate JIRA for this bug: [FLINK-4618](https://issues.apache.org/jira/browse/FLINK-4618). Another contributor had already picked up FLINK-4618, so I'd say it's ok to leave this PR as it is. I'll help check on FLINK-4618 progress and make sure it gets merged after this PR. Minus the above, this looks good to me. +1 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offets to Kafka asy...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2559 @robert and @tzulitai What is your take on this? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---