micheal-o commented on code in PR #47927:
URL: https://github.com/apache/spark/pull/47927#discussion_r1779355320
##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala:
##########
@@ -24,19 +24,25 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
/**
* Class to calculate offset ranges to process based on the from and until offsets, and
- * the configured `minPartitions`.
+ * the configured `minPartitions` and `maxRecordsPerPartition`.
*/
-private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int]) {
+private[kafka010] class KafkaOffsetRangeCalculator(
+ val minPartitions: Option[Int],
+ val maxRecordsPerPartition: Option[Long]) {
require(minPartitions.isEmpty || minPartitions.get > 0)
+ require(maxRecordsPerPartition.isEmpty || maxRecordsPerPartition.get > 0)
/**
* Calculate the offset ranges that we are going to process this batch. If `minPartitions`
* is not set or is set less than or equal the number of `topicPartitions` that we're going to
- * consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka partitions. If
+ * consume and, `maxRecordsPerPartition` is not set then we fall back to a 1-1 mapping of Spark
+ * tasks to Kafka partitions. If `maxRecordsPerPartition` is set, then we will split up read task
+ * to multiple tasks as per `maxRecordsPerPartition` value. If
* `minPartitions` is set higher than the number of our `topicPartitions`, then we will split up
* the read tasks of the skewed partitions to multiple Spark tasks.
- * The number of Spark tasks will be *approximately* `minPartitions`. It can be less or more
- * depending on rounding errors or Kafka partitions that didn't receive any new data.
+ * The number of Spark tasks will be *approximately* max of `maxRecordsPerPartition`
+ * and `minPartitions`. It can be less or more depending on rounding errors or Kafka partitions
Review Comment:
> The number of Spark tasks will be *approximately* max of `maxRecordsPerPartition` and `minPartitions`.

This statement is incorrect, right?
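For context, `minPartitions` is a target number of Spark tasks while `maxRecordsPerPartition` caps how many records a single task reads, so the two option values are not directly comparable. Below is a rough sketch of how the resulting task count could behave, under the assumption that ranges are first sized toward `minPartitions` and then capped by record count; this is an assumed model for the discussion, not this PR's implementation:

```scala
// Illustrative sketch only: approxTaskCount and its logic are hypothetical,
// not code from this PR or from KafkaOffsetRangeCalculator.
object TaskCountSketch {
  def approxTaskCount(
      totalRecords: Long,
      minPartitions: Option[Int],
      maxRecordsPerPartition: Option[Long]): Long = {
    val fromMinPartitions = minPartitions.map(_.toLong).getOrElse(1L)
    // The record cap influences the task count through totalRecords / cap,
    // not through the cap's literal value.
    val fromRecordCap = maxRecordsPerPartition
      .map(cap => math.ceil(totalRecords.toDouble / cap).toLong)
      .getOrElse(1L)
    math.max(fromMinPartitions, fromRecordCap)
  }

  def main(args: Array[String]): Unit = {
    // 10,000 records with minPartitions = 4 and maxRecordsPerPartition = 1000
    // gives roughly 10 tasks under this model, not max(4, 1000).
    println(approxTaskCount(10000L, Some(4), Some(1000L))) // 10
  }
}
```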
##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala:
##########
@@ -253,6 +274,75 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
KafkaOffsetRange(tp3, 7500, 10000, None)))
}
+ testWithMaxRecordsPerPartition("SPARK-49259: 1 TopicPartition to N offset ranges", 4) { calc =>
+ assert(
+ calc.getRanges(
+ Seq(KafkaOffsetRange(tp1, 1, 5))) ==
+ Seq(
+ KafkaOffsetRange(tp1, 1, 5, None)))
+
+ assert(
+ calc.getRanges(
+ Seq(KafkaOffsetRange(tp1, 1, 2))) ==
+ Seq(
+ KafkaOffsetRange(tp1, 1, 2, None)))
+
+ assert(
+ calc.getRanges(
+ Seq(KafkaOffsetRange(tp1, 1, 6)),
+ executorLocations = Seq("location")) ==
+ Seq(
+ KafkaOffsetRange(tp1, 1, 3, None),
+ KafkaOffsetRange(tp1, 3, 6, None))) // location pref not set when minPartition is set
Review Comment:
nit: fix comment
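To make the expected splits concrete, here is a minimal standalone sketch that reproduces the offsets the assertions above expect for `maxRecordsPerPartition = 4`. The even-split arithmetic is an assumption for illustration; this is not the PR's `KafkaOffsetRangeCalculator`:

```scala
// Standalone sketch: split [from, until) into the fewest pieces whose sizes stay
// within the cap, distributing offsets as evenly as integer division allows.
object SplitByMaxRecordsSketch {
  def split(from: Long, until: Long, maxRecordsPerPartition: Long): Seq[(Long, Long)] = {
    require(until >= from && maxRecordsPerPartition > 0)
    val size = until - from
    val parts = math.max(1L, math.ceil(size.toDouble / maxRecordsPerPartition).toLong)
    (0L until parts).map { i =>
      (from + size * i / parts, from + size * (i + 1) / parts)
    }
  }

  def main(args: Array[String]): Unit = {
    println(split(1, 5, 4)) // Vector((1,5))        -> 4 records fit in one task
    println(split(1, 2, 4)) // Vector((1,2))        -> 1 record fits in one task
    println(split(1, 6, 4)) // Vector((1,3), (3,6)) -> 5 records split into 3 + 2
  }
}
```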
##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala:
##########
@@ -34,6 +34,27 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
}
}
+ def testWithMaxRecordsPerPartition(name: String, maxRecordsPerPartition: Long)
+ (f: KafkaOffsetRangeCalculator => Unit): Unit = {
+ val options = new CaseInsensitiveStringMap(
+ Map("maxRecordsPerPartition" -> maxRecordsPerPartition.toString).asJava)
+ test(s"with maxRecordsPerPartition = $maxRecordsPerPartition: $name") {
+ f(KafkaOffsetRangeCalculator(options))
+ }
+ }
+
+ def testWithMinPartitionsAndMaxRecordsPerPartition(name: String,
Review Comment:
nit: `name: String,` should be on a new line
See how to run [scalafmt](https://spark.apache.org/contributing.html#:~:text=resolve%20the%20JIRA.-,Code%20style%20guide,-Please%20follow%20the)
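For illustration, the formatting being asked for would look roughly like the sketch below. The parameters after `name` and the body are hypothetical, since the helper's definition is truncated in the diff above:

```scala
// Hypothetical, self-contained illustration of the requested multi-line parameter style;
// only `name: String,` is visible in the diff above.
object FormattingSketch {
  def testWithMinPartitionsAndMaxRecordsPerPartition(
      name: String,
      minPartitions: Int,
      maxRecordsPerPartition: Long)(f: Long => Unit): Unit = {
    println(s"$name: minPartitions=$minPartitions")
    f(maxRecordsPerPartition)
  }

  def main(args: Array[String]): Unit = {
    testWithMinPartitionsAndMaxRecordsPerPartition("demo", 4, 1000L) { cap =>
      println(s"maxRecordsPerPartition=$cap")
    }
  }
}
```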
##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala:
##########
@@ -557,6 +565,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
private[kafka010] val ENDING_TIMESTAMP_OPTION_KEY = "endingtimestamp"
private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
private[kafka010] val MIN_PARTITIONS_OPTION_KEY = "minpartitions"
+ private[kafka010] val MAX_RECORDS_PER_PARTITION_OPTION_KEY = "maxRecordsPerPartition"
Review Comment:
nit: should we use `maxrecordsperpartition` instead, to be consistent with others?
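Worth noting: the test helper earlier in this thread builds the options with `new CaseInsensitiveStringMap(...)`, and assuming the provider's options arrive the same way, lookups are case-insensitive regardless of how the constant is spelled, so the suggestion is about consistency with the neighbouring keys rather than behaviour. A small sketch:

```scala
import scala.jdk.CollectionConverters._

import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Sketch: key casing does not affect lookups in a CaseInsensitiveStringMap,
// so lowercasing the constant's value is a consistency fix, not a behavioural one.
object OptionKeyCasingSketch {
  def main(args: Array[String]): Unit = {
    val options = new CaseInsensitiveStringMap(
      Map("maxRecordsPerPartition" -> "1000").asJava)

    println(options.get("maxrecordsperpartition")) // 1000
    println(options.get("MAXRECORDSPERPARTITION")) // 1000
  }
}
```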
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]