GitHub user JasonMWhite commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10089#discussion_r46763463
  
    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
    @@ -89,23 +89,29 @@ class DirectKafkaInputDStream[
     
       private val maxRateLimitPerPartition: Int = context.sparkContext.getConf.getInt(
           "spark.streaming.kafka.maxRatePerPartition", 0)
    -  protected def maxMessagesPerPartition: Option[Long] = {
    +  protected def maxMessagesPerPartition(leaderOffsets: Map[TopicAndPartition, LeaderOffset])
    +    : Option[Map[TopicAndPartition, Long]] = {
         val estimatedRateLimit = rateController.map(_.getLatestRate().toInt)
    -    val numPartitions = currentOffsets.keys.size
    -
    -    val effectiveRateLimitPerPartition = estimatedRateLimit
    -      .filter(_ > 0)
    -      .map { limit =>
    -        if (maxRateLimitPerPartition > 0) {
    -          Math.min(maxRateLimitPerPartition, (limit / numPartitions))
    -        } else {
    -          limit / numPartitions
    +
    +    // calculate a per-partition rate limit based on current lag
    +    val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) match {
    +      case Some(rate) =>
    +        val lagPerPartition = leaderOffsets.map { case (tp, lo) =>
    +          tp -> Math.max(lo.offset - currentOffsets(tp), 0)
    +        }
    +        val totalLag = lagPerPartition.values.sum.toFloat
    +
    +        lagPerPartition.map { case (tp, lag) =>
    +          tp -> Math.round(lag / Math.max(totalLag, 1) * rate)
    --- End diff --
    
    The current test puts all events on a single partition, while the other 
partition has no events and therefore no lag. I could add a test for the 
boundary condition of no lag on any partition, and another for the case where 
the latest leader offset is less than the current offset (which can happen 
after an unclean leader election).
    
    It definitely needs to respect `maxRatePerPartition` even when backpressure 
is active, so that part needs some adjustment.
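
    As a rough sketch of the adjustment described above (not code from the pull request), the lag-proportional split could be capped per partition along these lines. The object `LagProportionalRate`, the method `perPartitionLimits`, and the local `TopicAndPartition` case class are hypothetical stand-ins so the example runs without Spark or Kafka. Offsets are plain `Long` values rather than `LeaderOffset`, and a cap of 0 is assumed to mean "no cap", matching the default in the diff.

```scala
// Hypothetical stand-in for kafka.common.TopicAndPartition (keeps the sketch dependency-free).
final case class TopicAndPartition(topic: String, partition: Int)

object LagProportionalRate {
  /** Split `rate` across partitions in proportion to their lag.
    * Each share is capped at `maxRatePerPartition` when that cap is positive;
    * a cap of 0 is treated as "no cap" (an assumption of this sketch).
    */
  def perPartitionLimits(
      currentOffsets: Map[TopicAndPartition, Long],
      leaderOffsets: Map[TopicAndPartition, Long],
      rate: Int,
      maxRatePerPartition: Int): Map[TopicAndPartition, Long] = {
    // Clamp lag at zero: after an unclean leader election the leader offset
    // can be behind the offset that was already consumed.
    // Assumption: a partition missing from currentOffsets counts as zero lag.
    val lagPerPartition = leaderOffsets.map { case (tp, latest) =>
      tp -> math.max(latest - currentOffsets.getOrElse(tp, latest), 0L)
    }
    val totalLag = lagPerPartition.values.sum

    lagPerPartition.map { case (tp, lag) =>
      // With no lag anywhere, every partition gets a share of zero.
      val share =
        if (totalLag == 0L) 0L
        else math.round(lag.toDouble / totalLag * rate)
      val capped =
        if (maxRatePerPartition > 0) math.min(share, maxRatePerPartition.toLong)
        else share
      tp -> capped
    }
  }
}
```

    The boundary cases mentioned above map onto inputs to this function: identical current and leader offsets (no lag on any partition), and a leader offset below the current offset (the unclean election case), which is clamped to zero lag rather than producing a negative share.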

