HeartSaVioR commented on a change in pull request #32653:
URL: https://github.com/apache/spark/pull/32653#discussion_r647159789



##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -95,15 +114,65 @@ private[kafka010] class KafkaMicroBatchStream(
   override def latestOffset(start: Offset, readLimit: ReadLimit): Offset = {
     val startPartitionOffsets = 
start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
     latestPartitionOffsets = 
kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
-    endPartitionOffsets = KafkaSourceOffset(readLimit match {
-      case rows: ReadMaxRows =>
-        rateLimit(rows.maxRows(), startPartitionOffsets, 
latestPartitionOffsets)
-      case _: ReadAllAvailable =>
-        latestPartitionOffsets
-    })
+
+    val limits: Seq[ReadLimit] = readLimit match {
+      case rows: CompositeReadLimit => rows.getReadLimits
+      case rows => Seq(rows)
+    }
+
+    val offsets = if (limits.exists(_.isInstanceOf[ReadAllAvailable])) {
+      // ReadAllAvailable has the highest priority
+      latestPartitionOffsets
+    } else {
+      val lowerLimit = 
limits.find(_.isInstanceOf[ReadMinRows]).map(_.asInstanceOf[ReadMinRows])
+      val upperLimit = 
limits.find(_.isInstanceOf[ReadMaxRows]).map(_.asInstanceOf[ReadMaxRows])
+
+      lowerLimit.flatMap { limit =>
+        // checking if we need to skip batch based on minOffsetPerTrigger 
criteria
+        val skipBatch = delayBatch(
+          limit.minRows, latestPartitionOffsets, startPartitionOffsets, 
limit.maxTriggerDelayMs)
+        if (skipBatch) {
+          logDebug(
+            s"Delaying batch as number of records available is less than 
minOffsetsPerTrigger")
+          Some(startPartitionOffsets)
+        } else {
+          None
+        }
+      }.orElse {
+        // checking if we need to adjust a range of offsets based on 
maxOffsetPerTrigger criteria
+        upperLimit.map { limit =>
+          rateLimit(limit.maxRows(), startPartitionOffsets, 
latestPartitionOffsets)
+        }
+      }.getOrElse(latestPartitionOffsets)
+    }
+
+    endPartitionOffsets = KafkaSourceOffset(offsets)
     endPartitionOffsets
   }
 
+  /** Checks if we need to skip this trigger based on minOffsetsPerTrigger & 
maxTriggerDelay */
+  private def delayBatch(
+      minLimit: Long,
+      latestOffsets: Map[TopicPartition, Long],
+      currentOffsets: Map[TopicPartition, Long],
+      maxTriggerDelayMs: Long): Boolean = {
+    // Checking first if the maxbatchDelay time has passed

Review comment:
       nit: It wouldn't hurt to call `System.currentTimeMillis()` only once and 
reuse the value.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to