Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/22042#discussion_r211804879 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala --- @@ -597,6 +614,254 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { } ) } + + test("read Kafka transactional messages: read_committed") { + // This test will cover the following cases: + // 1. the whole batch contains no data messages + // 2. the first offset in a batch is not a committed data message + // 3. the last offset in a batch is not a committed data message + // 4. there is a gap in the middle of a batch + + val topic = newTopic() + testUtils.createTopic(topic, partitions = 1) + + val reader = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("kafka.isolation.level", "read_committed") + .option("maxOffsetsPerTrigger", 3) + .option("subscribe", topic) + .option("startingOffsets", "earliest") + // Set a short timeout to make the test fast. When a batch contains no committed date + // messages, "poll" will wait until timeout. 
+ .option("kafkaConsumer.pollTimeoutMs", 5000) + val kafka = reader.load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] + val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt) + + val clock = new StreamManualClock + + val waitUntilBatchProcessed = AssertOnQuery { q => + eventually(Timeout(streamingTimeout)) { + if (!q.exception.isDefined) { + assert(clock.isStreamWaitingAt(clock.getTimeMillis())) + } + } + if (q.exception.isDefined) { + throw q.exception.get + } + true + } + + val producer = testUtils.createProducer(usingTrascation = true) + try { + producer.initTransactions() + + testStream(mapped)( + StartStream(ProcessingTime(100), clock), + waitUntilBatchProcessed, + // 1 from smallest, 1 from middle, 8 from biggest + CheckAnswer(), + WithKafkaProducer(topic, producer) { producer => + // Send 5 messages. They should be visible only after being committed. + producer.beginTransaction() + (1 to 5).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + }, + AdvanceManualClock(100), + waitUntilBatchProcessed, + // Should not see any uncommitted messages + CheckAnswer(), + WithKafkaProducer(topic, producer) { producer => + producer.commitTransaction() + }, + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckAnswer(1 to 3: _*), // offset 0, 1, 2 + AdvanceManualClock(100), + waitUntilBatchProcessed, --- End diff -- Why is this `waitUntilBatchProcessed` needed? `CheckAnswer` already waits for the batch to complete.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org