[GitHub] flink pull request #2559: [FLINK-4702] [kafka connector] Commit offets to Ka...

2016-09-29 Thread StephanEwen
Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/2559


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2559: [FLINK-4702] [kafka connector] Commit offets to Ka...

2016-09-29 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2559#discussion_r81089161
  
--- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
@@ -301,4 +316,16 @@ public void commitSpecificOffsetsToKafka(Map offsets)
         }
         return result;
     }
+
+    private class CommitCallback implements OffsetCommitCallback {
+
+        @Override
+        public void onComplete(Map offsets, Exception exception) {
+            commitInProgress = false;
+
+            if (exception != null) {
+                LOG.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints", exception);
--- End diff --

Oops, this is actually correct, sorry.




[GitHub] flink pull request #2559: [FLINK-4702] [kafka connector] Commit offets to Ka...

2016-09-28 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2559#discussion_r80914495
  
--- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
@@ -285,7 +293,14 @@ public void commitSpecificOffsetsToKafka(Map offsets)
 
         if (this.consumer != null) {
             synchronized (consumerLock) {
-                this.consumer.commitSync(offsetsToCommit);
+                if (!commitInProgress) {
+                    commitInProgress = true;
+                    this.consumer.commitAsync(offsetsToCommit, offsetCommitCallback);
+                }
+                else {
+                    LOG.warn("Committing previous checkpoint's offsets to Kafka not completed. " +
--- End diff --

I agree, makes sense.




[GitHub] flink pull request #2559: [FLINK-4702] [kafka connector] Commit offets to Ka...

2016-09-28 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2559#discussion_r80909904
  
--- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
@@ -285,7 +293,14 @@ public void commitSpecificOffsetsToKafka(Map offsets)
 
         if (this.consumer != null) {
             synchronized (consumerLock) {
-                this.consumer.commitSync(offsetsToCommit);
+                if (!commitInProgress) {
+                    commitInProgress = true;
+                    this.consumer.commitAsync(offsetsToCommit, offsetCommitCallback);
+                }
+                else {
+                    LOG.warn("Committing previous checkpoint's offsets to Kafka not completed. " +
--- End diff --

Possibly yes. But on the other hand, this should be pretty visible if it 
happens.
I would expect that with proper options to participate in group checkpoint 
committing, most Flink jobs run without committing to Kafka/ZooKeeper.




[GitHub] flink pull request #2559: [FLINK-4702] [kafka connector] Commit offets to Ka...

2016-09-28 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2559#discussion_r80907326
  
--- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
@@ -285,7 +293,14 @@ public void commitSpecificOffsetsToKafka(Map offsets)
 
         if (this.consumer != null) {
             synchronized (consumerLock) {
-                this.consumer.commitSync(offsetsToCommit);
+                if (!commitInProgress) {
+                    commitInProgress = true;
+                    this.consumer.commitAsync(offsetsToCommit, offsetCommitCallback);
+                }
+                else {
+                    LOG.warn("Committing previous checkpoint's offsets to Kafka not completed. " +
--- End diff --

If the user sets a relatively short checkpoint interval, will this flood the 
log?
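
For illustration only, one generic way to bound the volume of such a warning is to emit it at most once per time window. The sketch below is not part of this pull request; `ThrottledWarning` and `shouldLog` are hypothetical names, and the caller is assumed to pass in a monotonic timestamp such as `System.nanoTime()`.

```java
/**
 * Hypothetical helper (not a Flink class): allows a repeated warning
 * to be emitted at most once per configured interval.
 */
final class ThrottledWarning {

    private final long minIntervalNanos;
    private boolean loggedBefore = false;
    private long lastLoggedNanos = 0L;

    ThrottledWarning(long minIntervalNanos) {
        this.minIntervalNanos = minIntervalNanos;
    }

    /**
     * Returns true if the caller should log now, false if the previous
     * warning was emitted less than minIntervalNanos ago.
     */
    synchronized boolean shouldLog(long nowNanos) {
        if (loggedBefore && nowNanos - lastLoggedNanos < minIntervalNanos) {
            return false;
        }
        loggedBefore = true;
        lastLoggedNanos = nowNanos;
        return true;
    }

    public static void main(String[] args) {
        // Allow one warning per second, using the monotonic clock.
        ThrottledWarning throttle = new ThrottledWarning(1_000_000_000L);
        for (int i = 0; i < 3; i++) {
            if (throttle.shouldLog(System.nanoTime())) {
                System.err.println("Previous offset commit not completed, skipping.");
            }
        }
    }
}
```

A caller would guard the `LOG.warn(...)` call with `shouldLog(System.nanoTime())`; suppressed occurrences are simply dropped in this sketch.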




[GitHub] flink pull request #2559: [FLINK-4702] [kafka connector] Commit offets to Ka...

2016-09-28 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2559#discussion_r80906814
  
--- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
@@ -301,4 +316,16 @@ public void commitSpecificOffsetsToKafka(Map offsets)
         }
         return result;
     }
+
+    private class CommitCallback implements OffsetCommitCallback {
+
+        @Override
+        public void onComplete(Map offsets, Exception exception) {
+            commitInProgress = false;
+
+            if (exception != null) {
+                LOG.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints", exception);
--- End diff --

The exception message isn't included in the log warning.




[GitHub] flink pull request #2559: [FLINK-4702] [kafka connector] Commit offets to Ka...

2016-09-28 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2559#discussion_r80903481
  
--- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
@@ -86,6 +90,9 @@
     /** Flag to mark the main work loop as alive */
     private volatile boolean running = true;
 
+    /** Flag indicating whether a commit of offsets to Kafka it currently happening */
--- End diff --

nit: it --> "is"?




[GitHub] flink pull request #2559: [FLINK-4702] [kafka connector] Commit offets to Ka...

2016-09-27 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/2559

[FLINK-4702] [kafka connector] Commit offets to Kafka asynchronously

The offset commit calls to Kafka may occasionally take very long. In that 
case, the notifyCheckpointComplete() method blocks for a long time, and the 
KafkaConsumer cannot make progress and cannot perform checkpoints.

This pull request changes the offset committing to use Kafka's 
`commitAsync()` method.
It also makes sure that no more than one commit is in progress at a time, 
so that commit requests do not pile up.
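
A minimal sketch of the "at most one commit in flight" idea described above, not the code from this pull request. `SingleFlightCommitter` and `AsyncClient` are hypothetical names; `AsyncClient` stands in for an asynchronous commit API such as Kafka's `commitAsync()`, and the sketch uses an `AtomicBoolean` where the actual change uses a `commitInProgress` flag inside a `synchronized (consumerLock)` block.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Hypothetical illustration: skips a commit while a previous one is still pending. */
final class SingleFlightCommitter {

    /** Stand-in for an asynchronous commit API; the callback receives null on success. */
    interface AsyncClient {
        void commitAsync(long offset, Consumer<Exception> callback);
    }

    private final AsyncClient client;
    private final AtomicBoolean commitInProgress = new AtomicBoolean(false);

    SingleFlightCommitter(AsyncClient client) {
        this.client = client;
    }

    /** Starts a commit and returns true, or returns false if one is still in flight. */
    boolean tryCommit(long offset) {
        if (!commitInProgress.compareAndSet(false, true)) {
            return false; // previous commit has not completed yet, do not pile up
        }
        try {
            client.commitAsync(offset, exception -> {
                // Clear the flag first so that the next checkpoint can commit again.
                commitInProgress.set(false);
                if (exception != null) {
                    System.err.println("Commit failed: " + exception);
                }
            });
        } catch (RuntimeException e) {
            // The request was never sent, so no callback will clear the flag.
            commitInProgress.set(false);
            throw e;
        }
        return true;
    }

    public static void main(String[] args) {
        // Fake client that holds callbacks until the demo completes them manually.
        List<Consumer<Exception>> pending = new ArrayList<>();
        SingleFlightCommitter committer =
                new SingleFlightCommitter((offset, callback) -> pending.add(callback));

        System.out.println(committer.tryCommit(10L)); // commit started
        System.out.println(committer.tryCommit(20L)); // skipped, first still pending
        pending.get(0).accept(null);                  // first commit completes
        System.out.println(committer.tryCommit(30L)); // commit started again
    }
}
```

Skipping a commit is acceptable in this design because the next completed checkpoint carries newer offsets anyway, which matches the log message in the diff stating that a failed commit does not compromise Flink's checkpoints.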

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink kafka_commit_async

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2559.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2559


commit eafba8600863c18e09397366485bcfc6ff44960f
Author: Stephan Ewen 
Date:   2016-09-27T18:59:35Z

[FLINK-4702] [kafka connector] Commit offets to Kafka asynchronously



