This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d1be37  [SPARK-36576][SS] Improve range split calculation for Kafka Source minPartitions option
7d1be37 is described below

commit 7d1be3710dddd446c23606c3871e28d211ad9776
Author: Andrew Olson <aols...@cerner.com>
AuthorDate: Sun Aug 29 16:38:29 2021 +0900

    [SPARK-36576][SS] Improve range split calculation for Kafka Source minPartitions option
    
    ### What changes were proposed in this pull request?
    
    This change modifies the `KafkaOffsetRangeCalculator`'s range calculation logic to exclude small (i.e. unsplit) partitions from the overall proportional distribution math. This divides the large partitions more reasonably when they are accompanied by many small partitions, and provides optimal behavior for cases where a `minPartitions` value is deliberately computed based on the volume of data being read.
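
    In essence, the split count is now computed in two passes: first identify the ranges that the proportional formula would leave unsplit, then redistribute the remaining `minPartitions` budget across the large ranges only. A minimal standalone sketch of the idea (illustrative only; `partCount` and `splitCounts` are hypothetical names, and the actual patch operates on `KafkaOffsetRange` objects rather than raw sizes):

    ```scala
    // Proportional part count, clamped to at least 1 (mirrors the patch's getPartCount).
    def partCount(size: Long, total: Long, minParts: Int): Int =
      math.max(math.round(size.toDouble / total * minParts), 1).toInt

    // Two-pass split: set aside the ranges that would stay unsplit, then
    // redistribute the remaining minPartitions budget over the rest.
    def splitCounts(sizes: Seq[Long], minPartitions: Int): Seq[Int] = {
      val total = sizes.sum
      val unsplit = sizes.filter(partCount(_, total, minPartitions) == 1)
      val splitTotal = total - unsplit.sum
      val splitMinParts = math.max(minPartitions - unsplit.size, 1)
      sizes.map { size =>
        if (partCount(size, total, minPartitions) == 1) 1
        else partCount(size, splitTotal, splitMinParts)
      }
    }
    ```

    For the skewed scenario described below, `splitCounts(10000L +: Seq.fill(1000)(1L), 1010)` yields 10 parts for the large range and 1 for each small one.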
    
    ### Why are the changes needed?
    
    While the [documentation](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html) does contain a clear disclaimer,

    > Please note that this configuration is like a hint: the number of Spark tasks will be **approximately** `minPartitions`. It can be less or more depending on rounding errors or Kafka partitions that didn't receive any new data.
    
    there are cases where the calculated Kafka partition range splits can differ greatly from expectations. For evenly distributed data and most `minPartitions` values this would not be a major or commonly encountered concern. However, when the distribution of data across partitions is very heavily skewed, somewhat surprising range split calculations can result.
    
    For example, given the following input data:
    
    - 1 partition containing 10,000 messages
    - 1,000 partitions each containing 1 message
    
    Spark processing code loading from this collection of 1,001 partitions may decide that it would like each task to read no more than 1,000 messages. Consequently, it could specify a `minPartitions` value of 1,010, expecting the single large partition to be split into 10 equal chunks, along with the 1,000 small partitions each having their own task. That is far from what actually occurs. The `KafkaOffsetRangeCalculator` algorithm ends up splitting the large partition into 918 chunks of [...]
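
    For reference, the 918 figure falls directly out of the pre-patch proportional formula shown in the diff below: `round(10000 / 11000 * 1010)` = `round(918.18)` = 918 parts for the large partition, while each 1-message partition computes to `round(1 / 11000 * 1010)` = 0 and is clamped up to 1. With the proposed change, the 1,000 small partitions are set aside first, leaving `max(1010 - 1000, 1)` = 10 parts to distribute over the remaining 10,000 messages, which produces the expected 10 equal chunks.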
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing unit tests and added new unit tests
    
    Closes #33827 from noslowerdna/SPARK-36576.
    
    Authored-by: Andrew Olson <aols...@cerner.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../sql/kafka010/KafkaOffsetRangeCalculator.scala  | 31 ++++++++++--
 .../kafka010/KafkaOffsetRangeCalculatorSuite.scala | 58 ++++++++++++++++++++--
 .../sql/kafka010/KafkaOffsetReaderSuite.scala      |  4 +-
 3 files changed, 84 insertions(+), 9 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
index 1e9a62e..4c0620a 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
@@ -33,12 +33,13 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
    * Calculate the offset ranges that we are going to process this batch. If `minPartitions`
    * is not set or is set less than or equal the number of `topicPartitions` that we're going to
    * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka partitions. If
-   * `numPartitions` is set higher than the number of our `topicPartitions`, then we will split up
+   * `minPartitions` is set higher than the number of our `topicPartitions`, then we will split up
    * the read tasks of the skewed partitions to multiple Spark tasks.
-   * The number of Spark tasks will be *approximately* `numPartitions`. It can be less or more
+   * The number of Spark tasks will be *approximately* `minPartitions`. It can be less or more
    * depending on rounding errors or Kafka partitions that didn't receive any new data.
    *
-   * Empty ranges (`KafkaOffsetRange.size <= 0`) will be dropped.
+   * Empty (`KafkaOffsetRange.size == 0`) or invalid (`KafkaOffsetRange.size < 0`) ranges will be
+   * dropped.
    */
   def getRanges(
       ranges: Seq[KafkaOffsetRange],
@@ -56,11 +57,29 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
 
       // Splits offset ranges with relatively large amount of data to smaller ones.
       val totalSize = offsetRanges.map(_.size).sum
+
+      // First distinguish between any small (i.e. unsplit) ranges and large (i.e. split) ranges,
+      // in order to exclude the contents of unsplit ranges from the proportional math applied to
+      // split ranges
+      val unsplitRanges = offsetRanges.filter { range =>
+        getPartCount(range.size, totalSize, minPartitions.get) == 1
+      }
+
+      val unsplitRangeTotalSize = unsplitRanges.map(_.size).sum
+      val splitRangeTotalSize = totalSize - unsplitRangeTotalSize
+      val unsplitRangeTopicPartitions = unsplitRanges.map(_.topicPartition).toSet
+      val splitRangeMinPartitions = math.max(minPartitions.get - unsplitRanges.size, 1)
+
+      // Now we can apply the main calculation logic
       offsetRanges.flatMap { range =>
         val tp = range.topicPartition
         val size = range.size
         // number of partitions to divvy up this topic partition to
-        val parts = math.max(math.round(size.toDouble / totalSize * minPartitions.get), 1).toInt
+        val parts = if (unsplitRangeTopicPartitions.contains(tp)) {
+          1
+        } else {
+          getPartCount(size, splitRangeTotalSize, splitRangeMinPartitions)
+        }
         var remaining = size
         var startOffset = range.fromOffset
         (0 until parts).map { part =>
@@ -76,6 +95,10 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
     }
   }
 
+  private def getPartCount(size: Long, totalSize: Long, minParts: Int): Int = {
+    math.max(math.round(size.toDouble / totalSize * minParts), 1).toInt
+  }
+
   private def getLocation(tp: TopicPartition, executorLocations: Seq[String]): Option[String] = {
     def floorMod(a: Long, b: Int): Int = ((a % b).toInt + b) % b
 
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
index 751b877..4ef019c 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
@@ -106,7 +106,7 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
           KafkaOffsetRange(tp1, 4, 5, None))) // location pref not set when minPartition is set
   }
 
-  testWithMinPartitions("N skewed TopicPartitions to M offset ranges", 3) { calc =>
+  testWithMinPartitions("N skewed TopicPartitions to M offset ranges", 4) { calc =>
     assert(
       calc.getRanges(
         Seq(
@@ -134,7 +134,7 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
 
   testWithMinPartitions(
       "SPARK-30656: N very skewed TopicPartitions to M offset ranges",
-      3) { calc =>
+      4) { calc =>
     assert(
       calc.getRanges(
         Seq(
@@ -170,7 +170,7 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
           KafkaOffsetRange(tp1, 7, 11, None)))
   }
 
-  testWithMinPartitions("empty ranges ignored", 3) { calc =>
+  testWithMinPartitions("empty ranges ignored", 4) { calc =>
     assert(
       calc.getRanges(
         Seq(
@@ -201,6 +201,58 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
           KafkaOffsetRange(tp3, 0, 1, None)))
   }
 
+  testWithMinPartitions(
+    "SPARK-36576: 0 small unsplit ranges and 3 large split ranges", 9) { calc =>
+    assert(
+      calc.getRanges(
+        Seq(
+          KafkaOffsetRange(tp1, 0, 10000),
+          KafkaOffsetRange(tp2, 0, 15000),
+          KafkaOffsetRange(tp3, 0, 20000))) ===
+        Seq(
+          KafkaOffsetRange(tp1, 0, 5000, None),
+          KafkaOffsetRange(tp1, 5000, 10000, None),
+          KafkaOffsetRange(tp2, 0, 5000, None),
+          KafkaOffsetRange(tp2, 5000, 10000, None),
+          KafkaOffsetRange(tp2, 10000, 15000, None),
+          KafkaOffsetRange(tp3, 0, 5000, None),
+          KafkaOffsetRange(tp3, 5000, 10000, None),
+          KafkaOffsetRange(tp3, 10000, 15000, None),
+          KafkaOffsetRange(tp3, 15000, 20000, None)))
+  }
+
+  testWithMinPartitions("SPARK-36576: 1 small unsplit range and 2 large split ranges", 6) { calc =>
+    assert(
+      calc.getRanges(
+        Seq(
+          KafkaOffsetRange(tp1, 0, 500),
+          KafkaOffsetRange(tp2, 0, 12000),
+          KafkaOffsetRange(tp3, 0, 15001))) ===
+        Seq(
+          KafkaOffsetRange(tp1, 0, 500, None),
+          KafkaOffsetRange(tp2, 0, 6000, None),
+          KafkaOffsetRange(tp2, 6000, 12000, None),
+          KafkaOffsetRange(tp3, 0, 5000, None),
+          KafkaOffsetRange(tp3, 5000, 10000, None),
+          KafkaOffsetRange(tp3, 10000, 15001, None)))
+  }
+
+  testWithMinPartitions("SPARK-36576: 2 small unsplit ranges and 1 large split range", 6) { calc =>
+    assert(
+      calc.getRanges(
+        Seq(
+          KafkaOffsetRange(tp1, 0, 1),
+          KafkaOffsetRange(tp2, 0, 1),
+          KafkaOffsetRange(tp3, 0, 10000))) ===
+        Seq(
+          KafkaOffsetRange(tp1, 0, 1, None),
+          KafkaOffsetRange(tp2, 0, 1, None),
+          KafkaOffsetRange(tp3, 0, 2500, None),
+          KafkaOffsetRange(tp3, 2500, 5000, None),
+          KafkaOffsetRange(tp3, 5000, 7500, None),
+          KafkaOffsetRange(tp3, 7500, 10000, None)))
+  }
+
   private val tp1 = new TopicPartition("t1", 1)
   private val tp2 = new TopicPartition("t2", 1)
   private val tp3 = new TopicPartition("t3", 1)
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
index d1e49b0..332db54 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
@@ -143,7 +143,7 @@ class KafkaOffsetReaderSuite extends QueryTest with SharedSparkSession with Kafk
     testUtils.sendMessages(topic, (0 until 4).map(_.toString).toArray, Some(1))
     val tp1 = new TopicPartition(topic, 0)
     val tp2 = new TopicPartition(topic, 1)
-    val reader = createKafkaReader(topic, minPartitions = Some(3))
+    val reader = createKafkaReader(topic, minPartitions = Some(4))
 
     val startingOffsets = SpecificOffsetRangeLimit(Map(tp1 -> EARLIEST, tp2 -> EARLIEST))
     val endingOffsets = SpecificOffsetRangeLimit(Map(tp1 -> LATEST, tp2 -> 3))
@@ -163,7 +163,7 @@ class KafkaOffsetReaderSuite extends QueryTest with SharedSparkSession with Kafk
     testUtils.sendMessages(topic, (0 until 4).map(_.toString).toArray, Some(1))
     val tp1 = new TopicPartition(topic, 0)
     val tp2 = new TopicPartition(topic, 1)
-    val reader = createKafkaReader(topic, minPartitions = Some(3))
+    val reader = createKafkaReader(topic, minPartitions = Some(4))
 
     val fromPartitionOffsets = Map(tp1 -> 0L, tp2 -> 0L)
     val untilPartitionOffsets = Map(tp1 -> 100L, tp2 -> 3L)
