HeartSaVioR commented on a change in pull request #32653:
URL: https://github.com/apache/spark/pull/32653#discussion_r642765440



##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -95,15 +114,62 @@ private[kafka010] class KafkaMicroBatchStream(
   override def latestOffset(start: Offset, readLimit: ReadLimit): Offset = {
     val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
     latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
+    var skipBatch = false

Review comment:
       Now I see duplicated code across the branches handling each limit type, including CompositeReadLimit, which handles both the lower and upper bounds and hence repeats the same code.
   
   How about changing it like below:
   
   ```scala
       val limits: Seq[ReadLimit] = readLimit match {
         case rows: CompositeReadLimit => rows.getReadLimits
         case rows => Seq(rows)
       }

       val offsets = if (limits.exists(_.isInstanceOf[ReadAllAvailable])) {
         // ReadAllAvailable has the highest priority
         latestPartitionOffsets
       } else {
         val lowerLimit = limits.find(_.isInstanceOf[ReadMinRows]).map(_.asInstanceOf[ReadMinRows])
         val upperLimit = limits.find(_.isInstanceOf[ReadMaxRows]).map(_.asInstanceOf[ReadMaxRows])

         lowerLimit.flatMap { limit =>
           // checking if we need to skip batch based on minOffsetPerTrigger criteria
           val skipBatch = delayBatch(
             limit.minRows, latestPartitionOffsets, startPartitionOffsets, limit.maxTriggerDelayMs)
           if (skipBatch) {
             logDebug(
               s"Delaying batch as number of records available is less than minOffsetsPerTrigger")
             Some(startPartitionOffsets)
           } else {
             None
           }
         }.orElse {
           // checking if we need to adjust a range of offsets based on maxOffsetPerTrigger criteria
           upperLimit.map { limit =>
             rateLimit(limit.maxRows(), startPartitionOffsets, latestPartitionOffsets)
           }
         }.getOrElse(latestPartitionOffsets)
       }

       endPartitionOffsets = KafkaSourceOffset(offsets)
       endPartitionOffsets
   ```
   
   This would require fewer changes when we want to add more read limits in the future (see the standalone sketch below).
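   
   To illustrate the extensibility claim, here is a minimal, self-contained sketch of the same dispatch pattern using toy types (`Limit`, `AllAvailable`, `MinRows`, `MaxRows`, `Composite`, and `resolve` are hypothetical names for this sketch, not Spark's real classes, and offsets are simplified to a single `Long`):
   
   ```scala
   object ReadLimitDispatchSketch {
     // Toy stand-ins for the real ReadLimit hierarchy (hypothetical names).
     sealed trait Limit
     case object AllAvailable extends Limit
     case class MinRows(min: Long) extends Limit
     case class MaxRows(max: Long) extends Limit
     case class Composite(limits: Seq[Limit]) extends Limit

     // Offsets are simplified to a single Long for illustration.
     def resolve(readLimit: Limit, start: Long, latest: Long): Long = {
       val limits: Seq[Limit] = readLimit match {
         case Composite(ls) => ls
         case l => Seq(l)
       }

       if (limits.contains(AllAvailable)) {
         latest // highest priority: read everything available
       } else {
         val lower = limits.collectFirst { case l: MinRows => l }
         val upper = limits.collectFirst { case l: MaxRows => l }

         lower.flatMap { l =>
           // delay the batch (stay at `start`) when too few new rows arrived
           if (latest - start < l.min) Some(start) else None
         }.orElse {
           // otherwise clamp the range by the upper bound, if any
           upper.map(l => math.min(latest, start + l.max))
         }.getOrElse(latest)
       }
     }

     def main(args: Array[String]): Unit = {
       println(resolve(Composite(Seq(MinRows(10), MaxRows(100))), 0L, 5L))   // 0 (delayed)
       println(resolve(Composite(Seq(MinRows(10), MaxRows(100))), 0L, 500L)) // 100 (clamped)
       println(resolve(AllAvailable, 0L, 500L))                              // 500
     }
   }
   ```
   
   Adding a new limit kind later means one new `collectFirst` lookup plus one new `orElse` branch; the existing branches stay untouched.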



