satishgopalani commented on a change in pull request #32653:
URL: https://github.com/apache/spark/pull/32653#discussion_r644757028



##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -64,6 +66,15 @@ private[kafka010] class KafkaMicroBatchStream(
   private[kafka010] val maxOffsetsPerTrigger = Option(options.get(
     KafkaSourceProvider.MAX_OFFSET_PER_TRIGGER)).map(_.toLong)
 
+  private[kafka010] val minOffsetPerTrigger = Option(options.get(
+    KafkaSourceProvider.MIN_OFFSET_PER_TRIGGER)).map(_.toLong)
+
+  private[kafka010] val maxTriggerDelayMs =

Review comment:
       Done
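
       For reference, a minimal usage sketch of the two new source options read in this hunk (option names match the test below; the broker address, topic, and threshold values are placeholders, not part of the diff):

           import org.apache.spark.sql.SparkSession

           val spark = SparkSession.builder().appName("min-offsets-sketch").getOrCreate()

           val df = spark
             .readStream
             .format("kafka")
             .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder broker
             .option("subscribe", "events")                         // placeholder topic
             .option("startingOffsets", "earliest")
             // Do not fire a micro-batch until at least 15 new offsets are available ...
             .option("minOffsetsPerTrigger", 15)
             // ... unless 10 seconds have passed since the last batch (maxTriggerDelay).
             .option("maxTriggerDelay", "10s")
             .load()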

##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -321,6 +321,231 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     }
   }
 
+  test("minOffsetsPerTrigger") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    testUtils.sendMessages(topic, (100 to 109).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (10 to 14).map(_.toString).toArray, Some(1))
+    testUtils.sendMessages(topic, Array("1"), Some(2))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("minOffsetsPerTrigger", 15)
+      .option("maxTriggerDelay", "10s")
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+    val clock = new StreamManualClock
+
+    val waitUntilBatchProcessed = AssertOnQuery { q =>
+      eventually(Timeout(streamingTimeout)) {
+        if (!q.exception.isDefined) {
+          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+        }
+      }
+      if (q.exception.isDefined) {
+        throw q.exception.get
+      }
+      true
+    }
+    testStream(mapped)(
+      StartStream(Trigger.ProcessingTime(100), clock),
+      waitUntilBatchProcessed,
+      // First Batch is always processed
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+        11, 108, 109, 12, 13, 14),
+      // Adding more data but less than minOffsetsPerTrigger
+      Assert {
+        testUtils.sendMessages(topic, (15 to 20).map(_.toString).toArray, Some(1))
+        true
+      },
+      // No data is processed for next batch as data is less than minOffsetsPerTrigger
+      // and maxTriggerDelay is not expired
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+        11, 108, 109, 12, 13, 14),
+      Assert {
+        testUtils.sendMessages(topic, (110 to 120).map(_.toString).toArray, Some(0))
+        testUtils.sendMessages(topic, Array("2"), Some(2))
+        true
+      },
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      // Running batch now as number of records is greater than minOffsetsPerTrigger
+      // 2 from smallest, 10 more from middle, 20 more from biggest
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+        11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
+        12, 117, 118, 119, 120, 13, 14, 15, 16, 17, 18, 19, 2, 20),
+      // Testing maxTriggerDelay
+      // Adding more data but less than minOffsetsPerTrigger
+      Assert {
+        testUtils.sendMessages(topic, (121 to 125).map(_.toString).toArray, Some(0))
+        testUtils.sendMessages(topic, (21 to 25).map(_.toString).toArray, Some(1))
+        true
+      },
+      // No data is processed for next batch till maxTriggerDelay is expired
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+        11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
+        12, 117, 118, 119, 120, 13, 14, 15, 16, 17, 18, 19, 2, 20),
+      // Sleeping for 10s to let maxTriggerDelay expire
+      Assert {
+        Thread.sleep(10 * 1000)
+        true
+      },
+      AdvanceManualClock(100),
+      // Running batch as maxTriggerDelay is expired
+      waitUntilBatchProcessed,
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+        11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
+        12, 117, 118, 119, 120, 121, 122, 123, 124, 125,
+        13, 14, 15, 16, 17, 18, 19, 2, 20, 21, 22, 23, 24, 25)
+    )
+    // When Trigger.Once() is used, the read limit should be ignored
+    val allData = Seq(1, 2) ++ (10 to 25) ++ (100 to 125)
+    withTempDir { dir =>
+      testStream(mapped)(
+        StartStream(Trigger.Once(), checkpointLocation = dir.getCanonicalPath),
+        AssertOnQuery { q =>
+          q.processAllAvailable()
+          true
+        },
+        CheckAnswer(allData: _*),
+        StopStream,
+
+        AddKafkaData(Set(topic), 1000 to 1010: _*),
+        StartStream(Trigger.Once(), checkpointLocation = dir.getCanonicalPath),
+        AssertOnQuery { q =>
+          q.processAllAvailable()
+          true
+        },
+        CheckAnswer((allData ++ 1000.to(1010)): _*)
+      )
+    }
+  }
+
+  test("compositeReadLimit") {

Review comment:
       Updated
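
       For context, a hedged sketch of how a composite limit might be built on the connector side, assuming this PR adds ReadLimit.minRows and ReadLimit.compositeLimit alongside the existing ReadLimit.maxRows (the method names and the option values are assumptions based on the test name, not quotes from the diff):

           import org.apache.spark.sql.connector.read.streaming.ReadLimit

           // Hypothetical values standing in for the parsed source options.
           val minOffsetPerTrigger: Option[Long] = Some(15L)
           val maxOffsetsPerTrigger: Option[Long] = Some(30L)
           val maxTriggerDelayMs: Long = 10000L

           val readLimit: ReadLimit = (minOffsetPerTrigger, maxOffsetsPerTrigger) match {
             // Both bounds set: combine them into one composite limit.
             case (Some(min), Some(max)) =>
               ReadLimit.compositeLimit(Array(
                 ReadLimit.minRows(min, maxTriggerDelayMs),
                 ReadLimit.maxRows(max)))
             // Only the lower bound: wait for min rows or until the delay expires.
             case (Some(min), None) => ReadLimit.minRows(min, maxTriggerDelayMs)
             // Only the upper bound: cap each micro-batch at max rows.
             case (None, Some(max)) => ReadLimit.maxRows(max)
             // Neither bound: fall back to reading everything available.
             case (None, None) => ReadLimit.allAvailable()
           }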



