micheal-o commented on code in PR #47927:
URL: https://github.com/apache/spark/pull/47927#discussion_r1779355320


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala:
##########
@@ -24,19 +24,25 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 /**
 * Class to calculate offset ranges to process based on the from and until offsets, and
- * the configured `minPartitions`.
+ * the configured `minPartitions` and `maxRecordsPerPartition`.
  */
-private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int]) {
+private[kafka010] class KafkaOffsetRangeCalculator(
+    val minPartitions: Option[Int],
+    val maxRecordsPerPartition: Option[Long]) {
   require(minPartitions.isEmpty || minPartitions.get > 0)
+  require(maxRecordsPerPartition.isEmpty || maxRecordsPerPartition.get > 0)
 
   /**
   * Calculate the offset ranges that we are going to process this batch. If `minPartitions`
   * is not set or is set less than or equal the number of `topicPartitions` that we're going to
-   * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka partitions. If
+   * consume and, `maxRecordsPerPartition` is not set then we fall back to a 1-1 mapping of Spark
+   * tasks to Kafka partitions. If `maxRecordsPerPartition` is set, then we will split up read task
+   * to multiple tasks as per `maxRecordsPerPartition` value. If
   * `minPartitions` is set higher than the number of our `topicPartitions`, then we will split up
   * the read tasks of the skewed partitions to multiple Spark tasks.
-   * The number of Spark tasks will be *approximately* `minPartitions`. It can be less or more
-   * depending on rounding errors or Kafka partitions that didn't receive any new data.
+   * The number of Spark tasks will be *approximately* max of `maxRecordsPerPartition`
+   * and `minPartitions`. It can be less or more depending on rounding errors or Kafka partitions

Review Comment:
   > The number of Spark tasks will be *approximately* max of `maxRecordsPerPartition` and `minPartitions`.
   
   This statement is incorrect, right?
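   
   For illustration (hypothetical numbers, and assuming each range is simply split so that no piece exceeds the cap, which is not necessarily what this PR does): with a single topic-partition holding many records, the resulting task count tracks the data volume rather than either setting.
   
   ```scala
   // Purely illustrative arithmetic, not code from this PR.
   val maxRecordsPerPartition = 4L
   val recordsInPartition = 100L
   // Splitting so that no range exceeds the cap gives ceil(100 / 4) = 25 tasks,
   // a value that is neither `maxRecordsPerPartition` nor a typical `minPartitions`.
   val tasks = math.ceil(recordsInPartition.toDouble / maxRecordsPerPartition).toInt
   assert(tasks == 25)
   ```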



##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala:
##########
@@ -253,6 +274,75 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
           KafkaOffsetRange(tp3, 7500, 10000, None)))
   }
 
+  testWithMaxRecordsPerPartition("SPARK-49259: 1 TopicPartition to N offset ranges", 4) { calc =>
+    assert(
+      calc.getRanges(
+        Seq(KafkaOffsetRange(tp1, 1, 5))) ==
+        Seq(
+          KafkaOffsetRange(tp1, 1, 5, None)))
+
+    assert(
+      calc.getRanges(
+        Seq(KafkaOffsetRange(tp1, 1, 2))) ==
+        Seq(
+          KafkaOffsetRange(tp1, 1, 2, None)))
+
+    assert(
+      calc.getRanges(
+        Seq(KafkaOffsetRange(tp1, 1, 6)),
+        executorLocations = Seq("location")) ==
+        Seq(
+          KafkaOffsetRange(tp1, 1, 3, None),
+          KafkaOffsetRange(tp1, 3, 6, None))) // location pref not set when minPartition is set

Review Comment:
   nit: fix comment (this test sets `maxRecordsPerPartition`, not `minPartitions`)
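   
   A minimal sketch of a per-partition split that is consistent with the expectations asserted above (5 records under a cap of 4 become two ranges of sizes 2 and 3). The helper name and the even-split strategy are assumptions inferred from those assertions, not this PR's actual implementation:
   
   ```scala
   // Assumed helper, inferred from the asserted ranges above; not code from this PR.
   def splitRange(from: Long, until: Long, cap: Long): Seq[(Long, Long)] = {
     val size = until - from
     if (size <= cap) {
       // Under the cap: keep the 1-1 mapping (e.g. (1, 5) stays a single range).
       Seq((from, until))
     } else {
       // Minimum number of pieces such that no piece exceeds the cap.
       val parts = math.ceil(size.toDouble / cap).toInt
       // Spread the records as evenly as possible across the pieces.
       (0 until parts).map(i => (from + size * i / parts, from + size * (i + 1) / parts))
     }
   }
   
   // splitRange(1, 5, 4) == Seq((1, 5))          -- no split needed
   // splitRange(1, 6, 4) == Seq((1, 3), (3, 6))  -- matches the ranges asserted above
   ```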



##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala:
##########
@@ -34,6 +34,27 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
     }
   }
 
+  def testWithMaxRecordsPerPartition(name: String, maxRecordsPerPartition: Long)
+      (f: KafkaOffsetRangeCalculator => Unit): Unit = {
+    val options = new CaseInsensitiveStringMap(
+      Map("maxRecordsPerPartition" -> maxRecordsPerPartition.toString).asJava)
+    test(s"with maxRecordsPerPartition = $maxRecordsPerPartition: $name") {
+      f(KafkaOffsetRangeCalculator(options))
+    }
+  }
+
+  def testWithMinPartitionsAndMaxRecordsPerPartition(name: String,

Review Comment:
   nit: `name: String,` should be on a new line
   
   See how to run [scalafmt](https://spark.apache.org/contributing.html#:~:text=resolve%20the%20JIRA.-,Code%20style%20guide,-Please%20follow%20the)

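   
   One way the declaration could be wrapped to address the nit; the parameters beyond `name` and the body are assumptions, patterned on `testWithMaxRecordsPerPartition` above:
   
   ```scala
   // Hypothetical shape only; parameter names and body are assumed.
   def testWithMinPartitionsAndMaxRecordsPerPartition(
       name: String,
       minPartitions: Int,
       maxRecordsPerPartition: Long)(f: KafkaOffsetRangeCalculator => Unit): Unit = {
     val options = new CaseInsensitiveStringMap(
       Map(
         "minPartitions" -> minPartitions.toString,
         "maxRecordsPerPartition" -> maxRecordsPerPartition.toString).asJava)
     test(s"with minPartitions = $minPartitions, " +
         s"maxRecordsPerPartition = $maxRecordsPerPartition: $name") {
       f(KafkaOffsetRangeCalculator(options))
     }
   }
   ```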


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala:
##########
@@ -557,6 +565,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
   private[kafka010] val ENDING_TIMESTAMP_OPTION_KEY = "endingtimestamp"
   private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
   private[kafka010] val MIN_PARTITIONS_OPTION_KEY = "minpartitions"
+  private[kafka010] val MAX_RECORDS_PER_PARTITION_OPTION_KEY = "maxRecordsPerPartition"

Review Comment:
   nit: should we use `maxrecordsperpartition` instead, to be consistent with the others?
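   
   A quick standalone sketch of why the casing can matter beyond style (hedged; this is not code from this PR): `CaseInsensitiveStringMap` lookups tolerate either casing, but if the constant is ever compared against a parameter map whose keys were lowercased up front, a mixed-case value would silently fail to match.
   
   ```scala
   import scala.jdk.CollectionConverters._
   import org.apache.spark.sql.util.CaseInsensitiveStringMap
   
   // CaseInsensitiveStringMap resolves keys regardless of case ...
   val options = new CaseInsensitiveStringMap(
     Map("maxRecordsPerPartition" -> "10000").asJava)
   assert(options.get("maxrecordsperpartition") == "10000")
   
   // ... but a plain map with already-lowercased keys would not match a
   // mixed-case constant, so a lowercase constant avoids that trap.
   val lowercasedParams = Map("maxrecordsperpartition" -> "10000")
   assert(lowercasedParams.get("maxRecordsPerPartition").isEmpty)
   ```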



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

