[
https://issues.apache.org/jira/browse/KAFKA-6606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16384862#comment-16384862
]
ASF GitHub Bot commented on KAFKA-6606:
---
hachikuji closed pull request #4641: KAFKA-6606; Ensure consumer awaits
auto-commit interval after sending…
URL: https://github.com/apache/kafka/pull/4641
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 2afa1ff9236..3c99c966d54 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -621,6 +621,7 @@ public boolean commitOffsetsSync(Map offsets,
public void maybeAutoCommitOffsetsAsync(long now) {
if (autoCommitEnabled && now >= nextAutoCommitDeadline) {
+this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
doAutoCommitOffsetsAsync();
}
}
@@ -633,14 +634,15 @@ private void doAutoCommitOffsetsAsync() {
@Override
public void onComplete(Map
offsets, Exception exception) {
if (exception != null) {
-log.warn("Asynchronous auto-commit of offsets {} failed:
{}", offsets, exception.getMessage());
-if (exception instanceof RetriableException)
+if (exception instanceof RetriableException) {
+log.debug("Asynchronous auto-commit of offsets {}
failed due to retriable error: {}", offsets,
+exception);
nextAutoCommitDeadline = Math.min(time.milliseconds()
+ retryBackoffMs, nextAutoCommitDeadline);
-else
-nextAutoCommitDeadline = time.milliseconds() +
autoCommitIntervalMs;
+} else {
+log.warn("Asynchronous auto-commit of offsets {}
failed: {}", offsets, exception.getMessage());
+}
} else {
log.debug("Completed asynchronous auto-commit of offsets
{}", offsets);
-nextAutoCommitDeadline = time.milliseconds() +
autoCommitIntervalMs;
}
}
});
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index ac392055f9b..3e3c423a428 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -99,7 +99,6 @@
private int sessionTimeoutMs = 1;
private int heartbeatIntervalMs = 5000;
private long retryBackoffMs = 100;
-private boolean autoCommitEnabled = false;
private int autoCommitIntervalMs = 2000;
private MockPartitionAssignor partitionAssignor = new
MockPartitionAssignor();
private List assignors =
Collections.singletonList(partitionAssignor);
@@ -134,7 +133,7 @@ public void setup() {
this.partitionAssignor.clear();
client.setNode(node);
-this.coordinator = buildCoordinator(metrics, assignors,
ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, autoCommitEnabled, true);
+this.coordinator = buildCoordinator(metrics, assignors,
ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false, true);
}
@After
@@ -209,7 +208,7 @@ public void
testManyInFlightAsyncCommitsWithCoordinatorDisconnect() throws Excep
final AtomicInteger responses = new AtomicInteger(0);
for (int i = 0; i < numRequests; i++) {
-Map offsets =
Collections.singletonMap(tp, new OffsetAndMetadata(i));
+Map offsets = singletonMap(tp,
new OffsetAndMetadata(i));
coordinator.commitOffsetsAsync(offsets, new OffsetCommitCallback()
{
@Override
public void onComplete(Map
offsets, Exception exception) {
@@ -237,7 +236,7 @@ public void
testCoordinatorUnknownInUnsentCallbacksAfterCoordinatorDead() throws
coordinator.ensureCoordinatorReady();
final AtomicBoolean asyncCallbackInvoked = new AtomicBoolean(false);
-Map