noslowerdna opened a new pull request #33827:
URL: https://github.com/apache/spark/pull/33827


   …itions option
   
   ### What changes were proposed in this pull request?
   
   Proposing that the `KafkaOffsetRangeCalculator`'s range calculation logic be modified to exclude small (i.e. un-split) partitions from the overall proportional distribution math. This divides the large partitions more reasonably when they are accompanied by many small partitions, and provides optimal behavior for cases where a `minPartitions` value is deliberately computed based on the volume of data being read.
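   To make the proposed idea concrete, here is a simplified Python sketch (not the actual Scala patch; the function name and rounding details are hypothetical). Partitions too small to be split each receive exactly one task and are excluded, so the remaining `minPartitions` budget is divided proportionally among the large partitions only:

   ```python
   def split_counts_proposed(partition_sizes, min_partitions):
       """Hypothetical sketch of the proposed logic: exclude un-split
       (small) partitions from the proportional distribution math."""
       total = sum(partition_sizes)

       # A partition is "small" if its proportional share of
       # min_partitions would round to one task or fewer.
       def is_small(size):
           return round(size / total * min_partitions) <= 1

       small = [s for s in partition_sizes if is_small(s)]
       large_total = sum(s for s in partition_sizes if not is_small(s))

       # Budget left over for the large partitions after giving
       # every small partition its single task.
       budget = min_partitions - len(small)

       counts = []
       for size in partition_sizes:
           if is_small(size):
               counts.append(1)
           else:
               counts.append(max(round(size / large_total * budget), 1))
       return counts

   # The skewed example below: one 10,000-message partition plus
   # 1,000 single-message partitions, minPartitions = 1,010.
   sizes = [10_000] + [1] * 1_000
   counts = split_counts_proposed(sizes, 1010)
   print(counts[0])      # 10 splits of 1,000 messages for the large partition
   print(sum(counts))    # 1,010 tasks total, matching the hint exactly
   ```

   Under this sketch the large partition is split into the expected 10 chunks of 1,000 messages, and the total task count equals the `minPartitions` hint.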
   
   ### Why are the changes needed?
   
   While the 
[documentation](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html)
 does contain a clear disclaimer,
   
   > Please note that this configuration is like a hint: the number of Spark 
tasks will be **approximately** `minPartitions`. It can be less or more 
depending on rounding errors or Kafka partitions that didn't receive any new 
data.
   
   there are cases where the calculated Kafka partition range splits can differ greatly from expectations. For evenly distributed data and most `minPartitions` values this would not be a major or commonly encountered concern. However, when the distribution of data across partitions is very heavily skewed, somewhat surprising range split calculations can result.
   
   For example, given the following input data:
   
   - 1 partition containing 10,000 messages
   - 1,000 partitions each containing 1 message
   
   Spark processing code loading from this collection of 1,001 partitions may decide that each task should read no more than 1,000 messages. Consequently, it could specify a `minPartitions` value of 1,010, expecting the single large partition to be split into 10 equal chunks, with each of the 1,000 small partitions getting its own task. That is far from what actually occurs. The `KafkaOffsetRangeCalculator` algorithm ends up splitting the large partition into 918 chunks of 10 or 11 messages, two orders of magnitude below the desired maximum message count per task, and produces nearly double the number of Spark tasks hinted in the configuration.
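   The 918 figure falls out of the proportional distribution math. Here is a minimal Python sketch of the current behavior (simplified and illustrative; the function name is hypothetical, and this is not the actual Spark Scala code):

   ```python
   def split_counts(partition_sizes, min_partitions):
       """Sketch of the current proportional split: each partition gets
       a share of min_partitions proportional to its size, with a floor
       of one task per partition."""
       total = sum(partition_sizes)
       return [max(round(size / total * min_partitions), 1)
               for size in partition_sizes]

   # One 10,000-message partition plus 1,000 single-message partitions,
   # with minPartitions = 1,010.
   sizes = [10_000] + [1] * 1_000
   counts = split_counts(sizes, 1010)

   # 10,000 / 11,000 * 1,010 ≈ 918.2, so the large partition is split
   # into 918 chunks of 10 or 11 messages each.
   print(counts[0])      # 918
   # Every small partition still gets its floor of 1 task, so the total
   # is 918 + 1,000 = 1,918 tasks, nearly double the hint of 1,010.
   print(sum(counts))    # 1918
   ```

   Because the 1,000 small partitions each contribute almost nothing to the total size yet still receive a full task, the large partition's proportional share balloons far past the intended 10 splits.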
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   ### How was this patch tested?
   
   Existing unit tests and added new unit tests


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


