HeartSaVioR commented on a change in pull request #25237: [SPARK-28489][SS] Fix a bug that KafkaOffsetRangeCalculator.getRanges may drop offsets
URL: https://github.com/apache/spark/pull/25237#discussion_r306609937
 
 

 ##########
 File path: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
 ##########
 @@ -61,19 +61,23 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
 
       // Splits offset ranges with relatively large amount of data to smaller ones.
       val totalSize = offsetRanges.map(_.size).sum
-      val idealRangeSize = totalSize.toDouble / minPartitions.get
-
       offsetRanges.flatMap { range =>
-        // Split the current range into subranges as close to the ideal range size
-        val numSplitsInRange = math.round(range.size.toDouble / idealRangeSize).toInt
-
-        (0 until numSplitsInRange).map { i =>
-          val splitStart = range.fromOffset + range.size * (i.toDouble / numSplitsInRange)
-          val splitEnd = range.fromOffset + range.size * ((i.toDouble + 1) / numSplitsInRange)
-          KafkaOffsetRange(
-            range.topicPartition, splitStart.toLong, splitEnd.toLong, preferredLoc = None)
+        val tp = range.topicPartition
+        val size = range.size
+        // number of partitions to divvy up this topic partition to
+        val parts = math.max(math.round(size.toDouble / totalSize * minPartitions.get), 1).toInt
 
 Review comment:
   Yeah, I'm seeing the same. Suppose 4 offsetRanges split 1 partition, 0.25
   each: every share rounds down, and we lose 1. The number of lost partitions
   may vary.
   
   In other words, if we use ceil, the result may exceed the minimum partitions,
   and the number of excess partitions may vary. We don't guarantee that this
   calculator returns the partition count closest to the minimum partitions, so
   it seems OK.
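
   To make the drift concrete, here is a small sketch with made-up numbers. The
   object name, the range sizes, and the `minPartitions` values are all
   hypothetical and are not taken from the PR; it only compares how `round` and
   `ceil` total up when each range's share is computed independently.

```scala
object RoundingDrift {
  // Per-range partition counts when each range's share of `minPartitions`
  // is converted to an integer independently with `toCount`.
  def counts(sizes: Seq[Long], minPartitions: Int)(toCount: Double => Int): Seq[Int] = {
    val totalSize = sizes.sum
    sizes.map(size => toCount(size.toDouble / totalSize * minPartitions))
  }

  // Hypothetical input: 4 offset ranges of equal size.
  val sizes: Seq[Long] = Seq(100L, 100L, 100L, 100L)

  // minPartitions = 1: every share is 0.25.
  val roundedFor1: Int = counts(sizes, 1)(s => math.round(s).toInt).sum // shares round to 0
  val ceiledFor1: Int = counts(sizes, 1)(s => math.ceil(s).toInt).sum   // shares go up to 1

  // minPartitions = 5: every share is 1.25.
  val roundedFor5: Int = counts(sizes, 5)(s => math.round(s).toInt).sum // shares round to 1
  val ceiledFor5: Int = counts(sizes, 5)(s => math.ceil(s).toInt).sum   // shares go up to 2
}
```

   With these inputs, plain rounding falls short of the requested count in both
   cases, while ceil overshoots it, and the size of the gap depends on the input.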
   
   If we really want to make this strict, we could apply "allocation":
   calculate the ratio for each offsetRange, allocate partitions to each
   offsetRange according to its ratio (with a minimum of 1 for safety), and
   then allocate extra partitions to some offsetRanges if any partitions
   remain. I'm not sure we want to deal with that complexity.
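
   A rough sketch of what such an allocation could look like is below. It is an
   illustration only, not code from this PR: `PartitionAllocation` and
   `allocate` are hypothetical names, and handing the leftover partitions to the
   ranges with the largest fractional remainder is my assumption, since the idea
   above only says "some offsetRanges".

```scala
object PartitionAllocation {
  /**
   * Returns the number of partitions for each range, in the same order as `sizes`.
   * Hypothetical helper: takes plain sizes instead of KafkaOffsetRange objects.
   */
  def allocate(sizes: Seq[Long], minPartitions: Int): Seq[Int] = {
    val totalSize = sizes.sum
    if (totalSize == 0) {
      // Nothing to weigh by; fall back to one partition per range.
      sizes.map(_ => 1)
    } else {
      // Fractional share of the requested partitions for each range.
      val shares = sizes.map(_.toDouble / totalSize * minPartitions)
      // Round each share down, but never below 1, so no range is dropped.
      val base = shares.map(s => math.max(math.floor(s).toInt, 1))
      val remaining = minPartitions - base.sum
      if (remaining <= 0) {
        // The minimum of 1 per range already meets or exceeds the target.
        base
      } else {
        // Assumption: give one extra partition to the ranges with the
        // largest fractional remainder until the leftovers are used up.
        val extra = shares.zipWithIndex
          .sortBy { case (s, _) => -(s - math.floor(s)) }
          .take(remaining)
          .map(_._2)
          .toSet
        base.zipWithIndex.map { case (b, i) => if (extra(i)) b + 1 else b }
      }
    }
  }
}
```

   For example, `PartitionAllocation.allocate(Seq(100L, 100L, 100L, 100L), 5)`
   yields counts that sum to 5. The total can still exceed `minPartitions` when
   there are more ranges than requested partitions, because of the minimum of 1
   per range.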

