Repository: spark
Updated Branches:
  refs/heads/master f29c2b528 -> aa70a0a1a
[SPARK-25288][TESTS] Fix flaky Kafka transaction tests

## What changes were proposed in this pull request?

Here are the failures:

http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaRelationSuite&test_name=read+Kafka+transactional+messages%3A+read_committed
http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaMicroBatchV1SourceSuite&test_name=read+Kafka+transactional+messages%3A+read_committed
http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaMicroBatchV2SourceSuite&test_name=read+Kafka+transactional+messages%3A+read_committed

I found that the Kafka consumer may not see the committed messages for a short time. This PR just adds a new method `waitUntilOffsetAppears` and uses it to make sure the consumer can see a specified offset before checking the result.

## How was this patch tested?

Jenkins

Closes #22293 from zsxwing/SPARK-25288.

Authored-by: Shixiong Zhu <zsxw...@gmail.com>
Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aa70a0a1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aa70a0a1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aa70a0a1

Branch: refs/heads/master
Commit: aa70a0a1a434e8a4b1d4dde00e20b865bb70b8dd
Parents: f29c2b5
Author: Shixiong Zhu <zsxw...@gmail.com>
Authored: Thu Aug 30 23:23:11 2018 -0700
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Thu Aug 30 23:23:11 2018 -0700

----------------------------------------------------------------------
 .../kafka010/KafkaMicroBatchSourceSuite.scala   | 34 ++++++++++++--------
 .../spark/sql/kafka010/KafkaRelationSuite.scala |  7 ++++
 .../spark/sql/kafka010/KafkaTestUtils.scala     | 10 ++++++
 3 files changed, 37 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/aa70a0a1/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index eb66cca..78249f7 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -160,14 +160,18 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with Kaf
   }
 
   object WithOffsetSync {
-    def apply(topic: String)(func: () => Unit): StreamAction = {
+    /**
+     * Run `func` to write some Kafka messages and wait until the latest offset of the given
+     * `TopicPartition` is not less than `expectedOffset`.
+     */
+    def apply(
+        topicPartition: TopicPartition,
+        expectedOffset: Long)(func: () => Unit): StreamAction = {
       Execute("Run Kafka Producer")(_ => {
         func()
         // This is a hack for the race condition that the committed message may be not visible to
         // consumer for a short time.
-        // Looks like after the following call returns, the consumer can always read the committed
-        // messages.
-        testUtils.getLatestOffsets(Set(topic))
+        testUtils.waitUntilOffsetAppears(topicPartition, expectedOffset)
       })
     }
   }
@@ -652,13 +656,14 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
       }
     }
 
+    val topicPartition = new TopicPartition(topic, 0)
     // The message values are the same as their offsets to make the test easy to follow
     testUtils.withTranscationalProducer { producer =>
       testStream(mapped)(
         StartStream(ProcessingTime(100), clock),
         waitUntilBatchProcessed,
         CheckAnswer(),
-        WithOffsetSync(topic) { () =>
+        WithOffsetSync(topicPartition, expectedOffset = 5) { () =>
           // Send 5 messages. They should be visible only after being committed.
           producer.beginTransaction()
           (0 to 4).foreach { i =>
@@ -669,7 +674,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
         waitUntilBatchProcessed,
         // Should not see any uncommitted messages
         CheckNewAnswer(),
-        WithOffsetSync(topic) { () =>
+        WithOffsetSync(topicPartition, expectedOffset = 6) { () =>
           producer.commitTransaction()
         },
         AdvanceManualClock(100),
@@ -678,7 +683,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
         AdvanceManualClock(100),
         waitUntilBatchProcessed,
         CheckNewAnswer(3, 4), // offset: 3, 4, 5* [* means it's not a committed data message]
-        WithOffsetSync(topic) { () =>
+        WithOffsetSync(topicPartition, expectedOffset = 12) { () =>
           // Send 5 messages and abort the transaction. They should not be read.
           producer.beginTransaction()
           (6 to 10).foreach { i =>
@@ -692,7 +697,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
         AdvanceManualClock(100),
         waitUntilBatchProcessed,
         CheckNewAnswer(), // offset: 9*, 10*, 11*
-        WithOffsetSync(topic) { () =>
+        WithOffsetSync(topicPartition, expectedOffset = 18) { () =>
           // Send 5 messages again. The consumer should skip the above aborted messages and read
           // them.
           producer.beginTransaction()
@@ -707,7 +712,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
         AdvanceManualClock(100),
         waitUntilBatchProcessed,
         CheckNewAnswer(15, 16), // offset: 15, 16, 17*
-        WithOffsetSync(topic) { () =>
+        WithOffsetSync(topicPartition, expectedOffset = 25) { () =>
           producer.beginTransaction()
           producer.send(new ProducerRecord[String, String](topic, "18")).get()
           producer.commitTransaction()
@@ -774,13 +779,14 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
       }
     }
 
+    val topicPartition = new TopicPartition(topic, 0)
     // The message values are the same as their offsets to make the test easy to follow
     testUtils.withTranscationalProducer { producer =>
       testStream(mapped)(
         StartStream(ProcessingTime(100), clock),
         waitUntilBatchProcessed,
         CheckNewAnswer(),
-        WithOffsetSync(topic) { () =>
+        WithOffsetSync(topicPartition, expectedOffset = 5) { () =>
           // Send 5 messages. They should be visible only after being committed.
           producer.beginTransaction()
           (0 to 4).foreach { i =>
@@ -790,13 +796,13 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
         AdvanceManualClock(100),
         waitUntilBatchProcessed,
         CheckNewAnswer(0, 1, 2), // offset 0, 1, 2
-        WithOffsetSync(topic) { () =>
+        WithOffsetSync(topicPartition, expectedOffset = 6) { () =>
           producer.commitTransaction()
         },
         AdvanceManualClock(100),
         waitUntilBatchProcessed,
         CheckNewAnswer(3, 4), // offset: 3, 4, 5* [* means it's not a committed data message]
-        WithOffsetSync(topic) { () =>
+        WithOffsetSync(topicPartition, expectedOffset = 12) { () =>
           // Send 5 messages and abort the transaction. They should not be read.
           producer.beginTransaction()
           (6 to 10).foreach { i =>
@@ -810,7 +816,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
         AdvanceManualClock(100),
         waitUntilBatchProcessed,
         CheckNewAnswer(9, 10), // offset: 9, 10, 11*
-        WithOffsetSync(topic) { () =>
+        WithOffsetSync(topicPartition, expectedOffset = 18) { () =>
           // Send 5 messages again. The consumer should skip the above aborted messages and read
           // them.
           producer.beginTransaction()
@@ -825,7 +831,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
         AdvanceManualClock(100),
         waitUntilBatchProcessed,
         CheckNewAnswer(15, 16), // offset: 15, 16, 17*
-        WithOffsetSync(topic) { () =>
+        WithOffsetSync(topicPartition, expectedOffset = 25) { () =>
           producer.beginTransaction()
           producer.send(new ProducerRecord[String, String](topic, "18")).get()
           producer.commitTransaction()

http://git-wip-us.apache.org/repos/asf/spark/blob/aa70a0a1/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
index 93dba18..eb18697 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
@@ -260,6 +260,7 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest
       producer.commitTransaction()
 
       // Should read all committed messages
+      testUtils.waitUntilOffsetAppears(new TopicPartition(topic, 0), 6)
       checkAnswer(df, (1 to 5).map(_.toString).toDF)
 
       producer.beginTransaction()
@@ -269,6 +270,7 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest
       producer.abortTransaction()
 
       // Should not read aborted messages
+      testUtils.waitUntilOffsetAppears(new TopicPartition(topic, 0), 12)
      checkAnswer(df, (1 to 5).map(_.toString).toDF)
 
       producer.beginTransaction()
@@ -278,6 +280,7 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest
       producer.commitTransaction()
 
       // Should skip aborted messages and read new committed ones.
+      testUtils.waitUntilOffsetAppears(new TopicPartition(topic, 0), 18)
       checkAnswer(df, ((1 to 5) ++ (11 to 15)).map(_.toString).toDF)
     }
   }
@@ -301,11 +304,13 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest
       }
 
       // "read_uncommitted" should see all messages including uncommitted ones
+      testUtils.waitUntilOffsetAppears(new TopicPartition(topic, 0), 5)
       checkAnswer(df, (1 to 5).map(_.toString).toDF)
 
       producer.commitTransaction()
 
       // Should read all committed messages
+      testUtils.waitUntilOffsetAppears(new TopicPartition(topic, 0), 6)
       checkAnswer(df, (1 to 5).map(_.toString).toDF)
 
       producer.beginTransaction()
@@ -315,6 +320,7 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest
       producer.abortTransaction()
 
       // "read_uncommitted" should see all messages including uncommitted or aborted ones
+      testUtils.waitUntilOffsetAppears(new TopicPartition(topic, 0), 12)
       checkAnswer(df, (1 to 10).map(_.toString).toDF)
 
       producer.beginTransaction()
@@ -324,6 +330,7 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest
       producer.commitTransaction()
 
       // Should read all messages
+      testUtils.waitUntilOffsetAppears(new TopicPartition(topic, 0), 18)
       checkAnswer(df, (1 to 15).map(_.toString).toDF)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/aa70a0a1/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index 7b742a3..bf6934b 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -439,6 +439,16 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
     }
   }
 
+  /**
+   * Wait until the latest offset of the given `TopicPartition` is not less than `offset`.
+   */
+  def waitUntilOffsetAppears(topicPartition: TopicPartition, offset: Long): Unit = {
+    eventually(timeout(60.seconds)) {
+      val currentOffset = getLatestOffsets(Set(topicPartition.topic)).get(topicPartition)
+      assert(currentOffset.nonEmpty && currentOffset.get >= offset)
+    }
+  }
+
   private class EmbeddedZookeeper(val zkConnect: String) {
     val snapshotDir = Utils.createTempDir()
     val logDir = Utils.createTempDir()
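For readers who want to reproduce the same wait outside of the test suite, the sketch below is a hypothetical illustration (the object name OffsetWait, its parameters, and the bootstrap-servers string are made up and are not part of this commit). It expresses the condition that the new waitUntilOffsetAppears helper asserts with ScalaTest's eventually, but directly against the Kafka consumer API: it polls KafkaConsumer.endOffsets until the partition's end offset reaches the expected value.

import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

object OffsetWait {
  // Poll the broker until the end offset of `topicPartition` is at least `expectedOffset`,
  // or give up after `timeoutMs`. This mirrors the condition asserted by the test helper.
  def waitUntilOffsetAppears(
      bootstrapServers: String,
      topicPartition: TopicPartition,
      expectedOffset: Long,
      timeoutMs: Long = 60000L): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    props.put("key.deserializer", classOf[StringDeserializer].getName)
    props.put("value.deserializer", classOf[StringDeserializer].getName)
    val consumer = new KafkaConsumer[String, String](props)
    try {
      val deadline = System.currentTimeMillis() + timeoutMs
      while (System.currentTimeMillis() < deadline) {
        // endOffsets returns, per partition, the offset of the next record to be written,
        // i.e. the "latest offset" that the Kafka source compares against.
        val end = consumer.endOffsets(Collections.singleton(topicPartition)).get(topicPartition)
        if (end != null && end >= expectedOffset) {
          return
        }
        Thread.sleep(100)
      }
      throw new AssertionError(
        s"Offset $expectedOffset did not appear in $topicPartition within $timeoutMs ms")
    } finally {
      consumer.close()
    }
  }
}

Under these assumptions, a test would call something like OffsetWait.waitUntilOffsetAppears("localhost:9092", new TopicPartition(topic, 0), 6) after committing a transaction and before the first checkAnswer, which is the same place this commit inserts testUtils.waitUntilOffsetAppears.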