micheal-o commented on code in PR #47927:
URL: https://github.com/apache/spark/pull/47927#discussion_r1776366259


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala:
##########
@@ -80,18 +83,35 @@ private[kafka010] class KafkaOffsetRangeCalculator(val 
minPartitions: Option[Int
         } else {
           getPartCount(size, splitRangeTotalSize, splitRangeMinPartitions)
         }
-        var remaining = size
-        var startOffset = range.fromOffset
-        (0 until parts).map { part =>
-          // Fine to do integer division. Last partition will consume all the 
round off errors
-          val thisPartition = remaining / (parts - part)
-          remaining -= thisPartition
-          val endOffset = math.min(startOffset + thisPartition, 
range.untilOffset)
-          val offsetRange = KafkaOffsetRange(tp, startOffset, endOffset, None)
-          startOffset = endOffset
-          offsetRange
-        }
+        getDividedPartition(parts, range)
       }.filter(_.size > 0)
+    } else {
+      val maxRecords = maxRecordsPerPartition.get
+
+      offsetRanges.flatMap { range =>
+        val size = range.size
+        // number of partitions to divvy up this topic partition to
+        val parts = math.ceil(size.toDouble / maxRecords).toInt
+        getDividedPartition(parts, range)
+      }.filter(_.size > 0)
+    }
+  }
+
+  private def getDividedPartition(parts: Int, offsetRange: KafkaOffsetRange)
+  : IndexedSeq[KafkaOffsetRange] = {

Review Comment:
   nit: incorrect format



##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala:
##########
@@ -99,14 +99,18 @@ private[kafka010] class KafkaOffsetReaderAdmin(
    */
   private val minPartitions =
     
readerOptions.get(KafkaSourceProvider.MIN_PARTITIONS_OPTION_KEY).map(_.toInt)
+  private val maxRecordsPerPartition =
+    
readerOptions.get(KafkaSourceProvider.MAX_RECORDS_PER_PARTITIONS_OPTION_KEY).map(_.toLong)
 
-  private val rangeCalculator = new KafkaOffsetRangeCalculator(minPartitions)
+  private val rangeCalculator =
+    new KafkaOffsetRangeCalculator(minPartitions, maxRecordsPerPartition)
 
   /**
    * Whether we should divide Kafka TopicPartitions with a lot of data into 
smaller Spark tasks.
    */
-  private def shouldDivvyUpLargePartitions(numTopicPartitions: Int): Boolean = 
{
-    minPartitions.map(_ > numTopicPartitions).getOrElse(false)
+  private def shouldDivvyUpLargePartitions(kafkaOffsetRange: 
Seq[KafkaOffsetRange]): Boolean = {

Review Comment:
   nit: `offsetRanges` instead of `kafkaOffsetRange`? Since this is a Seq



##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala:
##########
@@ -271,6 +271,11 @@ private[kafka010] class KafkaSourceProvider extends 
DataSourceRegister
       if (p <= 0) throw new IllegalArgumentException("minPartitions must be 
positive")
     }
 
+    if (params.contains(MAX_RECORDS_PER_PARTITIONS_OPTION_KEY)) {
+      val p = params(MAX_RECORDS_PER_PARTITIONS_OPTION_KEY).toLong
+      if (p <= 0) throw new IllegalArgumentException("maxRecordsPerPartition 
must be positive")

Review Comment:
   nit: use `MAX_RECORDS_PER_PARTITIONS_OPTION_KEY` instead of re-typing it here



##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala:
##########
@@ -34,6 +34,15 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
     }
   }
 
+  def testWithMaxRecordsPerPartitions(name: String, maxRecordsPerPartition: 
Long)
+                           (f: KafkaOffsetRangeCalculator => Unit): Unit = {

Review Comment:
   nit: incorrect format



##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala:
##########
@@ -47,13 +48,15 @@ private[kafka010] class KafkaOffsetRangeCalculator(val 
minPartitions: Option[Int
     val offsetRanges = ranges.filter(_.size > 0)
 
     // If minPartitions not set or there are enough partitions to satisfy 
minPartitions
-    if (minPartitions.isEmpty || offsetRanges.size >= minPartitions.get) {
+    // and maxBytesPerPartition is empty
+    if ((minPartitions.isEmpty || offsetRanges.size >= minPartitions.get)
+        && maxRecordsPerPartition.isEmpty) {
       // Assign preferred executor locations to each range such that the same 
topic-partition is
       // preferentially read from the same executor and the KafkaConsumer can 
be reused.
       offsetRanges.map { range =>
         range.copy(preferredLoc = getLocation(range.topicPartition, 
executorLocations))
       }
-    } else {
+    } else if (minPartitions.isDefined && minPartitions.get > 
offsetRanges.size) {

Review Comment:
   This means if `minPartitions` and `maxRecordsPerPartition` are both set by a 
user, the `minPartitions` supersedes it. Hence we might still end up with 
partitions with records greater than `maxRecordsPerPartition`. 
   
   This behavior won't be obvious to users, who may end up setting both and may 
think they work together to make sure we have at least `minPartitions` 
partitions and no partition will have more than `maxRecordsPerPartition` 
records.
   
   One solution is, after doing this minPartitions splitting, we can also call 
the code that ensures that no partition has more than `maxRecordsPerPartition` 
if specified, hence further splitting the partitions.



##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala:
##########
@@ -557,6 +562,7 @@ private[kafka010] object KafkaSourceProvider extends 
Logging {
   private[kafka010] val ENDING_TIMESTAMP_OPTION_KEY = "endingtimestamp"
   private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
   private[kafka010] val MIN_PARTITIONS_OPTION_KEY = "minpartitions"
+  private[kafka010] val MAX_RECORDS_PER_PARTITIONS_OPTION_KEY = 
"maxRecordsPerPartition"

Review Comment:
   nit: `PER_PARTITION` instead of `PER_PARTITIONS`?



##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala:
##########
@@ -26,7 +26,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
  * Class to calculate offset ranges to process based on the from and until 
offsets, and
  * the configured `minPartitions`.
  */
-private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: 
Option[Int]) {
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: 
Option[Int],
+                                                   val maxRecordsPerPartition: 
Option[Long]) {

Review Comment:
   nit: incorrect format



##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala:
##########
@@ -98,17 +98,21 @@ private[kafka010] class KafkaOffsetReaderConsumer(
    */
   private val minPartitions =
     
readerOptions.get(KafkaSourceProvider.MIN_PARTITIONS_OPTION_KEY).map(_.toInt)
+  private val maxRecordsPerPartition =
+    
readerOptions.get(KafkaSourceProvider.MAX_RECORDS_PER_PARTITIONS_OPTION_KEY).map(_.toLong)
 
-  private val rangeCalculator = new KafkaOffsetRangeCalculator(minPartitions)
+  private val rangeCalculator =
+    new KafkaOffsetRangeCalculator(minPartitions, maxRecordsPerPartition)
 
   private[kafka010] val offsetFetchAttemptIntervalMs =
     
readerOptions.getOrElse(KafkaSourceProvider.FETCH_OFFSET_RETRY_INTERVAL_MS, 
"1000").toLong
 
   /**
    * Whether we should divide Kafka TopicPartitions with a lot of data into 
smaller Spark tasks.
    */
-  private def shouldDivvyUpLargePartitions(numTopicPartitions: Int): Boolean = 
{
-    minPartitions.map(_ > numTopicPartitions).getOrElse(false)
+  private def shouldDivvyUpLargePartitions(kafkaOffsetRange: 
Seq[KafkaOffsetRange]): Boolean = {

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to