Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/19431#discussion_r166953894
--- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala ---
@@ -551,6 +551,76 @@ class DirectKafkaStreamSuite
Map(new TopicPartition(topic, 0) -> 5L, new TopicPartition(topic, 1) -> 10L))
}
+ test("use backpressure.initialRate with backpressure") {
+ val topic = "backpressureInitialRate"
+ val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
+ val sparkConf = new SparkConf()
+ // Safe, even with streaming, because we're using the direct API.
+ // Using 1 core is useful to make the test more predictable.
+ .setMaster("local[1]")
+ .setAppName(this.getClass.getSimpleName)
+ .set("spark.streaming.backpressure.enabled", "true")
+ .set("spark.streaming.kafka.maxRatePerPartition", "1000")
+ .set("spark.streaming.backpressure.initialRate", "500")
+
+ val messages = Map("foo" -> 5000)
+ kafkaTestUtils.sendMessages(topic, messages)
+
+ ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+ val kafkaStream = withClue("Error creating direct stream") {
+ new DirectKafkaInputDStream[String, String](
+ ssc,
+ preferredHosts,
+ ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
+ new DefaultPerPartitionConfig(sparkConf)
+ )
+ }
+ kafkaStream.start()
+
+ val input = Map(new TopicPartition(topic, 0) -> 1000L)
+
+ assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
+ new TopicPartition(topic, 0) -> 250)) // we run for half a second: 500 msgs/sec * 0.5 s = 250
+
+ kafkaStream.stop()
+ }
+
+ test("backpressure.initialRate should honor maxRatePerPartition") {
+ val topic = "backpressureInitialRate"
+ val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
+ val sparkConf = new SparkConf()
+ // Safe, even with streaming, because we're using the direct API.
+ // Using 1 core is useful to make the test more predictable.
+ .setMaster("local[1]")
+ .setAppName(this.getClass.getSimpleName)
+ .set("spark.streaming.backpressure.enabled", "true")
+ .set("spark.streaming.kafka.maxRatePerPartition", "300")
+ .set("spark.streaming.backpressure.initialRate", "1000")
+
+ val messages = Map("foo" -> 5000)
+ kafkaTestUtils.sendMessages(topic, messages)
+
+ ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+ val kafkaStream = withClue("Error creating direct stream") {
+ new DirectKafkaInputDStream[String, String](
+ ssc,
+ preferredHosts,
+ ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
+ new DefaultPerPartitionConfig(sparkConf)
+ )
+ }
+ kafkaStream.start()
+
+ val input = Map(new TopicPartition(topic, 0) -> 1000L)
+
+ assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
+ new TopicPartition(topic, 0) -> 150)) // capped at maxRatePerPartition: 300 msgs/sec * 0.5 s = 150
--- End diff ---
Same simplification here.
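
For reference, the earlier comment being referred to isn't quoted in this
message. One plausible simplification, assuming maxMessagesPerPartition
returns an Option[Map[TopicPartition, Long]] as the flatMap/headOption chain
suggests, would be to compare against the full result directly:

    assert(kafkaStream.maxMessagesPerPartition(input) ==
      Some(Map(new TopicPartition(topic, 0) -> 150L)))

This drops the flatMap(_.headOption) indirection and fails with a clearer
message if the result ever contains unexpected partitions.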