[jira] [Commented] (KAFKA-6606) Regression in consumer auto-commit backoff behavior

2018-03-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16384862#comment-16384862
 ] 

ASF GitHub Bot commented on KAFKA-6606:
---

hachikuji closed pull request #4641: KAFKA-6606; Ensure consumer awaits 
auto-commit interval after sending…
URL: https://github.com/apache/kafka/pull/4641
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 2afa1ff9236..3c99c966d54 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -621,6 +621,7 @@ public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets,
 
     public void maybeAutoCommitOffsetsAsync(long now) {
         if (autoCommitEnabled && now >= nextAutoCommitDeadline) {
+            this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
             doAutoCommitOffsetsAsync();
         }
     }
@@ -633,14 +634,15 @@ private void doAutoCommitOffsetsAsync() {
             @Override
             public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                 if (exception != null) {
-                    log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, exception.getMessage());
-                    if (exception instanceof RetriableException)
+                    if (exception instanceof RetriableException) {
+                        log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", offsets,
+                                exception);
                         nextAutoCommitDeadline = Math.min(time.milliseconds() + retryBackoffMs, nextAutoCommitDeadline);
-                    else
-                        nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs;
+                    } else {
+                        log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, exception.getMessage());
+                    }
                 } else {
                     log.debug("Completed asynchronous auto-commit of offsets {}", offsets);
-                    nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs;
                 }
             }
         });
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index ac392055f9b..3e3c423a428 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -99,7 +99,6 @@
     private int sessionTimeoutMs = 1;
     private int heartbeatIntervalMs = 5000;
     private long retryBackoffMs = 100;
-    private boolean autoCommitEnabled = false;
     private int autoCommitIntervalMs = 2000;
     private MockPartitionAssignor partitionAssignor = new MockPartitionAssignor();
     private List assignors = Collections.singletonList(partitionAssignor);
@@ -134,7 +133,7 @@ public void setup() {
         this.partitionAssignor.clear();
 
         client.setNode(node);
-        this.coordinator = buildCoordinator(metrics, assignors, ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, autoCommitEnabled, true);
+        this.coordinator = buildCoordinator(metrics, assignors, ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false, true);
     }
 
     @After
@@ -209,7 +208,7 @@ public void testManyInFlightAsyncCommitsWithCoordinatorDisconnect() throws Excep
         final AtomicInteger responses = new AtomicInteger(0);
 
         for (int i = 0; i < numRequests; i++) {
-            Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(tp, new OffsetAndMetadata(i));
+            Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(tp, new OffsetAndMetadata(i));
             coordinator.commitOffsetsAsync(offsets, new OffsetCommitCallback() {
                 @Override
                 public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
@@ -237,7 +236,7 @@ public void testCoordinatorUnknownInUnsentCallbacksAfterCoordinatorDead() throws
         coordinator.ensureCoordinatorReady();
 
         final AtomicBoolean asyncCallbackInvoked = new AtomicBoolean(false);
-Map

[jira] [Commented] (KAFKA-6606) Regression in consumer auto-commit backoff behavior

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16384401#comment-16384401
 ] 

ASF GitHub Bot commented on KAFKA-6606:
---

hachikuji opened a new pull request #4641: KAFKA-6606; Ensure consumer awaits 
auto-commit interval after sending…
URL: https://github.com/apache/kafka/pull/4641
 
 
   We need to reset the auto-commit deadline after sending the offset commit 
request so that we do not resend it while the request is still inflight. 
   
   Added unit tests ensuring this behavior and proper backoff in the case of a 
failure.
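
The deadline handling described above can be sketched in isolation as follows. This is a minimal illustration, not the actual `ConsumerCoordinator` code: the class `AutoCommitTimer`, its method names, and the `sendTimes` list are hypothetical and exist only to make the timing visible. It assumes the deadline is pushed out by the auto-commit interval at send time and pulled in to the retry backoff when a retriable failure arrives.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the coordinator's auto-commit bookkeeping (not Kafka's class).
class AutoCommitTimer {
    private final long autoCommitIntervalMs;
    private final long retryBackoffMs;
    private long nextAutoCommitDeadline;

    // Records the timestamps at which a commit request would have been sent.
    final List<Long> sendTimes = new ArrayList<>();

    AutoCommitTimer(long autoCommitIntervalMs, long retryBackoffMs, long now) {
        this.autoCommitIntervalMs = autoCommitIntervalMs;
        this.retryBackoffMs = retryBackoffMs;
        this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
    }

    // Invoked on every poll; sends at most one commit per interval.
    void maybeAutoCommit(long now) {
        if (now >= nextAutoCommitDeadline) {
            // Reset the deadline before sending so that later polls do not
            // resend while this request is still in flight.
            nextAutoCommitDeadline = now + autoCommitIntervalMs;
            sendTimes.add(now);
        }
    }

    // Invoked when a commit fails with a retriable error: retry after the
    // backoff, but never later than the deadline that is already scheduled.
    void onRetriableFailure(long now) {
        nextAutoCommitDeadline = Math.min(now + retryBackoffMs, nextAutoCommitDeadline);
    }
}
```

With an interval of 2000 ms and a backoff of 100 ms (the values used in the test class), polls at 2000, 2001 and 2002 ms produce a single send, and a retriable failure observed at 2050 ms makes the next send eligible at 2150 ms rather than 4000 ms.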
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




> Regression in consumer auto-commit backoff behavior
> ---
>
>         Key: KAFKA-6606
>         URL: https://issues.apache.org/jira/browse/KAFKA-6606
>     Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>    Reporter: Jason Gustafson
>    Assignee: Jason Gustafson
>    Priority: Blocker
>     Fix For: 1.1.0
>
>
> We introduced a regression in the auto-commit behavior in KAFKA-6362. After 
> initiating a send, the consumer does not reset its next commit deadline, so 
> it will send auto-commits as fast as the user can call poll() until the first 
> offset commit returns.
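
To get a feel for the size of the regression, the following toy simulation counts how many commit requests go out before the first response returns. It is a sketch under stated assumptions, not Kafka code: the class `AutoCommitRegressionDemo`, the method `countSends`, and the fixed poll cadence and response latency are all hypothetical.

```java
// Hypothetical simulation (not Kafka code) of a poll loop with auto-commit enabled.
class AutoCommitRegressionDemo {

    // Returns the number of commit requests sent up to the moment the first
    // response arrives, polling every pollEveryMs milliseconds.
    static int countSends(boolean resetDeadlineOnSend, long intervalMs,
                          long responseLatencyMs, long pollEveryMs) {
        long deadline = intervalMs;
        long firstSendAt = -1;
        int sends = 0;
        for (long now = 0; ; now += pollEveryMs) {
            // Stop once the response to the first request has come back.
            if (firstSendAt >= 0 && now >= firstSendAt + responseLatencyMs) {
                break;
            }
            if (now >= deadline) {
                if (firstSendAt < 0) {
                    firstSendAt = now;
                }
                sends++;
                // Without this reset the deadline stays in the past and
                // every subsequent poll sends another commit.
                if (resetDeadlineOnSend) {
                    deadline = now + intervalMs;
                }
            }
        }
        return sends;
    }
}
```

Under these assumed numbers (2000 ms interval, 50 ms response latency, a poll every 10 ms), `countSends(false, 2000, 50, 10)` yields 5 requests and `countSends(true, 2000, 50, 10)` yields 1. The gap widens with faster polling or slower responses.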


