Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r212032759
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
@@ -597,6 +614,254 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
}
)
}
+
+ test("read Kafka transactional messages: read_committed") {
+ // This test will cover the following cases:
+ // 1. the whole batch contains no data messages
+ // 2. the first offset in a batch is not a committed data message
+ // 3. the last offset in a batch is not a committed data message
+ // 4. there is a gap in the middle of a batch
+
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 1)
+
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("kafka.isolation.level", "read_committed")
+ .option("maxOffsetsPerTrigger", 3)
+ .option("subscribe", topic)
+ .option("startingOffsets", "earliest")
+ // Set a short timeout to make the test fast. When a batch contains no committed data
+ // messages, "poll" will wait until the timeout.
+ .option("kafkaConsumer.pollTimeoutMs", 5000)
+ val kafka = reader.load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+ val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+ val clock = new StreamManualClock
+
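+ // Blocks until the current batch has been fully processed and the stream is waiting
+ // on the manual clock again; rethrows any exception raised by the query.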
+ val waitUntilBatchProcessed = AssertOnQuery { q =>
+ eventually(Timeout(streamingTimeout)) {
+ if (!q.exception.isDefined) {
+ assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+ }
+ }
+ if (q.exception.isDefined) {
+ throw q.exception.get
+ }
+ true
+ }
+
+ val producer = testUtils.createProducer(usingTransaction = true)
+ try {
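+ // initTransactions() registers the producer's transactional.id with the broker and
+ // must be called once before the first beginTransaction().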
+ producer.initTransactions()
+
+ testStream(mapped)(
+ StartStream(ProcessingTime(100), clock),
+ waitUntilBatchProcessed,
+ // The topic is empty, so the first batch should be empty.
+ CheckAnswer(),
+ WithKafkaProducer(topic, producer) { producer =>
+ // Send 5 messages. They should be visible only after being committed.
+ producer.beginTransaction()
+ (1 to 5).foreach { i =>
+ producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
+ }
+ },
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ // Should not see any uncommitted messages
+ CheckAnswer(),
+ WithKafkaProducer(topic, producer) { producer =>
+ producer.commitTransaction()
+ },
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer(1 to 3: _*), // offset 0, 1, 2
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer(1 to 5: _*), // offset: 3, 4, 5* [* means it's not a committed data message]
+ WithKafkaProducer(topic, producer) { producer =>
+ // Send 5 messages and abort the transaction. They should not be read.
+ producer.beginTransaction()
+ (6 to 10).foreach { i =>
+ producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
+ }
+ producer.abortTransaction()
+ },
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer(1 to 5: _*), // offset: 6*, 7*, 8*
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer(1 to 5: _*), // offset: 9*, 10*, 11*
+ WithKafkaProducer(topic, producer) { producer =>
+ // Send 5 messages again. The consumer should skip the above aborted messages and read them.
+ producer.beginTransaction()
+ (11 to 15).foreach { i =>
+ producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
+ }
+ producer.commitTransaction()
+ },
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer((1 to 5) ++ (11 to 13): _*), // offset: 12, 13, 14
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer((1 to 5) ++ (11 to 15): _*), // offset: 15, 16, 17*
+ WithKafkaProducer(topic, producer) { producer =>
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[String, String](topic, "16")).get()
+ producer.commitTransaction()
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[String, String](topic, "17")).get()
+ producer.commitTransaction()
+ producer.beginTransaction()
+ producer.send(new ProducerRecord[String, String](topic, "18")).get()
+ producer.send(new ProducerRecord[String, String](topic, "19")).get()
+ producer.commitTransaction()
+ },
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer((1 to 5) ++ (11 to 17): _*), // offset: 18, 19*, 20
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer((1 to 5) ++ (11 to 19): _*), // offset: 21*, 22, 23
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer((1 to 5) ++ (11 to 19): _*) // offset: 24*
+ )
+ } finally {
+ producer.close()
+ }
+ }
+
+ test("read Kafka transactional messages: read_uncommitted") {
+ // This test will cover the following cases:
+ // 1. the whole batch contains no data messages
+ // 2. the first offset in a batch is not a committed data message
+ // 3. the last offset in a batch is not a committed data message
+ // 4. there is a gap in the middle of a batch
+
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 1)
+
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("kafka.isolation.level", "read_uncommitted")
+ .option("maxOffsetsPerTrigger", 3)
+ .option("subscribe", topic)
+ .option("startingOffsets", "earliest")
+ // Set a short timeout to make the test fast. When a batch contains no committed data
+ // messages, "poll" will wait until the timeout.
+ .option("kafkaConsumer.pollTimeoutMs", 5000)
+ val kafka = reader.load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+ val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+ val clock = new StreamManualClock
+
+ val waitUntilBatchProcessed = AssertOnQuery { q =>
+ eventually(Timeout(streamingTimeout)) {
+ if (!q.exception.isDefined) {
+ assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+ }
+ }
+ if (q.exception.isDefined) {
+ throw q.exception.get
+ }
+ true
+ }
+
+ val producer = testUtils.createProducer(usingTransaction = true)
+ try {
+ producer.initTransactions()
+
+ testStream(mapped)(
+ StartStream(ProcessingTime(100), clock),
+ waitUntilBatchProcessed,
+ // The topic is empty, so the first batch should be empty.
+ CheckAnswer(),
+ WithKafkaProducer(topic, producer) { producer =>
+ // Send 5 messages. In read_uncommitted mode they should be visible even before the
+ // transaction is committed.
+ producer.beginTransaction()
+ (1 to 5).foreach { i =>
+ producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
+ }
+ },
+ AdvanceManualClock(100),
+ waitUntilBatchProcessed,
+ CheckAnswer(1 to 3: _*), // offset 0, 1, 2
--- End diff ---
> Why only 3 records when 1 to 5 has been sent already and we are reading uncommitted data?

I'm using `maxOffsetsPerTrigger = 3` to cut the batches on purpose. Otherwise, it's really hard to cover all of the cases.
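To make the batch boundaries concrete, here is a rough sketch (the `cutBatches` helper below is hypothetical, not code from this PR) of how a limit of 3 offsets per trigger slices the available offset range into micro-batches:

```scala
// Hypothetical illustration: cut the available offset range [start, end) into
// consecutive micro-batches of at most `maxOffsetsPerTrigger` offsets each.
def cutBatches(start: Long, end: Long, maxOffsetsPerTrigger: Long): Seq[(Long, Long)] =
  (start until end by maxOffsetsPerTrigger).map { from =>
    (from, math.min(from + maxOffsetsPerTrigger, end))
  }

// The 5 uncommitted records sit at offsets 0-4, so:
// cutBatches(0L, 5L, 3L) == Seq((0L, 3L), (3L, 5L))
// The first trigger reads only offsets 0-2 (values 1, 2, 3), matching CheckAnswer(1 to 3).
```

Small batches like this force every case in the test (empty batch, gap in the middle, non-data offset at a batch edge) to land on a batch boundary.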