HeartSaVioR commented on a change in pull request #32653: URL: https://github.com/apache/spark/pull/32653#discussion_r642765440
##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -95,15 +114,62 @@ private[kafka010] class KafkaMicroBatchStream(
   override def latestOffset(start: Offset, readLimit: ReadLimit): Offset = {
     val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
     latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
+    var skipBatch = false

Review comment:
I now see duplicated code across the branches that handle each limit type, including the CompositeReadLimit branch, which handles both the lower and the upper limit and therefore repeats the same code. How about changing it as below:

```
val limits: Seq[ReadLimit] = readLimit match {
  case rows: CompositeReadLimit => rows.getReadLimits
  case rows => Seq(rows)
}

val offsets = if (limits.exists(_.isInstanceOf[ReadAllAvailable])) {
  // ReadAllAvailable has the highest priority
  latestPartitionOffsets
} else {
  val lowerLimit = limits.find(_.isInstanceOf[ReadMinRows]).map(_.asInstanceOf[ReadMinRows])
  val upperLimit = limits.find(_.isInstanceOf[ReadMaxRows]).map(_.asInstanceOf[ReadMaxRows])

  lowerLimit.flatMap { limit =>
    // checking if we need to skip batch based on minOffsetsPerTrigger criteria
    val skipBatch = delayBatch(
      limit.minRows, latestPartitionOffsets, startPartitionOffsets, limit.maxTriggerDelayMs)
    if (skipBatch) {
      logDebug(
        s"Delaying batch as number of records available is less than minOffsetsPerTrigger")
      Some(startPartitionOffsets)
    } else {
      None
    }
  }.orElse {
    // checking if we need to adjust a range of offsets based on maxOffsetsPerTrigger criteria
    upperLimit.map { limit =>
      rateLimit(limit.maxRows(), startPartitionOffsets, latestPartitionOffsets)
    }
  }.getOrElse(latestPartitionOffsets)
}

endPartitionOffsets = KafkaSourceOffset(offsets)
endPartitionOffsets
```

This would require fewer changes when we add more read limits in the future.

--
This is an automated message from the Apache Git Service.