viirya commented on a change in pull request #33827:
URL: https://github.com/apache/spark/pull/33827#discussion_r696402495
##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
##########
@@ -56,11 +57,29 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
// Splits offset ranges with relatively large amount of data to smaller ones.
val totalSize = offsetRanges.map(_.size).sum
+
+ // First distinguish between any small (i.e. unsplit) ranges and large (i.e. split) ranges,
+ // in order to exclude the contents of small ranges from the proportional math applied to
+ // large ranges
+ val smallRanges = offsetRanges.filter { range =>
+ getPartCount(range.size, totalSize, minPartitions.get) == 1
+ }
+
+ val smallRangeTotalSize = smallRanges.map(_.size).sum
+ val largeRangeTotalSize = totalSize - smallRangeTotalSize
+ val smallRangeTopicPartitions = smallRanges.map(_.topicPartition).toSet
+ val largeRangeMinPartitions = math.max(minPartitions.get - smallRanges.size, 1)
Review comment:
Hmm, so it looks like this change prefers to keep the number of partitions close to the specified minPartitions. But might the cost be more skewed partitions?
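To make the trade-off concrete, here is a minimal Scala sketch comparing partition counts and split skew before and after this change. OffsetRange, oldCounts, newCounts, and skewRatio are hypothetical names. The body of getPartCount is not in this hunk, so the sketch assumes it rounds a range's proportional share of minPartitions and clamps it to at least 1. The hunk also ends before the large ranges are sized, so the sketch assumes they are sized against largeRangeTotalSize and largeRangeMinPartitions.

```scala
object OffsetSplitSketch {
  // Hypothetical stand-in for KafkaOffsetRange: only the fields this sketch needs.
  final case class OffsetRange(topicPartition: String, size: Long)

  // ASSUMPTION: getPartCount's body is not shown in the hunk. This version takes the range's
  // proportional share of minPartitions, rounds it, and never returns less than 1.
  def getPartCount(size: Long, totalSize: Long, minPartitions: Int): Int =
    math.max(math.round(size.toDouble / totalSize * minPartitions).toInt, 1)

  // Before the change: every range is sized against the full total and the full minPartitions.
  def oldCounts(ranges: Seq[OffsetRange], minPartitions: Int): Map[String, Int] = {
    val totalSize = ranges.map(_.size).sum
    ranges.map(r => r.topicPartition -> getPartCount(r.size, totalSize, minPartitions)).toMap
  }

  // After the change: small ranges stay at one partition each. ASSUMPTION (the hunk ends
  // before this part): large ranges share the remaining budget proportionally.
  def newCounts(ranges: Seq[OffsetRange], minPartitions: Int): Map[String, Int] = {
    val totalSize = ranges.map(_.size).sum
    val smallRanges = ranges.filter(r => getPartCount(r.size, totalSize, minPartitions) == 1)
    val smallTopicPartitions = smallRanges.map(_.topicPartition).toSet
    val largeRangeTotalSize = totalSize - smallRanges.map(_.size).sum
    val largeRangeMinPartitions = math.max(minPartitions - smallRanges.size, 1)
    ranges.map { r =>
      val parts =
        if (smallTopicPartitions.contains(r.topicPartition)) 1
        else getPartCount(r.size, largeRangeTotalSize, largeRangeMinPartitions)
      r.topicPartition -> parts
    }.toMap
  }

  // Skew ratio: largest approximate split size divided by the smallest (1.0 = perfectly even).
  // A range of `size` offsets cut into `parts` pieces yields pieces of roughly size / parts.
  def skewRatio(ranges: Seq[OffsetRange], counts: Map[String, Int]): Double = {
    val splitSizes = ranges.map(r => r.size.toDouble / counts(r.topicPartition))
    splitSizes.max / splitSizes.min
  }
}
```

With one 1000-offset range, three 10-offset ranges, and minPartitions = 10 (made-up numbers), the old math gives 10 + 1 + 1 + 1 = 13 partitions with a largest-to-smallest split ratio of 10, while the new math gives exactly 10 partitions (7 + 1 + 1 + 1) but a ratio of about 14.3. That is the direction of the trade-off asked about above: fewer partitions, closer to minPartitions, at the price of larger splits for the big range.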
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]