GitHub user gaborgsomogyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r166952263
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
    @@ -126,7 +129,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
     
       protected[streaming] def maxMessagesPerPartition(
         offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
    -    val estimatedRateLimit = rateController.map(_.getLatestRate())
    +    val estimatedRateLimit = rateController.map(x => {
    +      val lr = x.getLatestRate()
    +      if (lr > 0) lr else initialRate
    --- End diff --
    
    Thanks for the info. My concern was the `LatestRate = 0` case, where the limit can be lost. In the meantime I took a look at `PIDRateEstimator`, which cannot produce a rate of 0 because of this:
    
    `
            val newRate = (latestRate - proportional * error -
                                        integral * historicalError -
                                        derivative * dError).max(minRate)
    `
    
    and `minRate` is required to be positive:
    
    `
      require(
        minRate > 0,
        s"Minimum rate in PIDRateEstimator should be > 0")
    `
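    
    To illustrate the reasoning, here is a minimal sketch, not the actual Spark code. `RateFloorSketch`, `clampedRate`, `effectiveRate` and the single `correction` term are hypothetical names that stand in for the PID terms and for the fallback in the diff. It assumes only what the two snippets above show: the new rate is clamped with `.max(minRate)` and `minRate` must be positive.

```scala
// Hypothetical, simplified stand-in; NOT Spark's PIDRateEstimator or RateController.
object RateFloorSketch {

  // Mirrors the clamp from the quoted snippet: the result never drops below minRate.
  // `correction` is an assumed placeholder for the combined P, I and D terms.
  def clampedRate(latestRate: Double, correction: Double, minRate: Double): Double = {
    require(minRate > 0, "Minimum rate should be > 0")
    (latestRate - correction).max(minRate)
  }

  // Mirrors the fallback from the diff: use initialRate when the latest rate is not positive.
  def effectiveRate(latestRate: Long, initialRate: Long): Long =
    if (latestRate > 0) latestRate else initialRate

  def main(args: Array[String]): Unit = {
    // Without the clamp this would be 100.0 - 250.0, which is negative.
    println(clampedRate(latestRate = 100.0, correction = 250.0, minRate = 10.0))
    // A non-positive latest rate falls back to the initial rate.
    println(effectiveRate(latestRate = -1L, initialRate = 500L))
  }
}
```

    Since the clamped value is always at least `minRate`, and `minRate > 0` is enforced, the estimator in this sketch cannot return 0, so the `lr > 0` branch only guards the case where no estimate is available.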
    
    I'm fine with this.


