Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20150#discussion_r161341002
  
    --- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
 ---
    @@ -318,6 +318,84 @@ class KafkaSourceSuite extends KafkaSourceTest {
         )
       }
     
    +  test("union bug in failover") {
    +    def getSpecificDF(range: Range.Inclusive): 
org.apache.spark.sql.Dataset[Int] = {
    +      val topic = newTopic()
    +      testUtils.createTopic(topic, partitions = 1)
    +      testUtils.sendMessages(topic, range.map(_.toString).toArray, Some(0))
    +
    +      val reader = spark
    +        .readStream
    +        .format("kafka")
    +        .option("kafka.bootstrap.servers", testUtils.brokerAddress)
    +        .option("kafka.metadata.max.age.ms", "1")
    +        .option("maxOffsetsPerTrigger", 5)
    +        .option("subscribe", topic)
    +        .option("startingOffsets", "earliest")
    +
    +      reader.load()
    +        .selectExpr("CAST(value AS STRING)")
    +        .as[String]
    +        .map(k => k.toInt)
    +    }
    +
    +    val df1 = getSpecificDF(0 to 9)
    +    val df2 = getSpecificDF(100 to 199)
    +
    +    val kafka = df1.union(df2)
    +
    +    val clock = new StreamManualClock
    +
    +    val waitUntilBatchProcessed = AssertOnQuery { q =>
    +      eventually(Timeout(streamingTimeout)) {
    +        if (!q.exception.isDefined) {
    +          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
    +        }
    +      }
    +      if (q.exception.isDefined) {
    +        throw q.exception.get
    +      }
    +      true
    +    }
    +
    +    testStream(kafka)(
    +      StartStream(ProcessingTime(100), clock),
    +      waitUntilBatchProcessed,
    +      // 5 from smaller topic, 5 from bigger one
    +      CheckAnswer(0, 1, 2, 3, 4, 100, 101, 102, 103, 104),
    --- End diff --
    
    You can clean up this code a bit using the following snippet:
    ```
        testStream(kafka)(
          StartStream(ProcessingTime(100), clock),
          waitUntilBatchProcessed,
          // 5 from smaller topic, 5 from bigger one
          CheckLastBatch((0 to 4) ++ (100 to 104): _*),
          AdvanceManualClock(100),
          waitUntilBatchProcessed,
          // 5 from smaller topic, 5 from bigger one
          CheckLastBatch((5 to 9) ++ (105 to 109): _*),
          AdvanceManualClock(100),
          waitUntilBatchProcessed,
          // smaller topic empty, 5 from bigger one
          CheckLastBatch(110 to 114: _*),
          StopStream,
          StartStream(ProcessingTime(100), clock),
          waitUntilBatchProcessed,
          // smallest now empty, 5 from bigger one
          CheckLastBatch(115 to 119: _*),
          AdvanceManualClock(100),
          waitUntilBatchProcessed,
          // smallest now empty, 5 from bigger one
          CheckLastBatch(120 to 124: _*)
        )
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to