Github user akonopko commented on a diff in the pull request:
https://github.com/apache/spark/pull/19431#discussion_r167242320
--- Diff: external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---
@@ -387,6 +387,89 @@ class DirectKafkaStreamSuite
       Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L))
   }
+  test("use backpressure.initialRate with backpressure") {
+    val topic = "backpressureInitialRate"
+    val topicPartitions = Set(TopicAndPartition(topic, 0))
+    kafkaTestUtils.createTopic(topic, 1)
+    val kafkaParams = Map(
+      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+      "auto.offset.reset" -> "smallest"
+    )
+
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.backpressure.enabled", "true")
+      .set("spark.streaming.backpressure.initialRate", "500")
+
+    val messages = Map("foo" -> 5000)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      val kc = new KafkaCluster(kafkaParams)
+      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
+      val m = kc.getEarliestLeaderOffsets(topicPartitions)
+        .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
+
+      new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
+        ssc, kafkaParams, m, messageHandler)
+    }
+    kafkaStream.start()
+
+    val input = Map(new TopicAndPartition(topic, 0) -> 1000L)
+
+    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
--- End diff --
Fixed
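
For context on what the (truncated) assertion above is checking, here is a rough sketch of the arithmetic involved, assuming spark.streaming.backpressure.initialRate is applied as records per second and scaled by the batch interval for the single partition used in this test (the expected value elided in the assertion is not reproduced here):

    // Hypothetical back-of-the-envelope check, not part of the diff:
    // initialRate = 500 records/sec, batch interval = 500 ms, one partition,
    // so the first batch should be capped at roughly 500 * 0.5 = 250 records.
    val initialRate = 500L        // spark.streaming.backpressure.initialRate
    val batchIntervalMs = 500L    // Milliseconds(500) above
    val expectedFirstBatchCap = initialRate * batchIntervalMs / 1000  // 250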