SubhamSinghal commented on code in PR #47927:
URL: https://github.com/apache/spark/pull/47927#discussion_r1779450088


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala:
##########
@@ -24,19 +24,25 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 /**
  * Class to calculate offset ranges to process based on the from and until 
offsets, and
- * the configured `minPartitions`.
+ * the configured `minPartitions` and `maxRecordsPerPartition`.
  */
-private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: 
Option[Int]) {
+private[kafka010] class KafkaOffsetRangeCalculator(
+    val minPartitions: Option[Int],
+    val maxRecordsPerPartition: Option[Long]) {
   require(minPartitions.isEmpty || minPartitions.get > 0)
+  require(maxRecordsPerPartition.isEmpty || maxRecordsPerPartition.get > 0)
 
   /**
    * Calculate the offset ranges that we are going to process this batch. If 
`minPartitions`
    * is not set or is set less than or equal the number of `topicPartitions` 
that we're going to
-   * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka 
partitions. If
+   * consume and, `maxRecordsPerPartition` is not set then we fall back to a 
1-1 mapping of Spark
+   * tasks to Kafka partitions. If `maxRecordsPerPartition` is set, then we 
will split up read task
+   * to multiple tasks as per `maxRecordsPerPartition` value. If
    * `minPartitions` is set higher than the number of our `topicPartitions`, 
then we will split up
    * the read tasks of the skewed partitions to multiple Spark tasks.
-   * The number of Spark tasks will be *approximately* `minPartitions`. It can 
be less or more
-   * depending on rounding errors or Kafka partitions that didn't receive any 
new data.
+   * The number of Spark tasks will be *approximately* max of 
`maxRecordsPerPartition`
+   * and `minPartitions`. It can be less or more depending on rounding errors 
or Kafka partitions

Review Comment:
   updated description to `max of 
`(recordsPerPartition/maxRecordsPerPartition)` and `minPartitions``



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to