Github user JasonMWhite commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10089#discussion_r46763463

    --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
    @@ -89,23 +89,29 @@ class DirectKafkaInputDStream[
       private val maxRateLimitPerPartition: Int = context.sparkContext.getConf.getInt(
           "spark.streaming.kafka.maxRatePerPartition", 0)
    -  protected def maxMessagesPerPartition: Option[Long] = {
    +  protected def maxMessagesPerPartition(leaderOffsets: Map[TopicAndPartition, LeaderOffset])
    +    : Option[Map[TopicAndPartition, Long]] = {
         val estimatedRateLimit = rateController.map(_.getLatestRate().toInt)
    -    val numPartitions = currentOffsets.keys.size
    -
    -    val effectiveRateLimitPerPartition = estimatedRateLimit
    -      .filter(_ > 0)
    -      .map { limit =>
    -        if (maxRateLimitPerPartition > 0) {
    -          Math.min(maxRateLimitPerPartition, (limit / numPartitions))
    -        } else {
    -          limit / numPartitions
    +
    +    // calculate a per-partition rate limit based on current lag
    +    val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) match {
    +      case Some(rate) =>
    +        val lagPerPartition = leaderOffsets.map { case (tp, lo) =>
    +          tp -> Math.max(lo.offset - currentOffsets(tp), 0)
    +        }
    +        val totalLag = lagPerPartition.values.sum.toFloat
    +
    +        lagPerPartition.map { case (tp, lag) =>
    +          tp -> Math.round(lag / Math.max(totalLag, 1) * rate)
    --- End diff --

    The current test puts all events on a single partition, while the other partition has no events and therefore no lag. I could add two more tests: one for the boundary condition of no lag on any partition, and one for the case where the latest leader offset is less than the current offset (which can happen after an unclean leader election).

    The limit definitely needs to respect `maxRatePerPartition` even when backpressure is active, so that part needs some adjustment.
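
As a rough illustration of the adjustment discussed above, here is a minimal standalone sketch of a lag-proportional split that also honours a per-partition cap. It is not the code in the pull request: `LagBasedRateLimit`, `perPartitionRate`, and the local `TopicAndPartition` case class are hypothetical stand-ins, and plain `Long` offsets replace `LeaderOffset` so the example runs without Spark or Kafka on the classpath.

```scala
// Local stand-in for kafka.common.TopicAndPartition (assumption, for a self-contained example).
final case class TopicAndPartition(topic: String, partition: Int)

object LagBasedRateLimit {
  /**
   * Hypothetical helper: splits `rate` across partitions in proportion to their lag,
   * then caps each share at `maxRatePerPartition` when that setting is positive.
   * A non-positive `maxRatePerPartition` means "no cap", mirroring the default of 0.
   */
  def perPartitionRate(
      currentOffsets: Map[TopicAndPartition, Long],
      latestOffsets: Map[TopicAndPartition, Long],
      rate: Long,
      maxRatePerPartition: Long): Map[TopicAndPartition, Long] = {
    // Clamp negative lag to zero: the latest offset can fall behind the
    // current offset, for example after an unclean leader election.
    val lagPerPartition = latestOffsets.map { case (tp, latest) =>
      tp -> math.max(latest - currentOffsets(tp), 0L)
    }
    val totalLag = lagPerPartition.values.sum

    lagPerPartition.map { case (tp, lag) =>
      // With no lag on any partition, every share is zero and no division happens.
      val share =
        if (totalLag > 0) math.round(lag.toDouble / totalLag * rate) else 0L
      val capped =
        if (maxRatePerPartition > 0) math.min(share, maxRatePerPartition) else share
      tp -> capped
    }
  }
}
```

Note that this sketch simply truncates a share that exceeds the cap; it does not redistribute the surplus to other partitions, which may or may not be the desired behaviour.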