noslowerdna commented on a change in pull request #33827:
URL: https://github.com/apache/spark/pull/33827#discussion_r696625012
##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
##########
@@ -56,11 +57,29 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
     // Splits offset ranges with relatively large amount of data to smaller ones.
     val totalSize = offsetRanges.map(_.size).sum
+
+    // First distinguish between any small (i.e. unsplit) ranges and large (i.e. split) ranges,
+    // in order to exclude the contents of small ranges from the proportional math applied to
+    // large ranges
+    val smallRanges = offsetRanges.filter { range =>
+      getPartCount(range.size, totalSize, minPartitions.get) == 1
+    }
+
+    val smallRangeTotalSize = smallRanges.map(_.size).sum
+    val largeRangeTotalSize = totalSize - smallRangeTotalSize
+    val smallRangeTopicPartitions = smallRanges.map(_.topicPartition).toSet
+    val largeRangeMinPartitions = math.max(minPartitions.get - smallRanges.size, 1)
Review comment:
Could you expand on this concern? I'm not sure I follow. The goal
was to address the example given in the PR / JIRA description while ensuring
that the behavior in other, more balanced scenarios remained essentially unchanged.
Please let me know if there are specific situations you've identified where the
range calculation would be suboptimal.
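For reference, the intent of the small/large split in the diff can be sketched as follows. This is a simplified Python translation for illustration only; `get_part_count` here is a hypothetical stand-in for Spark's `getPartCount` (a proportional share of `minPartitions`, floored at one part), not the actual implementation.

```python
import math

def get_part_count(size, total_size, min_parts):
    # Hypothetical stand-in: each range gets a share of min_parts
    # proportional to its size, but always at least one part.
    return max(1, math.ceil(size / total_size * min_parts))

def split_small_large(offset_ranges, min_partitions):
    """offset_ranges: list of (topic_partition, size) tuples."""
    total_size = sum(size for _, size in offset_ranges)
    # Small ranges are the ones that would not be split at all.
    small = [(tp, size) for tp, size in offset_ranges
             if get_part_count(size, total_size, min_partitions) == 1]
    small_total = sum(size for _, size in small)
    # Exclude the small ranges' contents from the proportional math
    # applied to the remaining (large) ranges.
    large_total = total_size - small_total
    large_min_partitions = max(min_partitions - len(small), 1)
    return small, large_total, large_min_partitions
```

For a skewed input like one 1000-offset range plus two 1-offset ranges with `min_partitions = 4`, the two tiny ranges are classified as small, and the remaining `min_partitions` budget (2) is applied only to the large range's 1000 offsets.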
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]