micheal-o commented on code in PR #47927:
URL: https://github.com/apache/spark/pull/47927#discussion_r1776366259
##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala:
##########
@@ -80,18 +83,35 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
} else {
getPartCount(size, splitRangeTotalSize, splitRangeMinPartitions)
}
- var remaining = size
- var startOffset = range.fromOffset
- (0 until parts).map { part =>
- // Fine to do integer division. Last partition will consume all the round off errors
- val thisPartition = remaining / (parts - part)
- remaining -= thisPartition
- val endOffset = math.min(startOffset + thisPartition, range.untilOffset)
- val offsetRange = KafkaOffsetRange(tp, startOffset, endOffset, None)
- startOffset = endOffset
- offsetRange
- }
+ getDividedPartition(parts, range)
}.filter(_.size > 0)
+ } else {
+ val maxRecords = maxRecordsPerPartition.get
+
+ offsetRanges.flatMap { range =>
+ val size = range.size
+ // number of partitions to divvy up this topic partition to
+ val parts = math.ceil(size.toDouble / maxRecords).toInt
+ getDividedPartition(parts, range)
+ }.filter(_.size > 0)
+ }
+ }
+
+ private def getDividedPartition(parts: Int, offsetRange: KafkaOffsetRange)
+ : IndexedSeq[KafkaOffsetRange] = {
Review Comment:
nit: incorrect format
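Assuming the nit is about the wrapped signature, a possible reformatting in the usual Spark Scala style (one parameter per line, four-space continuation indent, return type kept on the closing line) would be:

```scala
// Sketch only: the body is elided, and the exact indentation is an assumption
// about which formatting rule the reviewer has in mind.
private def getDividedPartition(
    parts: Int,
    offsetRange: KafkaOffsetRange): IndexedSeq[KafkaOffsetRange] = {
  // ... existing splitting logic ...
}
```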
##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala:
##########
@@ -99,14 +99,18 @@ private[kafka010] class KafkaOffsetReaderAdmin(
*/
private val minPartitions =
readerOptions.get(KafkaSourceProvider.MIN_PARTITIONS_OPTION_KEY).map(_.toInt)
+ private val maxRecordsPerPartition =
+ readerOptions.get(KafkaSourceProvider.MAX_RECORDS_PER_PARTITIONS_OPTION_KEY).map(_.toLong)
- private val rangeCalculator = new KafkaOffsetRangeCalculator(minPartitions)
+ private val rangeCalculator =
+ new KafkaOffsetRangeCalculator(minPartitions, maxRecordsPerPartition)
/**
* Whether we should divide Kafka TopicPartitions with a lot of data into smaller Spark tasks.
*/
- private def shouldDivvyUpLargePartitions(numTopicPartitions: Int): Boolean = {
- minPartitions.map(_ > numTopicPartitions).getOrElse(false)
+ private def shouldDivvyUpLargePartitions(kafkaOffsetRange: Seq[KafkaOffsetRange]): Boolean = {
Review Comment:
nit: `offsetRanges` instead of `kafkaOffsetRange`? Since this is a Seq
##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala:
##########
@@ -271,6 +271,11 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
if (p <= 0) throw new IllegalArgumentException("minPartitions must be positive")
}
+ if (params.contains(MAX_RECORDS_PER_PARTITIONS_OPTION_KEY)) {
+ val p = params(MAX_RECORDS_PER_PARTITIONS_OPTION_KEY).toLong
+ if (p <= 0) throw new IllegalArgumentException("maxRecordsPerPartition must be positive")
Review Comment:
nit: use `MAX_RECORDS_PER_PARTITIONS_OPTION_KEY` instead of re-typing it here
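A minimal sketch of that suggestion, interpolating the constant so the error message stays in sync with the option key:

```scala
// Sketch only: reuse the existing MAX_RECORDS_PER_PARTITIONS_OPTION_KEY
// constant instead of re-typing the literal in the message.
if (p <= 0) {
  throw new IllegalArgumentException(
    s"$MAX_RECORDS_PER_PARTITIONS_OPTION_KEY must be positive")
}
```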
##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala:
##########
@@ -34,6 +34,15 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
}
}
+ def testWithMaxRecordsPerPartitions(name: String, maxRecordsPerPartition: Long)
+ (f: KafkaOffsetRangeCalculator => Unit): Unit = {
Review Comment:
nit: incorrect format
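If the nit is about the continuation indent of the curried parameter list, a sketch of how it might look, mirroring the existing `testWithMinPartitions` helper in the same suite (the exact indent is an assumption):

```scala
// Sketch only: body elided.
def testWithMaxRecordsPerPartitions(name: String, maxRecordsPerPartition: Long)
    (f: KafkaOffsetRangeCalculator => Unit): Unit = {
  // ... existing test wiring ...
}
```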
##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala:
##########
@@ -47,13 +48,15 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
val offsetRanges = ranges.filter(_.size > 0)
// If minPartitions not set or there are enough partitions to satisfy minPartitions
- if (minPartitions.isEmpty || offsetRanges.size >= minPartitions.get) {
+ // and maxRecordsPerPartition is empty
+ if ((minPartitions.isEmpty || offsetRanges.size >= minPartitions.get)
+ && maxRecordsPerPartition.isEmpty) {
// Assign preferred executor locations to each range such that the same topic-partition is
// preferentially read from the same executor and the KafkaConsumer can be reused.
offsetRanges.map { range =>
range.copy(preferredLoc = getLocation(range.topicPartition, executorLocations))
}
- } else {
+ } else if (minPartitions.isDefined && minPartitions.get > offsetRanges.size) {
Review Comment:
This means that if a user sets both `minPartitions` and `maxRecordsPerPartition`, `minPartitions` supersedes `maxRecordsPerPartition`, so we might still end up with partitions containing more than `maxRecordsPerPartition` records.
This behavior won't be obvious to users, who may set both options expecting them to work together, i.e. at least `minPartitions` partitions and no partition with more than `maxRecordsPerPartition` records.
One solution: after doing the `minPartitions` splitting, also run the code that enforces `maxRecordsPerPartition` (when specified), further splitting the partitions as needed.
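A rough sketch of that idea (the helper name is hypothetical, not the PR's code): after the `minPartitions` split, re-split any range that still exceeds `maxRecordsPerPartition`, reusing the same divvy-up logic as the maxRecords-only path:

```scala
// Hypothetical post-processing step; enforceMaxRecords is an illustrative
// name, and getDividedPartition is the helper introduced in this PR.
private def enforceMaxRecords(ranges: Seq[KafkaOffsetRange]): Seq[KafkaOffsetRange] = {
  maxRecordsPerPartition match {
    case Some(maxRecords) =>
      ranges.flatMap { range =>
        // split each range into enough pieces that none exceeds maxRecords
        val parts = math.ceil(range.size.toDouble / maxRecords).toInt
        getDividedPartition(parts, range)
      }.filter(_.size > 0)
    case None => ranges
  }
}
```

Applying this to the output of the `minPartitions` branch would make the two options compose instead of one silently overriding the other.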
##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala:
##########
@@ -557,6 +562,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
private[kafka010] val ENDING_TIMESTAMP_OPTION_KEY = "endingtimestamp"
private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
private[kafka010] val MIN_PARTITIONS_OPTION_KEY = "minpartitions"
+ private[kafka010] val MAX_RECORDS_PER_PARTITIONS_OPTION_KEY = "maxRecordsPerPartition"
Review Comment:
nit: `PER_PARTITION` instead of `PER_PARTITIONS`?
##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala:
##########
@@ -26,7 +26,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
* Class to calculate offset ranges to process based on the from and until offsets, and
* the configured `minPartitions`.
*/
-private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int]) {
+private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int],
+ val maxRecordsPerPartition: Option[Long]) {
Review Comment:
nit: incorrect format
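Assuming the nit refers to the usual Spark style for multi-line constructors (one parameter per line, four-space indent), a possible reformatting:

```scala
// Sketch only: body elided.
private[kafka010] class KafkaOffsetRangeCalculator(
    val minPartitions: Option[Int],
    val maxRecordsPerPartition: Option[Long]) {
  // ...
}
```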
##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala:
##########
@@ -98,17 +98,21 @@ private[kafka010] class KafkaOffsetReaderConsumer(
*/
private val minPartitions =
readerOptions.get(KafkaSourceProvider.MIN_PARTITIONS_OPTION_KEY).map(_.toInt)
+ private val maxRecordsPerPartition =
+ readerOptions.get(KafkaSourceProvider.MAX_RECORDS_PER_PARTITIONS_OPTION_KEY).map(_.toLong)
- private val rangeCalculator = new KafkaOffsetRangeCalculator(minPartitions)
+ private val rangeCalculator =
+ new KafkaOffsetRangeCalculator(minPartitions, maxRecordsPerPartition)
private[kafka010] val offsetFetchAttemptIntervalMs =
readerOptions.getOrElse(KafkaSourceProvider.FETCH_OFFSET_RETRY_INTERVAL_MS, "1000").toLong
/**
* Whether we should divide Kafka TopicPartitions with a lot of data into smaller Spark tasks.
*/
- private def shouldDivvyUpLargePartitions(numTopicPartitions: Int): Boolean = {
- minPartitions.map(_ > numTopicPartitions).getOrElse(false)
+ private def shouldDivvyUpLargePartitions(kafkaOffsetRange: Seq[KafkaOffsetRange]): Boolean = {
Review Comment:
ditto
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.