[
https://issues.apache.org/jira/browse/KAFKA-7605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16684773#comment-16684773
]
ASF GitHub Bot commented on KAFKA-7605:
---
ijuma closed pull request #5890: KAFKA-7605; Retry async commit failures in
integration test cases to fix flaky tests
URL: https://github.com/apache/kafka/pull/5890
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign (fork) pull request
once it is merged, the diff is reproduced below for the sake of provenance:
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 3e67b1876ec..e407c023e37 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -84,9 +84,7 @@ abstract class BaseConsumerTest extends
IntegrationTestHarness {
consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords,
startingOffset = 0)
// check async commit callbacks
-val commitCallback = new CountConsumerCommitCallback()
-consumer.commitAsync(commitCallback)
-awaitCommitCallback(consumer, commitCallback)
+sendAndAwaitAsyncCommit(consumer)
}
@Test
@@ -191,12 +189,38 @@ abstract class BaseConsumerTest extends
IntegrationTestHarness {
records
}
- protected def awaitCommitCallback[K, V](consumer: Consumer[K, V],
- commitCallback:
CountConsumerCommitCallback,
- count: Int = 1): Unit = {
-TestUtils.pollUntilTrue(consumer, () => commitCallback.successCount >=
count,
+ protected def sendAndAwaitAsyncCommit[K, V](consumer: Consumer[K, V],
+ offsetsOpt:
Option[Map[TopicPartition, OffsetAndMetadata]] = None): Unit = {
+
+def sendAsyncCommit(callback: OffsetCommitCallback) = {
+ offsetsOpt match {
+case Some(offsets) => consumer.commitAsync(offsets.asJava, callback)
+case None => consumer.commitAsync(callback)
+ }
+}
+
+class RetryCommitCallback extends OffsetCommitCallback {
+ var isComplete = false
+ var error: Option[Exception] = None
+
+ override def onComplete(offsets: util.Map[TopicPartition,
OffsetAndMetadata], exception: Exception): Unit = {
+exception match {
+ case e: RetriableCommitFailedException =>
+sendAsyncCommit(this)
+ case e =>
+isComplete = true
+error = Option(e)
+}
+ }
+}
+
+val commitCallback = new RetryCommitCallback
+
+sendAsyncCommit(commitCallback)
+TestUtils.pollUntilTrue(consumer, () => commitCallback.isComplete,
"Failed to observe commit callback before timeout", waitTimeMs = 10000)
-assertEquals(count, commitCallback.successCount)
+
+assertEquals(None, commitCallback.error)
}
protected def awaitRebalance(consumer: Consumer[_, _], rebalanceListener:
TestConsumerReassignmentListener): Unit = {
@@ -209,21 +233,22 @@ abstract class BaseConsumerTest extends
IntegrationTestHarness {
// The best way to verify that the current membership is still active is
to commit offsets.
// This would fail if the group had rebalanced.
val initialRevokeCalls = rebalanceListener.callsToRevoked
-val commitCallback = new CountConsumerCommitCallback
-consumer.commitAsync(commitCallback)
-awaitCommitCallback(consumer, commitCallback)
+sendAndAwaitAsyncCommit(consumer)
assertEquals(initialRevokeCalls, rebalanceListener.callsToRevoked)
}
protected class CountConsumerCommitCallback extends OffsetCommitCallback {
var successCount = 0
var failCount = 0
+var lastError: Option[Exception] = None
override def onComplete(offsets: util.Map[TopicPartition,
OffsetAndMetadata], exception: Exception): Unit = {
- if (exception == null)
+ if (exception == null) {
successCount += 1
- else
+ } else {
failCount += 1
+lastError = Some(exception)
+ }
}
}
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 42b3984e305..5e590cf1df3 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -478,14 +478,12 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// async commit
val asyncMetadata = new OffsetAndMetadata(10, "bar")
-val callback = new CountConsumerCommitCallback
-consumer.commitAsync(Map((tp, asyncMetadata)).asJava, callback)
-awaitCommitCallback(consumer, callback)
+