HeartSaVioR commented on a change in pull request #32653:
URL: https://github.com/apache/spark/pull/32653#discussion_r638448022



##########
File path: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -64,6 +66,15 @@ private[kafka010] class KafkaMicroBatchStream(
   private[kafka010] val maxOffsetsPerTrigger = Option(options.get(
     KafkaSourceProvider.MAX_OFFSET_PER_TRIGGER)).map(_.toLong)
 
+  private[kafka010] val minOffsetPerTrigger = Option(options.get(
+    KafkaSourceProvider.MIN_OFFSET_PER_TRIGGER)).map(_.toLong)
+
+  private[kafka010] val maxTriggerDelayMs =

Review comment:
       I'd like to see this as a part of `ReadMinRows`, so that other data 
source implementations leveraging it would also respect the maximum wait 
time. But let's hear others' opinions as well.

##########
File path: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
##########
@@ -524,6 +528,9 @@ private[kafka010] object KafkaSourceProvider extends 
Logging {
   private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
   private[kafka010] val MIN_PARTITIONS_OPTION_KEY = "minpartitions"
   private[kafka010] val MAX_OFFSET_PER_TRIGGER = "maxoffsetspertrigger"
+  private[kafka010] val MIN_OFFSET_PER_TRIGGER = "minOffsetsPerTrigger"

Review comment:
       We use all lowercase here to reflect the fact that the option name is 
case-insensitive (compare `maxoffsetspertrigger` above), so this key should be 
`minoffsetspertrigger`.

##########
File path: 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/CompositeReadLimit.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read.streaming;
+
+import org.apache.spark.annotation.Evolving;
+
+import java.util.Objects;
+
+/**
+ /**
+ * Represents a {@link ReadLimit} where the {@link MicroBatchStream} should 
scan approximately
+ * given maximum number of rows with at least the given minimum number of rows.
+ *
+ * @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
+ * @since 3.1.2
+ */
+@Evolving
+public final class CompositeReadLimit implements ReadLimit {

Review comment:
       I'd like to see this be a general container of ReadLimits, holding a 
`ReadLimit[]` or `List<ReadLimit>`, instead of being tied to minRows and 
maxRows. E.g., once we want to do the same for ReadMinFiles, we would have to 
add another class.

##########
File path: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
##########
@@ -87,9 +88,17 @@ private[kafka010] class KafkaSource(
   private val maxOffsetsPerTrigger =
     sourceOptions.get(MAX_OFFSET_PER_TRIGGER).map(_.toLong)
 
+  private[kafka010] val minOffsetPerTrigger =
+    sourceOptions.get(MIN_OFFSET_PER_TRIGGER).map(_.toLong)
+
+  private[kafka010] val maxTriggerDelayMs =

Review comment:
       Same here.

##########
File path: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -95,15 +112,51 @@ private[kafka010] class KafkaMicroBatchStream(
   override def latestOffset(start: Offset, readLimit: ReadLimit): Offset = {
     val startPartitionOffsets = 
start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
     latestPartitionOffsets = 
kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
+    var skipBatch = false
     endPartitionOffsets = KafkaSourceOffset(readLimit match {
+      case rows: ReadMinRows =>
+        // checking if we need to skip batch based on minOffsetPerTrigger 
criteria
+        skipBatch = delayBatch(rows.minRows(), latestPartitionOffsets, 
startPartitionOffsets)
+        if (skipBatch) {
+          logDebug(
+            s"Delaying batch as number of records available is less than 
minOffsetsPerTrigger")
+          startPartitionOffsets
+        } else latestPartitionOffsets
       case rows: ReadMaxRows =>
         rateLimit(rows.maxRows(), startPartitionOffsets, 
latestPartitionOffsets)
+      case rows: CompositeReadLimit =>
+        skipBatch = delayBatch(rows.minRows(), latestPartitionOffsets, 
startPartitionOffsets)
+        if (skipBatch) {
+          logDebug(
+            s"Delaying batch as number of records available is less than 
minOffsetsPerTrigger")
+          startPartitionOffsets
+        } else {
+          rateLimit(rows.maxRows(), startPartitionOffsets, 
latestPartitionOffsets)
+        }
       case _: ReadAllAvailable =>
         latestPartitionOffsets
     })
+    if (!skipBatch) lastTriggerMillis = System.currentTimeMillis()
     endPartitionOffsets
   }
 
+  /** Checks if we need to skip this trigger based on minOffsetsPerTrigger & 
maxTriggerDelay */
+  private def delayBatch(minLimit: Long,

Review comment:
       Indentation looks to be off: please refer to the style guide on the 
indentation of method parameters.
   https://github.com/databricks/scala-style-guide




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to