[GitHub] flink pull request #2559: [FLINK-4702] [kafka connector] Commit offets to Ka...
Github user StephanEwen closed the pull request at: https://github.com/apache/flink/pull/2559 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2559: [FLINK-4702] [kafka connector] Commit offets to Ka...
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/2559#discussion_r81089161

--- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
@@ -301,4 +316,16 @@ public void commitSpecificOffsetsToKafka(Map offsets)
         }
         return result;
     }
+
+    private class CommitCallback implements OffsetCommitCallback {
+
+        @Override
+        public void onComplete(Map offsets, Exception exception) {
+            commitInProgress = false;
+
+            if (exception != null) {
+                LOG.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints", exception);
--- End diff --

Oops, this is actually correct, sorry.
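Why the retraction holds: in the callback, the exception is passed as the last argument of `LOG.warn(...)` rather than concatenated into the message text. SLF4J, which Flink's `LOG` is based on, treats that argument as the throwable of the log event, so typical log layouts print the exception's message and stack trace after the text. The sketch below shows the same idea with `java.util.logging`, used here only as a dependency-free stand-in for SLF4J; `CapturingHandler` and `logCommitFailure` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

public class ThrowableLoggingDemo {

    /** Hypothetical handler that keeps records in memory instead of printing them. */
    static class CapturingHandler extends Handler {
        final List<LogRecord> records = new ArrayList<>();

        @Override public void publish(LogRecord record) { records.add(record); }
        @Override public void flush() {}
        @Override public void close() {}
    }

    /** Mirrors the callback's failure branch: message text plus the exception object. */
    static LogRecord logCommitFailure(Exception exception) {
        Logger log = Logger.getLogger("commit-callback-demo");
        log.setUseParentHandlers(false); // keep the demo quiet on the console
        CapturingHandler handler = new CapturingHandler();
        log.addHandler(handler);
        try {
            // java.util.logging analogue of SLF4J's warn(String msg, Throwable t)
            log.log(Level.WARNING,
                    "Committing offsets to Kafka failed. This does not compromise Flink's checkpoints",
                    exception);
        } finally {
            log.removeHandler(handler);
        }
        return handler.records.get(0);
    }

    public static void main(String[] args) {
        LogRecord record = logCommitFailure(new RuntimeException("broker unavailable"));
        // The exception travels with the record, so its message is not lost.
        System.out.println(record.getThrown().getMessage()); // broker unavailable
    }
}
```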
[GitHub] flink pull request #2559: [FLINK-4702] [kafka connector] Commit offets to Ka...
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/2559#discussion_r80914495

--- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
@@ -285,7 +293,14 @@ public void commitSpecificOffsetsToKafka(Map offsets)
         if (this.consumer != null) {
             synchronized (consumerLock) {
-                this.consumer.commitSync(offsetsToCommit);
+                if (!commitInProgress) {
+                    commitInProgress = true;
+                    this.consumer.commitAsync(offsetsToCommit, offsetCommitCallback);
+                }
+                else {
+                    LOG.warn("Committing previous checkpoint's offsets to Kafka not completed. " +
--- End diff --

I agree, makes sense.
[GitHub] flink pull request #2559: [FLINK-4702] [kafka connector] Commit offets to Ka...
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/2559#discussion_r80909904

--- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
@@ -285,7 +293,14 @@ public void commitSpecificOffsetsToKafka(Map offsets)
         if (this.consumer != null) {
             synchronized (consumerLock) {
-                this.consumer.commitSync(offsetsToCommit);
+                if (!commitInProgress) {
+                    commitInProgress = true;
+                    this.consumer.commitAsync(offsetsToCommit, offsetCommitCallback);
+                }
+                else {
+                    LOG.warn("Committing previous checkpoint's offsets to Kafka not completed. " +
--- End diff --

Possibly, yes. But on the other hand, this should be pretty visible if it happens. I would expect that with proper options to participate in group checkpoint committing, most Flink jobs run without committing to Kafka/ZooKeeper.
[GitHub] flink pull request #2559: [FLINK-4702] [kafka connector] Commit offets to Ka...
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/2559#discussion_r80907326

--- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
@@ -285,7 +293,14 @@ public void commitSpecificOffsetsToKafka(Map offsets)
         if (this.consumer != null) {
             synchronized (consumerLock) {
-                this.consumer.commitSync(offsetsToCommit);
+                if (!commitInProgress) {
+                    commitInProgress = true;
+                    this.consumer.commitAsync(offsetsToCommit, offsetCommitCallback);
+                }
+                else {
+                    LOG.warn("Committing previous checkpoint's offsets to Kafka not completed. " +
--- End diff --

If the user sets a relatively short checkpoint interval, will this flood the log?
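One rough way to reason about the log-volume question is a discrete-time model, which is not taken from the PR: checkpoints complete every `intervalMs`, each offset commit takes a fixed `commitLatencyMs`, the completion callback runs exactly when the commit finishes, and a new commit is issued only if the previous one has finished; otherwise a warning is logged. The class name, parameters, and the fixed-latency assumption are all hypothetical. Under this model the guard logs at most one warning per completed checkpoint, so the volume is bounded by the checkpoint rate rather than by a growing backlog.

```java
public class CommitWarningModel {

    /**
     * Hypothetical model: returns {issued, warned} for a fixed commit latency.
     * A checkpoint either issues a commit or logs one warning, never both.
     */
    static int[] simulate(long intervalMs, long commitLatencyMs, int checkpoints) {
        int issued = 0;
        int warned = 0;
        long inFlightUntil = Long.MIN_VALUE; // time at which the outstanding commit completes
        for (int i = 0; i < checkpoints; i++) {
            long now = i * intervalMs; // notifyCheckpointComplete() for checkpoint i
            if (now < inFlightUntil) {
                warned++; // previous commit still running: warn and skip this commit
            } else {
                issued++;
                inFlightUntil = now + commitLatencyMs;
            }
        }
        return new int[] {issued, warned};
    }

    public static void main(String[] args) {
        // 100 ms checkpoint interval, 250 ms commits, 10 checkpoints
        int[] r = simulate(100, 250, 10);
        System.out.println("issued=" + r[0] + " warned=" + r[1]); // issued=4 warned=6
    }
}
```

With commits faster than the checkpoint interval, the model never warns; with slow commits, it warns on the checkpoints that fall inside an outstanding commit, which matches the point that the warning should be visible when it happens.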
[GitHub] flink pull request #2559: [FLINK-4702] [kafka connector] Commit offets to Ka...
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/2559#discussion_r80906814

--- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
@@ -301,4 +316,16 @@ public void commitSpecificOffsetsToKafka(Map offsets)
         }
         return result;
     }
+
+    private class CommitCallback implements OffsetCommitCallback {
+
+        @Override
+        public void onComplete(Map offsets, Exception exception) {
+            commitInProgress = false;
+
+            if (exception != null) {
+                LOG.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints", exception);
--- End diff --

The exception message isn't included in the log warning.
[GitHub] flink pull request #2559: [FLINK-4702] [kafka connector] Commit offets to Ka...
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/2559#discussion_r80903481

--- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
@@ -86,6 +90,9 @@
     /** Flag to mark the main work loop as alive */
     private volatile boolean running = true;
 
+    /** Flag indicating whether a commit of offsets to Kafka it currently happening */
--- End diff --

nit: "it" --> "is"?
[GitHub] flink pull request #2559: [FLINK-4702] [kafka connector] Commit offets to Ka...
GitHub user StephanEwen opened a pull request:
https://github.com/apache/flink/pull/2559

[FLINK-4702] [kafka connector] Commit offets to Kafka asynchronously

The offset commit calls to Kafka may occasionally take a very long time. In that case, the notifyCheckpointComplete() method blocks for a long time, and the KafkaConsumer can neither make progress nor perform checkpoints.

This pull request changes the offset committing to use Kafka's `commitAsync()` method. It also makes sure that no more than one commit is in progress at a time, so that commit requests do not pile up.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink kafka_commit_async

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2559.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #2559

commit eafba8600863c18e09397366485bcfc6ff44960f
Author: Stephan Ewen
Date: 2016-09-27T18:59:35Z

[FLINK-4702] [kafka connector] Commit offets to Kafka asynchronously
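The mechanism described in the PR (switch to `commitAsync()` and skip a new commit while the previous one is still pending) can be sketched without a Kafka dependency. This is a minimal sketch under stated assumptions, not the code from the PR: `AsyncCommitClient` is a hypothetical stand-in for `KafkaConsumer.commitAsync(offsets, callback)`, offsets are keyed by a plain partition number instead of Kafka's `TopicPartition`, and `SingleInFlightCommitter` and `PendingClient` are hypothetical names. As in the diff, the flag is set under a lock before the asynchronous call, and the completion callback clears it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

public class SingleInFlightCommitDemo {

    /** Hypothetical stand-in for KafkaConsumer.commitAsync(offsets, callback). */
    interface AsyncCommitClient {
        void commitAsync(Map<Integer, Long> offsets,
                         BiConsumer<Map<Integer, Long>, Exception> callback);
    }

    /** Hypothetical committer that keeps at most one commit outstanding. */
    static class SingleInFlightCommitter {
        private final Object lock = new Object();
        private final AsyncCommitClient client;
        // volatile: the completion callback may run on a different thread than commit()
        private volatile boolean commitInProgress = false;

        SingleInFlightCommitter(AsyncCommitClient client) {
            this.client = client;
        }

        /** Returns true if a commit was issued, false if it was skipped. */
        boolean commit(Map<Integer, Long> offsets) {
            synchronized (lock) {
                if (commitInProgress) {
                    // Skipping only delays the offsets visible in Kafka; the next
                    // commit carries newer absolute offsets.
                    System.out.println("WARN previous commit not completed, skipping " + offsets);
                    return false;
                }
                // Set the flag before issuing, so a fast callback is not overwritten.
                commitInProgress = true;
                client.commitAsync(offsets, (committed, exception) -> {
                    commitInProgress = false;
                    if (exception != null) {
                        System.out.println("WARN commit failed: " + exception.getMessage());
                    }
                });
                return true;
            }
        }
    }

    /** Test client that holds callbacks until completeNext() is called. */
    static class PendingClient implements AsyncCommitClient {
        final List<Runnable> pending = new ArrayList<>();

        @Override
        public void commitAsync(Map<Integer, Long> offsets,
                                BiConsumer<Map<Integer, Long>, Exception> callback) {
            pending.add(() -> callback.accept(offsets, null));
        }

        void completeNext() {
            pending.remove(0).run();
        }
    }

    public static void main(String[] args) {
        PendingClient client = new PendingClient();
        SingleInFlightCommitter committer = new SingleInFlightCommitter(client);
        System.out.println(committer.commit(Map.of(0, 10L))); // true: issued
        System.out.println(committer.commit(Map.of(0, 20L))); // false: skipped
        client.completeNext();                                // first commit completes
        System.out.println(committer.commit(Map.of(0, 30L))); // true: issued
    }
}
```

Calling the client while holding the lock mirrors the diff, where `commitAsync` runs inside `synchronized (consumerLock)`; `KafkaConsumer` is not safe for multi-threaded access, so in the original that lock is what serializes access to it.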