Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r211805821
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
    @@ -597,6 +614,254 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
           }
         )
       }
    +
    +  test("read Kafka transactional messages: read_committed") {
    +    // This test will cover the following cases:
    +    // 1. the whole batch contains no data messages
    +    // 2. the first offset in a batch is not a committed data message
    +    // 3. the last offset in a batch is not a committed data message
    +    // 4. there is a gap in the middle of a batch
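    +    //
    +    // Note: for each transaction, Kafka writes a commit/abort control record (a
    +    // transaction marker) into the partition. Markers occupy offsets but are never
    +    // returned to consumers, which creates the offset gaps exercised below (offsets
    +    // annotated with * in the CheckAnswer comments).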
    +
    +    val topic = newTopic()
    +    testUtils.createTopic(topic, partitions = 1)
    +
    +    val reader = spark
    +      .readStream
    +      .format("kafka")
    +      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
    +      .option("kafka.metadata.max.age.ms", "1")
    +      .option("kafka.isolation.level", "read_committed")
    +      .option("maxOffsetsPerTrigger", 3)
    +      .option("subscribe", topic)
    +      .option("startingOffsets", "earliest")
    +      // Set a short timeout to make the test fast. When a batch contains no committed data
    +      // messages, "poll" will wait until timeout.
    +      .option("kafkaConsumer.pollTimeoutMs", 5000)
    +    val kafka = reader.load()
    +      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +      .as[(String, String)]
    +    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
    +
    +    val clock = new StreamManualClock
    +
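    +    // Blocks until the micro-batch triggered by the last clock advance has been fully
    +    // processed (i.e. the stream is waiting at the current manual-clock time),
    +    // rethrowing any query exception.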
    +    val waitUntilBatchProcessed = AssertOnQuery { q =>
    +      eventually(Timeout(streamingTimeout)) {
    +        if (!q.exception.isDefined) {
    +          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
    +        }
    +      }
    +      if (q.exception.isDefined) {
    +        throw q.exception.get
    +      }
    +      true
    +    }
    +
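    +    // A transactional producer: initTransactions() must be called once before any
    +    // beginTransaction()/commitTransaction() calls.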
    +    val producer = testUtils.createProducer(usingTrascation = true)
    +    try {
    +      producer.initTransactions()
    +
    +      testStream(mapped)(
    +        StartStream(ProcessingTime(100), clock),
    +        waitUntilBatchProcessed,
    +        // The topic is empty, so the first batch should contain no data.
    +        CheckAnswer(),
    +        WithKafkaProducer(topic, producer) { producer =>
    +          // Send 5 messages. They should be visible only after being committed.
    +          producer.beginTransaction()
    +          (1 to 5).foreach { i =>
    +            producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
    +          }
    +        },
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        // Should not see any uncommitted messages
    +        CheckAnswer(),
    +        WithKafkaProducer(topic, producer) { producer =>
    +          producer.commitTransaction()
    +        },
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer(1 to 3: _*), // offset 0, 1, 2
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer(1 to 5: _*), // offset: 3, 4, 5* [* means it's not a committed data message]
    +        WithKafkaProducer(topic, producer) { producer =>
    +          // Send 5 messages and abort the transaction. They should not be read.
    +          producer.beginTransaction()
    +          (6 to 10).foreach { i =>
    +            producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
    +          }
    +          producer.abortTransaction()
    +        },
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer(1 to 5: _*), // offset: 6*, 7*, 8*
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer(1 to 5: _*), // offset: 9*, 10*, 11*
    +        WithKafkaProducer(topic, producer) { producer =>
    +          // Send 5 messages again. The consumer should skip the above aborted messages
    +          // and read them.
    +          producer.beginTransaction()
    +          (11 to 15).foreach { i =>
    +            producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
    +          }
    +          producer.commitTransaction()
    +        },
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer((1 to 5) ++ (11 to 13): _*), // offset: 12, 13, 14
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer((1 to 5) ++ (11 to 15): _*),  // offset: 15, 16, 17*
    +        WithKafkaProducer(topic, producer) { producer =>
    +          producer.beginTransaction()
    +          producer.send(new ProducerRecord[String, String](topic, "16")).get()
    +          producer.commitTransaction()
    +          producer.beginTransaction()
    +          producer.send(new ProducerRecord[String, String](topic, "17")).get()
    +          producer.commitTransaction()
    +          producer.beginTransaction()
    +          producer.send(new ProducerRecord[String, String](topic, "18")).get()
    +          producer.send(new ProducerRecord[String, String](topic, "19")).get()
    +          producer.commitTransaction()
    +        },
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer((1 to 5) ++ (11 to 17): _*), // offset: 18, 19*, 20
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer((1 to 5) ++ (11 to 19): _*), // offset: 21*, 22, 23
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer((1 to 5) ++ (11 to 19): _*) // offset: 24*
    +      )
    +    } finally {
    +      producer.close()
    +    }
    +  }
    +
    +  test("read Kafka transactional messages: read_uncommitted") {
    +    // This test will cover the following cases:
    +    // 1. the whole batch contains no data messages
    +    // 2. the first offset in a batch is not a committed data message
    +    // 3. the last offset in a batch is not a committed data message
    +    // 4. there is a gap in the middle of a batch
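    +    //
    +    // Unlike the read_committed test above, the consumer sees messages as soon as
    +    // the producer writes them, before the transaction commits or aborts; each batch
    +    // is still capped at 3 records by maxOffsetsPerTrigger.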
    +
    +    val topic = newTopic()
    +    testUtils.createTopic(topic, partitions = 1)
    +
    +    val reader = spark
    +      .readStream
    +      .format("kafka")
    +      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
    +      .option("kafka.metadata.max.age.ms", "1")
    +      .option("kafka.isolation.level", "read_uncommitted")
    +      .option("maxOffsetsPerTrigger", 3)
    +      .option("subscribe", topic)
    +      .option("startingOffsets", "earliest")
    +      // Set a short timeout to make the test fast. When a batch contains no committed data
    +      // messages, "poll" will wait until timeout.
    +      .option("kafkaConsumer.pollTimeoutMs", 5000)
    +    val kafka = reader.load()
    +      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +      .as[(String, String)]
    +    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
    +
    +    val clock = new StreamManualClock
    +
    +    val waitUntilBatchProcessed = AssertOnQuery { q =>
    +      eventually(Timeout(streamingTimeout)) {
    +        if (!q.exception.isDefined) {
    +          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
    +        }
    +      }
    +      if (q.exception.isDefined) {
    +        throw q.exception.get
    +      }
    +      true
    +    }
    +
    +    val producer = testUtils.createProducer(usingTrascation = true)
    +    try {
    +      producer.initTransactions()
    +
    +      testStream(mapped)(
    +        StartStream(ProcessingTime(100), clock),
    +        waitUntilBatchProcessed,
    +        // The topic is empty, so the first batch should contain no data.
    +        CheckAnswer(),
    +        WithKafkaProducer(topic, producer) { producer =>
    +          // Send 5 messages. In read_uncommitted mode they should be visible even
    +          // before the transaction is committed.
    +          producer.beginTransaction()
    +          (1 to 5).foreach { i =>
    +            producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
    +          }
    +        },
    +        AdvanceManualClock(100),
    +        waitUntilBatchProcessed,
    +        CheckAnswer(1 to 3: _*), // offset 0, 1, 2
    --- End diff ---
    
    Why only 3 records when 1 to 5 has been sent already and we are reading uncommitted data?


---
