noslowerdna commented on a change in pull request #33827:
URL: https://github.com/apache/spark/pull/33827#discussion_r696834415
##########
File path:
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
##########
@@ -56,11 +57,29 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
// Splits offset ranges with relatively large amount of data to smaller ones.
val totalSize = offsetRanges.map(_.size).sum
+
+ // First distinguish between any small (i.e. unsplit) ranges and large (i.e. split) ranges,
+ // in order to exclude the contents of small ranges from the proportional math applied to
+ // large ranges
+ val smallRanges = offsetRanges.filter { range =>
+ getPartCount(range.size, totalSize, minPartitions.get) == 1
+ }
+
+ val smallRangeTotalSize = smallRanges.map(_.size).sum
+ val largeRangeTotalSize = totalSize - smallRangeTotalSize
+ val smallRangeTopicPartitions = smallRanges.map(_.topicPartition).toSet
+ val largeRangeMinPartitions = math.max(minPartitions.get - smallRanges.size, 1)
Review comment:
Yes, that is a correct assessment.
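
For anyone reading the hunk without the rest of the file, here is a minimal, self-contained sketch of the small/large bookkeeping it introduces. It rests on assumptions: `OffsetRange`, `SplitSummary`, and `SplitSketch` are hypothetical stand-ins, and the body of `getPartCount` is not shown in this hunk, so the definition below (proportional share of the minimum partition count, rounded, never below 1) is assumed rather than quoted from the PR.

```scala
// Hypothetical stand-in for the real offset range type; only the fields used here.
final case class OffsetRange(topicPartition: String, fromOffset: Long, untilOffset: Long) {
  def size: Long = untilOffset - fromOffset
}

// Hypothetical container for the three derived values from the hunk.
final case class SplitSummary(
    smallRangeTotalSize: Long,
    largeRangeTotalSize: Long,
    largeRangeMinPartitions: Int)

object SplitSketch {
  // ASSUMPTION: getPartCount is not visible in this hunk. This version gives each
  // range its proportional share of minParts, rounded, with a floor of 1.
  def getPartCount(size: Long, totalSize: Long, minParts: Int): Int =
    math.max(math.round(size.toDouble / totalSize * minParts), 1L).toInt

  // Mirrors the added lines: ranges that would get exactly one partition are
  // "small", and their size and count are removed from the budget for the rest.
  def summarize(offsetRanges: Seq[OffsetRange], minPartitions: Int): SplitSummary = {
    val totalSize = offsetRanges.map(_.size).sum
    val smallRanges = offsetRanges.filter { range =>
      getPartCount(range.size, totalSize, minPartitions) == 1
    }
    val smallRangeTotalSize = smallRanges.map(_.size).sum
    val largeRangeTotalSize = totalSize - smallRangeTotalSize
    val largeRangeMinPartitions = math.max(minPartitions - smallRanges.size, 1)
    SplitSummary(smallRangeTotalSize, largeRangeTotalSize, largeRangeMinPartitions)
  }
}
```

With one range of 1000 offsets, two ranges of 10 offsets each, and a minimum of 4 partitions, both 10-offset ranges count as small under the assumed `getPartCount`, so `largeRangeMinPartitions` comes out as 2 and `largeRangeTotalSize` as 1000. If the large range is then sized against those two values (that part is outside this hunk), it would be split in two, for 4 partitions overall rather than the 6 that sizing against the unadjusted totals would give.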
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]