Github user gaborgsomogyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19431#discussion_r166953420

    --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala ---
    @@ -551,6 +551,76 @@ class DirectKafkaStreamSuite
           Map(new TopicPartition(topic, 0) -> 5L, new TopicPartition(topic, 1) -> 10L))
       }

    +  test("use backpressure.initialRate with backpressure") {
    +    val topic = "backpressureInitialRate"
    +    val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
    +    val sparkConf = new SparkConf()
    +      // Safe, even with streaming, because we're using the direct API.
    +      // Using 1 core is useful to make the test more predictable.
    +      .setMaster("local[1]")
    +      .setAppName(this.getClass.getSimpleName)
    +      .set("spark.streaming.backpressure.enabled", "true")
    +      .set("spark.streaming.kafka.maxRatePerPartition", "1000")
    +      .set("spark.streaming.backpressure.initialRate", "500")
    +
    +    val messages = Map("foo" -> 5000)
    +    kafkaTestUtils.sendMessages(topic, messages)
    +
    +    ssc = new StreamingContext(sparkConf, Milliseconds(500))
    +
    +    val kafkaStream = withClue("Error creating direct stream") {
    +      new DirectKafkaInputDStream[String, String](
    +        ssc,
    +        preferredHosts,
    +        ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
    +        new DefaultPerPartitionConfig(sparkConf)
    +      )
    +    }
    +    kafkaStream.start()
    +
    +    val input = Map(new TopicPartition(topic, 0) -> 1000L)
    +
    +    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
    --- End diff --

    Just caught that this could be simplified a bit:

    > assert(kafkaStream.maxMessagesPerPartition(input).get ==
    >   Map(new TopicPartition(topic, 0) -> 250)) // we run for half a second
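
    For anyone following along, the expected value of 250 falls out of the
    configured rate and the batch duration. A minimal, standalone sketch of
    that arithmetic (the object name is illustrative, not part of the PR),
    assuming the first batch is capped at initialRate records per second per
    partition:

    // Sketch only: reproduces the 250 expected by the assertion above.
    object InitialRateArithmetic extends App {
      val initialRatePerSec = 500L // spark.streaming.backpressure.initialRate
      val batchIntervalMs   = 500L // batch duration, Milliseconds(500)

      // 500 records/sec * 0.5 sec = 250 records for the partition's first batch
      val expectedCap = initialRatePerSec * batchIntervalMs / 1000
      println(expectedCap) // 250
    }

    The cap of 250 wins over spark.streaming.kafka.maxRatePerPartition (1000)
    and the 1000 available offsets in the test's input map, which is exactly
    what the assertion checks.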