HeartSaVioR commented on a change in pull request #32653:
URL: https://github.com/apache/spark/pull/32653#discussion_r642793811



##########
File path: sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/CompositeReadLimit.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read.streaming;
+
+import org.apache.spark.annotation.Evolving;
+
+import java.util.Arrays;
+
+/**
+ * Represents a {@link ReadLimit} where the {@link MicroBatchStream} should scan approximately
+ * the given maximum number of rows with at least the given minimum number of rows.
+ *
+ * @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
+ * @since 3.1.2

Review comment:
       The target version will be 3.2.0 at best. We don't add new functionality in bugfix versions.

##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -321,6 +321,231 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     }
   }
 
+  test("minOffsetsPerTrigger") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    testUtils.sendMessages(topic, (100 to 109).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (10 to 14).map(_.toString).toArray, Some(1))
+    testUtils.sendMessages(topic, Array("1"), Some(2))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("minOffsetsPerTrigger", 15)
+      .option("maxTriggerDelay", "10s")
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+    val clock = new StreamManualClock
+
+    val waitUntilBatchProcessed = AssertOnQuery { q =>
+      eventually(Timeout(streamingTimeout)) {
+        if (!q.exception.isDefined) {
+          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+        }
+      }
+      if (q.exception.isDefined) {
+        throw q.exception.get
+      }
+      true
+    }
+    testStream(mapped)(
+      StartStream(Trigger.ProcessingTime(100), clock),
+      waitUntilBatchProcessed,
+      // First Batch is always processed
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+        11, 108, 109, 12, 13, 14),
+      // Adding more data but less than minOffsetsPerTrigger
+      Assert {
+        testUtils.sendMessages(topic, (15 to 20).map(_.toString).toArray, Some(1))
+        true
+      },
+      // No data is processed for next batch as data is less than minOffsetsPerTrigger
+      // and maxTriggerDelay is not expired
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,

Review comment:
       Please use `CheckNewAnswer` here and below to assert only the new output, if that works for the test.
   
   If CheckNewAnswer works without waitUntilBatchProcessed, even better.
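   
   For illustration, a rough sketch of how the first few actions could look with `CheckNewAnswer` (assuming the same `mapped`, `clock`, and `waitUntilBatchProcessed` as in the test above; the per-batch row sets below would need to be re-checked against the actual batches):
   
   ```scala
   testStream(mapped)(
     StartStream(Trigger.ProcessingTime(100), clock),
     waitUntilBatchProcessed,
     // First batch: assert only the rows this batch emitted
     CheckNewAnswer(1, 10, 11, 12, 13, 14, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109),
     // Adding more data but less than minOffsetsPerTrigger
     Assert {
       testUtils.sendMessages(topic, (15 to 20).map(_.toString).toArray, Some(1))
       true
     },
     AdvanceManualClock(100),
     waitUntilBatchProcessed,
     // The batch is delayed, so no new rows are expected
     CheckNewAnswer()
   )
   ```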

##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -321,6 +321,231 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     }
   }
 
+  test("minOffsetsPerTrigger") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    testUtils.sendMessages(topic, (100 to 109).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (10 to 14).map(_.toString).toArray, Some(1))
+    testUtils.sendMessages(topic, Array("1"), Some(2))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("minOffsetsPerTrigger", 15)
+      .option("maxTriggerDelay", "10s")
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+    val clock = new StreamManualClock
+
+    val waitUntilBatchProcessed = AssertOnQuery { q =>

Review comment:
       Let's extract this into a `private def` and reuse it across the tests in this file.
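   
   For example, something along these lines (a sketch; the helper name and taking the clock as a parameter are just one way to shape it):
   
   ```scala
   private def waitUntilBatchProcessed(clock: StreamManualClock): AssertOnQuery = AssertOnQuery { q =>
     eventually(Timeout(streamingTimeout)) {
       if (q.exception.isEmpty) {
         assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
       }
     }
     if (q.exception.isDefined) {
       throw q.exception.get
     }
     true
   }
   ```
   
   Then each test can just call `waitUntilBatchProcessed(clock)` in its action list.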

##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -321,6 +321,231 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     }
   }
 
+  test("minOffsetsPerTrigger") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    testUtils.sendMessages(topic, (100 to 109).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (10 to 14).map(_.toString).toArray, Some(1))
+    testUtils.sendMessages(topic, Array("1"), Some(2))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("minOffsetsPerTrigger", 15)
+      .option("maxTriggerDelay", "10s")

Review comment:
       Let's pick a smaller value, like 5 seconds or so. It won't introduce flakiness and the test will take less time to run.
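   
   For instance (just a sketch; the `Thread.sleep` later in the test would shrink to match):
   
   ```scala
   .option("maxTriggerDelay", "5s")
   // ...
   // Sleeping for 5s to let maxTriggerDelay expire
   Assert {
     Thread.sleep(5 * 1000)
     true
   },
   ```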

##########
File path: sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadMinRows.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read.streaming;
+
+import org.apache.spark.annotation.Evolving;
+
+import java.util.Objects;
+
+/**
+ * Represents a {@link ReadLimit} where the {@link MicroBatchStream} should scan approximately
+ * at least the given minimum number of rows.
+ *
+ * @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
+ * @since 3.1.2

Review comment:
       Same here.

##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -321,6 +321,231 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     }
   }
 
+  test("minOffsetsPerTrigger") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    testUtils.sendMessages(topic, (100 to 109).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (10 to 14).map(_.toString).toArray, Some(1))
+    testUtils.sendMessages(topic, Array("1"), Some(2))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("minOffsetsPerTrigger", 15)
+      .option("maxTriggerDelay", "10s")
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+    val clock = new StreamManualClock
+
+    val waitUntilBatchProcessed = AssertOnQuery { q =>
+      eventually(Timeout(streamingTimeout)) {
+        if (!q.exception.isDefined) {
+          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+        }
+      }
+      if (q.exception.isDefined) {
+        throw q.exception.get
+      }
+      true
+    }
+    testStream(mapped)(
+      StartStream(Trigger.ProcessingTime(100), clock),
+      waitUntilBatchProcessed,
+      // First Batch is always processed
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+        11, 108, 109, 12, 13, 14),
+      // Adding more data but less than minOffsetsPerTrigger
+      Assert {
+        testUtils.sendMessages(topic, (15 to 20).map(_.toString).toArray, Some(1))
+        true
+      },
+      // No data is processed for next batch as data is less than minOffsetsPerTrigger
+      // and maxTriggerDelay is not expired
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+        11, 108, 109, 12, 13, 14),
+      Assert {
+        testUtils.sendMessages(topic, (110 to 120).map(_.toString).toArray, Some(0))
+        testUtils.sendMessages(topic, Array("2"), Some(2))
+        true
+      },
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      // Running batch now as number of records is greater than minOffsetsPerTrigger
+      // 2 from smallest, 10 more from middle, 20 more from biggest
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+        11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
+        12, 117, 118, 119, 120, 13, 14, 15, 16, 17, 18, 19, 2, 20),
+      // Testing maxTriggerDelay
+      // Adding more data but less than minOffsetsPerTrigger
+      Assert {
+        testUtils.sendMessages(topic, (121 to 125).map(_.toString).toArray, Some(0))
+        testUtils.sendMessages(topic, (21 to 25).map(_.toString).toArray, Some(1))
+        true
+      },
+      // No data is processed for next batch till maxTriggerDelay is expired
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+        11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
+        12, 117, 118, 119, 120, 13, 14, 15, 16, 17, 18, 19, 2, 20),
+      // Sleeping for 10s to let maxTriggerDelay expire
+      Assert {
+        Thread.sleep(10 * 1000)
+        true
+      },
+      AdvanceManualClock(100),
+      // Running batch as maxTriggerDelay is expired
+      waitUntilBatchProcessed,
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+        11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
+        12, 117, 118, 119, 120, 121, 122, 123, 124, 125,
+        13, 14, 15, 16, 17, 18, 19, 2, 20, 21, 22, 23, 24, 25)
+    )
+    // When Trigger.Once() is used, the read limit should be ignored
+    val allData = Seq(1, 2) ++ (10 to 25) ++ (100 to 125)
+    withTempDir { dir =>
+      testStream(mapped)(
+        StartStream(Trigger.Once(), checkpointLocation = dir.getCanonicalPath),
+        AssertOnQuery { q =>
+          q.processAllAvailable()
+          true
+        },
+        CheckAnswer(allData: _*),
+        StopStream,
+
+        AddKafkaData(Set(topic), 1000 to 1010: _*),
+        StartStream(Trigger.Once(), checkpointLocation = dir.getCanonicalPath),
+        AssertOnQuery { q =>
+          q.processAllAvailable()
+          true
+        },
+        CheckAnswer((allData ++ 1000.to(1010)): _*)
+      )
+    }
+  }
+
+  test("compositeReadLimit") {

Review comment:
       Same comments here.

##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
##########
@@ -139,26 +156,78 @@ private[kafka010] class KafkaSource(
   override def latestOffset(startOffset: streaming.Offset, limit: ReadLimit): streaming.Offset = {
     // Make sure initialPartitionOffsets is initialized
     initialPartitionOffsets
-
-    val latest = kafkaReader.fetchLatestOffsets(
-      currentPartitionOffsets.orElse(Some(initialPartitionOffsets)))
+    val currentOffsets = currentPartitionOffsets.orElse(Some(initialPartitionOffsets))
+    val latest = kafkaReader.fetchLatestOffsets(currentOffsets)
+    var skipBatch = false

Review comment:
       Something like:
   
   ```scala
  override def latestOffset(startOffset: streaming.Offset, limit: ReadLimit): streaming.Offset = {
    // Make sure initialPartitionOffsets is initialized
    initialPartitionOffsets
    val currentOffsets = currentPartitionOffsets.orElse(Some(initialPartitionOffsets))
    val latest = kafkaReader.fetchLatestOffsets(currentOffsets)
    latestPartitionOffsets = Some(latest)

    val limits: Seq[ReadLimit] = limit match {
      case rows: CompositeReadLimit => rows.getReadLimits
      case rows => Seq(rows)
    }

    val offsets = if (limits.exists(_.isInstanceOf[ReadAllAvailable])) {
      // ReadAllAvailable has the highest priority
      latest
    } else {
      val lowerLimit = limits.find(_.isInstanceOf[ReadMinRows]).map(_.asInstanceOf[ReadMinRows])
      val upperLimit = limits.find(_.isInstanceOf[ReadMaxRows]).map(_.asInstanceOf[ReadMaxRows])

      lowerLimit.flatMap { limit =>
        // checking if we need to skip the batch based on the minOffsetsPerTrigger criteria
        val skipBatch = delayBatch(
          limit.minRows, latest, currentOffsets.get, limit.maxTriggerDelayMs)
        if (skipBatch) {
          logDebug(
            s"Delaying batch as number of records available is less than minOffsetsPerTrigger")
          // Pass same current offsets as output to skip trigger
          Some(currentOffsets.get)
        } else {
          None
        }
      }.orElse {
        // checking if we need to adjust a range of offsets based on the maxOffsetsPerTrigger criteria
        upperLimit.map { limit =>
          rateLimit(limit.maxRows, currentPartitionOffsets.getOrElse(initialPartitionOffsets),
            latest)
        }
      }.getOrElse(latest)
    }

    currentPartitionOffsets = Some(offsets)
    logDebug(s"GetOffset: ${offsets.toSeq.map(_.toString).sorted}")
    KafkaSourceOffset(offsets)
  }
   ```

##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -95,15 +114,62 @@ private[kafka010] class KafkaMicroBatchStream(
   override def latestOffset(start: Offset, readLimit: ReadLimit): Offset = {
     val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
     latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
+    var skipBatch = false

Review comment:
       Btw, I guess we can extract the logic that calculates offsets based on the ReadLimit (including delayBatch, of course) and share it across DSv1 and DSv2, since the code is mostly duplicated.
   
   Maybe add a new object `KafkaSourceOffsetHelper` and extract the logic into method(s) there? The content of `getDefaultReadLimit` is another good candidate to extract as well.
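   
   A rough sketch of what such a helper could look like (the object name, method names, and signatures are only a proposal; `delayBatch` and `rateLimit` are passed in as functions here just to keep the sketch self-contained and could move into the object as well):
   
   ```scala
   import org.apache.kafka.common.TopicPartition
   
   import org.apache.spark.sql.connector.read.streaming.{CompositeReadLimit, ReadAllAvailable, ReadLimit, ReadMaxRows, ReadMinRows}
   
   private[kafka010] object KafkaSourceOffsetHelper {
   
     type PartitionOffsetMap = Map[TopicPartition, Long]
   
     // Flattens a possibly composite ReadLimit into its individual limits.
     def readLimits(limit: ReadLimit): Seq[ReadLimit] = limit match {
       case composite: CompositeReadLimit => composite.getReadLimits
       case single => Seq(single)
     }
   
     // Shared offset calculation honoring ReadAllAvailable / ReadMinRows / ReadMaxRows,
     // so DSv1 (KafkaSource) and DSv2 (KafkaMicroBatchStream) could both call it.
     def calculateOffsets(
         limit: ReadLimit,
         current: PartitionOffsetMap,
         latest: PartitionOffsetMap,
         delayBatch: (Long, PartitionOffsetMap, PartitionOffsetMap, Long) => Boolean,
         rateLimit: (Long, PartitionOffsetMap, PartitionOffsetMap) => PartitionOffsetMap): PartitionOffsetMap = {
       val limits = readLimits(limit)
       if (limits.exists(_.isInstanceOf[ReadAllAvailable])) {
         // ReadAllAvailable has the highest priority
         latest
       } else {
         val lowerLimit = limits.find(_.isInstanceOf[ReadMinRows]).map(_.asInstanceOf[ReadMinRows])
         val upperLimit = limits.find(_.isInstanceOf[ReadMaxRows]).map(_.asInstanceOf[ReadMaxRows])
         lowerLimit.flatMap { min =>
           if (delayBatch(min.minRows, latest, current, min.maxTriggerDelayMs)) {
             // Not enough new data and maxTriggerDelay has not expired: keep current offsets.
             Some(current)
           } else {
             None
           }
         }.orElse {
           upperLimit.map(max => rateLimit(max.maxRows, current, latest))
         }.getOrElse(latest)
       }
     }
   }
   ```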

##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -321,6 +321,231 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     }
   }
 
+  test("minOffsetsPerTrigger") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    testUtils.sendMessages(topic, (100 to 109).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (10 to 14).map(_.toString).toArray, Some(1))
+    testUtils.sendMessages(topic, Array("1"), Some(2))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("minOffsetsPerTrigger", 15)
+      .option("maxTriggerDelay", "10s")
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+    val clock = new StreamManualClock
+
+    val waitUntilBatchProcessed = AssertOnQuery { q =>
+      eventually(Timeout(streamingTimeout)) {
+        if (!q.exception.isDefined) {
+          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+        }
+      }
+      if (q.exception.isDefined) {
+        throw q.exception.get
+      }
+      true
+    }
+    testStream(mapped)(
+      StartStream(Trigger.ProcessingTime(100), clock),
+      waitUntilBatchProcessed,
+      // First Batch is always processed
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+        11, 108, 109, 12, 13, 14),
+      // Adding more data but less than minOffsetsPerTrigger
+      Assert {
+        testUtils.sendMessages(topic, (15 to 20).map(_.toString).toArray, Some(1))
+        true
+      },
+      // No data is processed for next batch as data is less than minOffsetsPerTrigger
+      // and maxTriggerDelay is not expired
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+        11, 108, 109, 12, 13, 14),
+      Assert {
+        testUtils.sendMessages(topic, (110 to 120).map(_.toString).toArray, Some(0))
+        testUtils.sendMessages(topic, Array("2"), Some(2))
+        true
+      },
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      // Running batch now as number of records is greater than minOffsetsPerTrigger
+      // 2 from smallest, 10 more from middle, 20 more from biggest

Review comment:
       Once CheckNewAnswer works, we don't need the code comment explaining the expected values.



