GitHub user akonopko commented on a diff in the pull request:
https://github.com/apache/spark/pull/19431#discussion_r167395487
--- Diff:
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
---
@@ -687,6 +618,51 @@ class DirectKafkaStreamSuite
ssc.stop()
}
+ test("backpressure.initialRate should honor maxRatePerPartition") {
+ backpressureTest(maxRatePerPartition = 1000, initialRate = 500,
maxMessagesPerPartition = 250)
+ }
+
+ test("use backpressure.initialRate with backpressure") {
+ backpressureTest(maxRatePerPartition = 300, initialRate = 1000,
maxMessagesPerPartition = 150)
+ }
+
+ private def backpressureTest(maxRatePerPartition: Int,
+ initialRate: Int,
--- End diff --
Fixed
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org