[jira] [Commented] (KAFKA-7605) Flaky Test `SaslMultiMechanismConsumerTest.testCoordinatorFailover`

2018-11-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16684773#comment-16684773
 ] 

ASF GitHub Bot commented on KAFKA-7605:
---

ijuma closed pull request #5890: KAFKA-7605; Retry async commit failures in 
integration test cases to fix flaky tests
URL: https://github.com/apache/kafka/pull/5890
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 3e67b1876ec..e407c023e37 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -84,9 +84,7 @@ abstract class BaseConsumerTest extends 
IntegrationTestHarness {
 consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, 
startingOffset = 0)
 
 // check async commit callbacks
-val commitCallback = new CountConsumerCommitCallback()
-consumer.commitAsync(commitCallback)
-awaitCommitCallback(consumer, commitCallback)
+sendAndAwaitAsyncCommit(consumer)
   }
 
   @Test
@@ -191,12 +189,38 @@ abstract class BaseConsumerTest extends 
IntegrationTestHarness {
 records
   }
 
-  protected def awaitCommitCallback[K, V](consumer: Consumer[K, V],
-  commitCallback: 
CountConsumerCommitCallback,
-  count: Int = 1): Unit = {
-TestUtils.pollUntilTrue(consumer, () => commitCallback.successCount >= 
count,
+  protected def sendAndAwaitAsyncCommit[K, V](consumer: Consumer[K, V],
+  offsetsOpt: 
Option[Map[TopicPartition, OffsetAndMetadata]] = None): Unit = {
+
+def sendAsyncCommit(callback: OffsetCommitCallback) = {
+  offsetsOpt match {
+case Some(offsets) => consumer.commitAsync(offsets.asJava, callback)
+case None => consumer.commitAsync(callback)
+  }
+}
+
+class RetryCommitCallback extends OffsetCommitCallback {
+  var isComplete = false
+  var error: Option[Exception] = None
+
+  override def onComplete(offsets: util.Map[TopicPartition, 
OffsetAndMetadata], exception: Exception): Unit = {
+exception match {
+  case e: RetriableCommitFailedException =>
+sendAsyncCommit(this)
+  case e =>
+isComplete = true
+error = Option(e)
+}
+  }
+}
+
+val commitCallback = new RetryCommitCallback
+
+sendAsyncCommit(commitCallback)
+TestUtils.pollUntilTrue(consumer, () => commitCallback.isComplete,
   "Failed to observe commit callback before timeout", waitTimeMs = 1)
-assertEquals(count, commitCallback.successCount)
+
+assertEquals(None, commitCallback.error)
   }
 
   protected def awaitRebalance(consumer: Consumer[_, _], rebalanceListener: 
TestConsumerReassignmentListener): Unit = {
@@ -209,21 +233,22 @@ abstract class BaseConsumerTest extends 
IntegrationTestHarness {
 // The best way to verify that the current membership is still active is 
to commit offsets.
 // This would fail if the group had rebalanced.
 val initialRevokeCalls = rebalanceListener.callsToRevoked
-val commitCallback = new CountConsumerCommitCallback
-consumer.commitAsync(commitCallback)
-awaitCommitCallback(consumer, commitCallback)
+sendAndAwaitAsyncCommit(consumer)
 assertEquals(initialRevokeCalls, rebalanceListener.callsToRevoked)
   }
 
   protected class CountConsumerCommitCallback extends OffsetCommitCallback {
 var successCount = 0
 var failCount = 0
+var lastError: Option[Exception] = None
 
 override def onComplete(offsets: util.Map[TopicPartition, 
OffsetAndMetadata], exception: Exception): Unit = {
-  if (exception == null)
+  if (exception == null) {
 successCount += 1
-  else
+  } else {
 failCount += 1
+lastError = Some(exception)
+  }
 }
   }
 
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 42b3984e305..5e590cf1df3 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -478,14 +478,12 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
 // async commit
 val asyncMetadata = new OffsetAndMetadata(10, "bar")
-val callback = new CountConsumerCommitCallback
-consumer.commitAsync(Map((tp, asyncMetadata)).asJava, callback)
-awaitCommitCallback(consumer, callback)
+

[jira] [Commented] (KAFKA-7605) Flaky Test `SaslMultiMechanismConsumerTest.testCoordinatorFailover`

2018-11-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16679106#comment-16679106
 ] 

ASF GitHub Bot commented on KAFKA-7605:
---

hachikuji opened a new pull request #5890: KAFKA-7605; Increase timeout for 
commit callback in testing
URL: https://github.com/apache/kafka/pull/5890
 
 
   We are seeing timeouts in tests that depend on `awaitCommitCallback` (e.g. 
`SaslMultiMechanismConsumerTest.testCoordinatorFailover`). I haven't been able 
to reproduce the failures locally, but it seems like a reasonable bet that we 
need to loosen the timeout a little for underpowered build environments.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flaky Test `SaslMultiMechanismConsumerTest.testCoordinatorFailover`
> ---
>
> Key: KAFKA-7605
> URL: https://issues.apache.org/jira/browse/KAFKA-7605
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> {code}
> java.lang.AssertionError: Failed to observe commit callback before timeout
>   at kafka.utils.TestUtils$.fail(TestUtils.scala:351)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:761)
>   at kafka.utils.TestUtils$.pollUntilTrue(TestUtils.scala:727)
>   at kafka.api.BaseConsumerTest.awaitCommitCallback(BaseConsumerTest.scala:198)
>   at kafka.api.BaseConsumerTest.ensureNoRebalance(BaseConsumerTest.scala:214)
>   at kafka.api.BaseConsumerTest.testCoordinatorFailover(BaseConsumerTest.scala:117)
> {code}
> Probably just need to increase the timeout a little.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)