Repository: spark
Updated Branches:
  refs/heads/master f29c2b528 -> aa70a0a1a


[SPARK-25288][TESTS] Fix flaky Kafka transaction tests

## What changes were proposed in this pull request?

Here are the failures:

http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaRelationSuite&test_name=read+Kafka+transactional+messages%3A+read_committed
http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaMicroBatchV1SourceSuite&test_name=read+Kafka+transactional+messages%3A+read_committed
http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaMicroBatchV2SourceSuite&test_name=read+Kafka+transactional+messages%3A+read_committed

I found that the Kafka consumer may not see the committed messages for a short
time. This PR adds a new method `waitUntilOffsetAppears` and uses it to make
sure the consumer can see a specified offset before checking the result.

## How was this patch tested?

Jenkins

Closes #22293 from zsxwing/SPARK-25288.

Authored-by: Shixiong Zhu <zsxw...@gmail.com>
Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aa70a0a1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aa70a0a1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aa70a0a1

Branch: refs/heads/master
Commit: aa70a0a1a434e8a4b1d4dde00e20b865bb70b8dd
Parents: f29c2b5
Author: Shixiong Zhu <zsxw...@gmail.com>
Authored: Thu Aug 30 23:23:11 2018 -0700
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Thu Aug 30 23:23:11 2018 -0700

----------------------------------------------------------------------
 .../kafka010/KafkaMicroBatchSourceSuite.scala   | 34 ++++++++++++--------
 .../spark/sql/kafka010/KafkaRelationSuite.scala |  7 ++++
 .../spark/sql/kafka010/KafkaTestUtils.scala     | 10 ++++++
 3 files changed, 37 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/aa70a0a1/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index eb66cca..78249f7 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -160,14 +160,18 @@ abstract class KafkaSourceTest extends StreamTest with 
SharedSQLContext with Kaf
   }
 
   object WithOffsetSync {
-    def apply(topic: String)(func: () => Unit): StreamAction = {
+    /**
+     * Run `func` to write some Kafka messages and wait until the latest offset of the given
+     * `TopicPartition` is not less than `expectedOffset`.
+     */
+    def apply(
+        topicPartition: TopicPartition,
+        expectedOffset: Long)(func: () => Unit): StreamAction = {
       Execute("Run Kafka Producer")(_ => {
         func()
        // This is a hack for the race condition that the committed message may be not visible to
        // consumer for a short time.
-        // Looks like after the following call returns, the consumer can always read the committed
-        // messages.
-        testUtils.getLatestOffsets(Set(topic))
+        testUtils.waitUntilOffsetAppears(topicPartition, expectedOffset)
       })
     }
   }
@@ -652,13 +656,14 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
       }
     }
 
+    val topicPartition = new TopicPartition(topic, 0)
     // The message values are the same as their offsets to make the test easy 
to follow
     testUtils.withTranscationalProducer { producer =>
       testStream(mapped)(
         StartStream(ProcessingTime(100), clock),
         waitUntilBatchProcessed,
         CheckAnswer(),
-        WithOffsetSync(topic) { () =>
+        WithOffsetSync(topicPartition, expectedOffset = 5) { () =>
           // Send 5 messages. They should be visible only after being 
committed.
           producer.beginTransaction()
           (0 to 4).foreach { i =>
@@ -669,7 +674,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
         waitUntilBatchProcessed,
         // Should not see any uncommitted messages
         CheckNewAnswer(),
-        WithOffsetSync(topic) { () =>
+        WithOffsetSync(topicPartition, expectedOffset = 6) { () =>
           producer.commitTransaction()
         },
         AdvanceManualClock(100),
@@ -678,7 +683,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
         AdvanceManualClock(100),
         waitUntilBatchProcessed,
         CheckNewAnswer(3, 4), // offset: 3, 4, 5* [* means it's not a 
committed data message]
-        WithOffsetSync(topic) { () =>
+        WithOffsetSync(topicPartition, expectedOffset = 12) { () =>
           // Send 5 messages and abort the transaction. They should not be 
read.
           producer.beginTransaction()
           (6 to 10).foreach { i =>
@@ -692,7 +697,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
         AdvanceManualClock(100),
         waitUntilBatchProcessed,
         CheckNewAnswer(), // offset: 9*, 10*, 11*
-        WithOffsetSync(topic) { () =>
+        WithOffsetSync(topicPartition, expectedOffset = 18) { () =>
           // Send 5 messages again. The consumer should skip the above aborted 
messages and read
           // them.
           producer.beginTransaction()
@@ -707,7 +712,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
         AdvanceManualClock(100),
         waitUntilBatchProcessed,
         CheckNewAnswer(15, 16),  // offset: 15, 16, 17*
-        WithOffsetSync(topic) { () =>
+        WithOffsetSync(topicPartition, expectedOffset = 25) { () =>
           producer.beginTransaction()
           producer.send(new ProducerRecord[String, String](topic, "18")).get()
           producer.commitTransaction()
@@ -774,13 +779,14 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
       }
     }
 
+    val topicPartition = new TopicPartition(topic, 0)
     // The message values are the same as their offsets to make the test easy 
to follow
     testUtils.withTranscationalProducer { producer =>
       testStream(mapped)(
         StartStream(ProcessingTime(100), clock),
         waitUntilBatchProcessed,
         CheckNewAnswer(),
-        WithOffsetSync(topic) { () =>
+        WithOffsetSync(topicPartition, expectedOffset = 5) { () =>
           // Send 5 messages. They should be visible only after being 
committed.
           producer.beginTransaction()
           (0 to 4).foreach { i =>
@@ -790,13 +796,13 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
         AdvanceManualClock(100),
         waitUntilBatchProcessed,
         CheckNewAnswer(0, 1, 2), // offset 0, 1, 2
-        WithOffsetSync(topic) { () =>
+        WithOffsetSync(topicPartition, expectedOffset = 6) { () =>
           producer.commitTransaction()
         },
         AdvanceManualClock(100),
         waitUntilBatchProcessed,
         CheckNewAnswer(3, 4), // offset: 3, 4, 5* [* means it's not a 
committed data message]
-        WithOffsetSync(topic) { () =>
+        WithOffsetSync(topicPartition, expectedOffset = 12) { () =>
           // Send 5 messages and abort the transaction. They should not be 
read.
           producer.beginTransaction()
           (6 to 10).foreach { i =>
@@ -810,7 +816,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
         AdvanceManualClock(100),
         waitUntilBatchProcessed,
         CheckNewAnswer(9, 10), // offset: 9, 10, 11*
-        WithOffsetSync(topic) { () =>
+        WithOffsetSync(topicPartition, expectedOffset = 18) { () =>
           // Send 5 messages again. The consumer should skip the above aborted 
messages and read
           // them.
           producer.beginTransaction()
@@ -825,7 +831,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
         AdvanceManualClock(100),
         waitUntilBatchProcessed,
         CheckNewAnswer(15, 16),  // offset: 15, 16, 17*
-        WithOffsetSync(topic) { () =>
+        WithOffsetSync(topicPartition, expectedOffset = 25) { () =>
           producer.beginTransaction()
           producer.send(new ProducerRecord[String, String](topic, "18")).get()
           producer.commitTransaction()

http://git-wip-us.apache.org/repos/asf/spark/blob/aa70a0a1/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
index 93dba18..eb18697 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
@@ -260,6 +260,7 @@ class KafkaRelationSuite extends QueryTest with 
SharedSQLContext with KafkaTest
       producer.commitTransaction()
 
       // Should read all committed messages
+      testUtils.waitUntilOffsetAppears(new TopicPartition(topic, 0), 6)
       checkAnswer(df, (1 to 5).map(_.toString).toDF)
 
       producer.beginTransaction()
@@ -269,6 +270,7 @@ class KafkaRelationSuite extends QueryTest with 
SharedSQLContext with KafkaTest
       producer.abortTransaction()
 
       // Should not read aborted messages
+      testUtils.waitUntilOffsetAppears(new TopicPartition(topic, 0), 12)
       checkAnswer(df, (1 to 5).map(_.toString).toDF)
 
       producer.beginTransaction()
@@ -278,6 +280,7 @@ class KafkaRelationSuite extends QueryTest with 
SharedSQLContext with KafkaTest
       producer.commitTransaction()
 
       // Should skip aborted messages and read new committed ones.
+      testUtils.waitUntilOffsetAppears(new TopicPartition(topic, 0), 18)
       checkAnswer(df, ((1 to 5) ++ (11 to 15)).map(_.toString).toDF)
     }
   }
@@ -301,11 +304,13 @@ class KafkaRelationSuite extends QueryTest with 
SharedSQLContext with KafkaTest
       }
 
       // "read_uncommitted" should see all messages including uncommitted ones
+      testUtils.waitUntilOffsetAppears(new TopicPartition(topic, 0), 5)
       checkAnswer(df, (1 to 5).map(_.toString).toDF)
 
       producer.commitTransaction()
 
       // Should read all committed messages
+      testUtils.waitUntilOffsetAppears(new TopicPartition(topic, 0), 6)
       checkAnswer(df, (1 to 5).map(_.toString).toDF)
 
       producer.beginTransaction()
@@ -315,6 +320,7 @@ class KafkaRelationSuite extends QueryTest with 
SharedSQLContext with KafkaTest
       producer.abortTransaction()
 
       // "read_uncommitted" should see all messages including uncommitted or 
aborted ones
+      testUtils.waitUntilOffsetAppears(new TopicPartition(topic, 0), 12)
       checkAnswer(df, (1 to 10).map(_.toString).toDF)
 
       producer.beginTransaction()
@@ -324,6 +330,7 @@ class KafkaRelationSuite extends QueryTest with 
SharedSQLContext with KafkaTest
       producer.commitTransaction()
 
       // Should read all messages
+      testUtils.waitUntilOffsetAppears(new TopicPartition(topic, 0), 18)
       checkAnswer(df, (1 to 15).map(_.toString).toDF)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/aa70a0a1/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index 7b742a3..bf6934b 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -439,6 +439,16 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] 
= Map.empty) extends L
     }
   }
 
+  /**
+   * Wait until the latest offset of the given `TopicPartition` is not less than `offset`.
+   */
+  def waitUntilOffsetAppears(topicPartition: TopicPartition, offset: Long): Unit = {
+    eventually(timeout(60.seconds)) {
+      val currentOffset = getLatestOffsets(Set(topicPartition.topic)).get(topicPartition)
+      assert(currentOffset.nonEmpty && currentOffset.get >= offset)
+    }
+  }
+
   private class EmbeddedZookeeper(val zkConnect: String) {
     val snapshotDir = Utils.createTempDir()
     val logDir = Utils.createTempDir()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to