HeartSaVioR commented on a change in pull request #32653:
URL: https://github.com/apache/spark/pull/32653#discussion_r642765440
##########
File path:
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -95,15 +114,62 @@ private[kafka010] class KafkaMicroBatchStream(
override def latestOffset(start: Offset, readLimit: ReadLimit): Offset = {
val startPartitionOffsets =
start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
latestPartitionOffsets =
kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
+ var skipBatch = false
Review comment:
Now I see duplicated code due to the branches handling each type,
including CompositeReadLimit, which handles both the lower and upper limits
and hence repeats the same code.
How about changing it like below:
```
val limits: Seq[ReadLimit] = readLimit match {
case rows: CompositeReadLimit => rows.getReadLimits
case rows => Seq(rows)
}
val offsets = if (limits.exists(_.isInstanceOf[ReadAllAvailable])) {
// ReadAllAvailable has the highest priority
latestPartitionOffsets
} else {
val lowerLimit =
limits.find(_.isInstanceOf[ReadMinRows]).map(_.asInstanceOf[ReadMinRows])
val upperLimit =
limits.find(_.isInstanceOf[ReadMaxRows]).map(_.asInstanceOf[ReadMaxRows])
lowerLimit.flatMap { limit =>
// checking if we need to skip batch based on minOffsetsPerTrigger criteria
val skipBatch = delayBatch(
limit.minRows, latestPartitionOffsets, startPartitionOffsets,
limit.maxTriggerDelayMs)
if (skipBatch) {
logDebug(
s"Delaying batch as number of records available is less than minOffsetsPerTrigger")
Some(startPartitionOffsets)
} else {
None
}
}.orElse {
// checking if we need to adjust a range of offsets based on
// maxOffsetsPerTrigger criteria
upperLimit.map { limit =>
rateLimit(limit.maxRows(), startPartitionOffsets,
latestPartitionOffsets)
}
}.getOrElse(latestPartitionOffsets)
}
endPartitionOffsets = KafkaSourceOffset(offsets)
endPartitionOffsets
```
This would require fewer changes when we want to add more read limits in the
future.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]