[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...
Github user akonopko commented on a diff in the pull request: https://github.com/apache/spark/pull/19431#discussion_r173719300

--- Diff: external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---
@@ -456,6 +455,60 @@ class DirectKafkaStreamSuite
     ssc.stop()
   }
+  test("backpressure.initialRate should honor maxRatePerPartition") {
+    backpressureTest(maxRatePerPartition = 1000, initialRate = 500, maxMessagesPerPartition = 250)
+  }
+
+  test("use backpressure.initialRate with backpressure") {
--- End diff --

Right, thank you. I will correct this.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/19431#discussion_r173641331

--- Diff: external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---
@@ -456,6 +455,60 @@ class DirectKafkaStreamSuite
     ssc.stop()
   }
+  test("backpressure.initialRate should honor maxRatePerPartition") {
+    backpressureTest(maxRatePerPartition = 1000, initialRate = 500, maxMessagesPerPartition = 250)
+  }
+
+  test("use backpressure.initialRate with backpressure") {
--- End diff --

Aren't the descriptions of these tests backwards, i.e. isn't this the one testing that maxRatePerPartition is honored?
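You can see why the test names look swapped by working out which setting bounds the first batch in each test. The helper below is hypothetical and is not part of the PR. It assumes a single partition that receives the whole rate, plus the 500 ms batch interval these tests use. The first batch starts from initialRate, and maxRatePerPartition caps it only when it is set (> 0).

```scala
// Hypothetical helper (not from the PR). Assumes a single partition that gets the
// whole rate, and that no rate estimate exists yet, so the batch starts at initialRate.
object FirstBatchLimit {
  /** Expected messages per partition for one batch, and the name of the limit that binds. */
  def expected(maxRatePerPartition: Long, initialRate: Long, batchMs: Long): (Long, String) = {
    val (rate, binding) =
      if (maxRatePerPartition > 0 && maxRatePerPartition < initialRate) {
        (maxRatePerPartition, "maxRatePerPartition") // the static cap is tighter
      } else {
        (initialRate, "initialRate") // the initial backpressure rate is tighter (or no cap set)
      }
    // rate is records per second; scale it to the batch interval
    (rate * batchMs / 1000, binding)
  }
}
```

Under these assumptions, `expected(1000, 500, 500)` gives `(250, "initialRate")` and `expected(300, 1000, 500)` gives `(150, "maxRatePerPartition")`. The 250-message case is therefore bound by initialRate. The 150-message case is the one that shows maxRatePerPartition being honored.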
Github user akonopko commented on a diff in the pull request: https://github.com/apache/spark/pull/19431#discussion_r167395493

--- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala ---
@@ -22,6 +22,7 @@ import java.lang.{ Long => JLong }
 import java.util.{ Arrays, HashMap => JHashMap, Map => JMap }
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicLong
+import java.util.UUID
--- End diff --

Ah, Scalastyle checks were disabled for test files. Fixed.
Github user akonopko commented on a diff in the pull request: https://github.com/apache/spark/pull/19431#discussion_r167395482

--- Diff: external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---
@@ -539,6 +456,58 @@ class DirectKafkaStreamSuite
     ssc.stop()
   }
+  test("backpressure.initialRate should honor maxRatePerPartition") {
+    backpressureTest(maxRatePerPartition = 1000, initialRate = 500, maxMessagesPerPartition = 250)
+  }
+
+  test("use backpressure.initialRate with backpressure") {
+    backpressureTest(maxRatePerPartition = 300, initialRate = 1000, maxMessagesPerPartition = 150)
+  }
+
+  private def backpressureTest(maxRatePerPartition: Int,
+    initialRate: Int,
--- End diff --

Fixed
Github user akonopko commented on a diff in the pull request: https://github.com/apache/spark/pull/19431#discussion_r167395492

--- Diff: external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---
@@ -21,6 +21,7 @@ import java.io.File
 import java.util.Arrays
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicLong
+import java.util.UUID
--- End diff --

Ah, Scalastyle checks were disabled for test files. Fixed.
Github user akonopko commented on a diff in the pull request: https://github.com/apache/spark/pull/19431#discussion_r167395487

--- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala ---
@@ -687,6 +618,51 @@ class DirectKafkaStreamSuite
     ssc.stop()
   }
+  test("backpressure.initialRate should honor maxRatePerPartition") {
+    backpressureTest(maxRatePerPartition = 1000, initialRate = 500, maxMessagesPerPartition = 250)
+  }
+
+  test("use backpressure.initialRate with backpressure") {
+    backpressureTest(maxRatePerPartition = 300, initialRate = 1000, maxMessagesPerPartition = 150)
+  }
+
+  private def backpressureTest(maxRatePerPartition: Int,
+    initialRate: Int,
--- End diff --

Fixed
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/19431#discussion_r167281427

--- Diff: external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---
@@ -21,6 +21,7 @@ import java.io.File
 import java.util.Arrays
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicLong
+import java.util.UUID
--- End diff --

[error] /Users/gaborsomogyi/spark_review/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala:24:0: java.util.UUID is in wrong order relative to java.util.concurrent.atomic.AtomicLong.

Please execute Scalastyle checks.
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/19431#discussion_r167281176

--- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala ---
@@ -22,6 +22,7 @@ import java.lang.{ Long => JLong }
 import java.util.{ Arrays, HashMap => JHashMap, Map => JMap }
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicLong
+import java.util.UUID
--- End diff --

[error] /Users/gaborsomogyi/spark_review/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala:25:0: java.util.UUID is in wrong order relative to java.util.concurrent.atomic.AtomicLong.

Please execute Scalastyle checks.
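Both Scalastyle errors say that java.util.UUID is placed after java.util.concurrent.atomic.AtomicLong. The sketch below shows one ordering that should satisfy the check. It assumes the kafka-0-8 import list from the hunk quoted in these comments and moves UUID up with the other java.util imports, ahead of java.util.concurrent. The UniqueTopics helper is hypothetical. It assumes UUID was imported so that each test gets its own topic name. That would also stop the two kafka-0-10 tests from sharing "backpressureInitialRate".

```scala
// One import ordering that should satisfy the check: java.util.UUID sits with the
// other java.util imports, ahead of the java.util.concurrent packages.
import java.io.File
import java.util.Arrays
import java.util.UUID
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicLong

// Hypothetical helper (assumption): gives each test a unique topic name so tests
// never read each other's messages.
object UniqueTopics {
  def newTopic(prefix: String): String = s"$prefix-${UUID.randomUUID()}"
}
```

For the kafka-0-10 suite the same idea applies: UUID can sit next to the existing `java.util.{ Arrays, HashMap => JHashMap, Map => JMap }` import, before the java.util.concurrent lines.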
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/19431#discussion_r167281995

--- Diff: external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---
@@ -539,6 +456,58 @@ class DirectKafkaStreamSuite
     ssc.stop()
   }
+  test("backpressure.initialRate should honor maxRatePerPartition") {
+    backpressureTest(maxRatePerPartition = 1000, initialRate = 500, maxMessagesPerPartition = 250)
+  }
+
+  test("use backpressure.initialRate with backpressure") {
+    backpressureTest(maxRatePerPartition = 300, initialRate = 1000, maxMessagesPerPartition = 150)
+  }
+
+  private def backpressureTest(maxRatePerPartition: Int,
+    initialRate: Int,
--- End diff --

Indentation is wrong. Please check the Code Style Guide.
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/19431#discussion_r167281937

--- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala ---
@@ -687,6 +618,51 @@ class DirectKafkaStreamSuite
     ssc.stop()
   }
+  test("backpressure.initialRate should honor maxRatePerPartition") {
+    backpressureTest(maxRatePerPartition = 1000, initialRate = 500, maxMessagesPerPartition = 250)
+  }
+
+  test("use backpressure.initialRate with backpressure") {
+    backpressureTest(maxRatePerPartition = 300, initialRate = 1000, maxMessagesPerPartition = 150)
+  }
+
+  private def backpressureTest(maxRatePerPartition: Int,
+    initialRate: Int,
--- End diff --

Indentation is wrong. Please check the Code Style Guide.
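For reference, the Scala style guide that Spark follows covers declarations whose parameters do not fit on the line. It puts each parameter on its own line, indented four spaces relative to `def`, with the return type after the closing parenthesis. Below is a minimal sketch of that layout. BackpressureSettings is a hypothetical helper that just collects the three configuration keys these tests set. It is not the PR's backpressureTest.

```scala
// Hypothetical helper illustrating the multi-line declaration layout:
// one parameter per line, four-space indent, return type after the closing paren.
object BackpressureSettings {
  def settings(
      maxRatePerPartition: Int,
      initialRate: Int): Map[String, String] = {
    // Same keys the tests pass to SparkConf.set(...)
    Map(
      "spark.streaming.backpressure.enabled" -> "true",
      "spark.streaming.kafka.maxRatePerPartition" -> maxRatePerPartition.toString,
      "spark.streaming.backpressure.initialRate" -> initialRate.toString)
  }
}
```

The body keeps its usual two-space indent, so the four-space parameter indent separates the signature from the body.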
Github user akonopko commented on a diff in the pull request: https://github.com/apache/spark/pull/19431#discussion_r167242320

--- Diff: external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---
@@ -387,6 +387,89 @@ class DirectKafkaStreamSuite
       Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L))
   }
+  test("use backpressure.initialRate with backpressure") {
+    val topic = "backpressureInitialRate"
+    val topicPartitions = Set(TopicAndPartition(topic, 0))
+    kafkaTestUtils.createTopic(topic, 1)
+    val kafkaParams = Map(
+      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+      "auto.offset.reset" -> "smallest"
+    )
+
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.backpressure.enabled", "true")
+      .set("spark.streaming.backpressure.initialRate", "500")
+
+    val messages = Map("foo" -> 5000)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      val kc = new KafkaCluster(kafkaParams)
+      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
+      val m = kc.getEarliestLeaderOffsets(topicPartitions)
+        .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
+
+      new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
+        ssc, kafkaParams, m, messageHandler)
+    }
+    kafkaStream.start()
+
+    val input = Map(new TopicAndPartition(topic, 0) -> 1000L)
+
+    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
--- End diff --

Fixed
Github user akonopko commented on a diff in the pull request: https://github.com/apache/spark/pull/19431#discussion_r167242353

--- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala ---
@@ -551,6 +551,76 @@ class DirectKafkaStreamSuite
       Map(new TopicPartition(topic, 0) -> 5L, new TopicPartition(topic, 1) -> 10L))
   }
+  test("use backpressure.initialRate with backpressure") {
+    val topic = "backpressureInitialRate"
+    val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.backpressure.enabled", "true")
+      .set("spark.streaming.kafka.maxRatePerPartition", "1000")
+      .set("spark.streaming.backpressure.initialRate", "500")
+
+    val messages = Map("foo" -> 5000)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      new DirectKafkaInputDStream[String, String](
+        ssc,
+        preferredHosts,
+        ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
+        new DefaultPerPartitionConfig(sparkConf)
+      )
+    }
+    kafkaStream.start()
+
+    val input = Map(new TopicPartition(topic, 0) -> 1000L)
+
+    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
--- End diff --

Fixed
Github user akonopko commented on a diff in the pull request: https://github.com/apache/spark/pull/19431#discussion_r167242329

--- Diff: external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---
@@ -387,6 +387,89 @@ class DirectKafkaStreamSuite
       Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L))
   }
+  test("use backpressure.initialRate with backpressure") {
+    val topic = "backpressureInitialRate"
+    val topicPartitions = Set(TopicAndPartition(topic, 0))
+    kafkaTestUtils.createTopic(topic, 1)
+    val kafkaParams = Map(
+      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+      "auto.offset.reset" -> "smallest"
+    )
+
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.backpressure.enabled", "true")
+      .set("spark.streaming.backpressure.initialRate", "500")
+
+    val messages = Map("foo" -> 5000)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      val kc = new KafkaCluster(kafkaParams)
+      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
+      val m = kc.getEarliestLeaderOffsets(topicPartitions)
+        .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
+
+      new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
+        ssc, kafkaParams, m, messageHandler)
+    }
+    kafkaStream.start()
+
+    val input = Map(new TopicAndPartition(topic, 0) -> 1000L)
+
+    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
+      new TopicAndPartition(topic, 0) -> 250)) // we run for half a second
+
+    kafkaStream.stop()
+  }
+
+  test("backpressure.initialRate should honor maxRatePerPartition") {
+    val topic = "backpressureInitialRate2"
+    val topicPartitions = Set(TopicAndPartition(topic, 0))
+    kafkaTestUtils.createTopic(topic, 1)
+    val kafkaParams = Map(
+      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+      "auto.offset.reset" -> "smallest"
+    )
+
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.backpressure.enabled", "true")
+      .set("spark.streaming.kafka.maxRatePerPartition", "300")
+      .set("spark.streaming.backpressure.initialRate", "1000")
+
+    val messages = Map("foo" -> 5000)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      val kc = new KafkaCluster(kafkaParams)
+      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
+      val m = kc.getEarliestLeaderOffsets(topicPartitions)
+        .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
+
+      new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
+        ssc, kafkaParams, m, messageHandler)
+    }
+    kafkaStream.start()
+
+    val input = Map(new TopicAndPartition(topic, 0) -> 1000L)
+
+    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
--- End diff --

Fixed
Github user akonopko commented on a diff in the pull request: https://github.com/apache/spark/pull/19431#discussion_r167242186

--- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala ---
@@ -551,6 +551,76 @@ class DirectKafkaStreamSuite
       Map(new TopicPartition(topic, 0) -> 5L, new TopicPartition(topic, 1) -> 10L))
   }
+  test("use backpressure.initialRate with backpressure") {
+    val topic = "backpressureInitialRate"
+    val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.backpressure.enabled", "true")
+      .set("spark.streaming.kafka.maxRatePerPartition", "1000")
+      .set("spark.streaming.backpressure.initialRate", "500")
+
+    val messages = Map("foo" -> 5000)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      new DirectKafkaInputDStream[String, String](
+        ssc,
+        preferredHosts,
+        ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
+        new DefaultPerPartitionConfig(sparkConf)
+      )
+    }
+    kafkaStream.start()
+
+    val input = Map(new TopicPartition(topic, 0) -> 1000L)
+
+    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
+      new TopicPartition(topic, 0) -> 250)) // we run for half a second
+
+    kafkaStream.stop()
+  }
+
+  test("backpressure.initialRate should honor maxRatePerPartition") {
+    val topic = "backpressureInitialRate"
+    val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.backpressure.enabled", "true")
+      .set("spark.streaming.kafka.maxRatePerPartition", "300")
+      .set("spark.streaming.backpressure.initialRate", "1000")
+
+    val messages = Map("foo" -> 5000)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      new DirectKafkaInputDStream[String, String](
+        ssc,
+        preferredHosts,
+        ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
+        new DefaultPerPartitionConfig(sparkConf)
+      )
+    }
+    kafkaStream.start()
+
+    val input = Map(new TopicPartition(topic, 0) -> 1000L)
+
+    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
--- End diff --

Fixed
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/19431#discussion_r166952412

--- Diff: external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
@@ -91,9 +91,16 @@ class DirectKafkaInputDStream[
   private val maxRateLimitPerPartition: Long = context.sparkContext.getConf.getLong(
       "spark.streaming.kafka.maxRatePerPartition", 0)
+  private val initialRate = context.sparkContext.getConf.getLong(
+    "spark.streaming.backpressure.initialRate", 0)
+
   protected[streaming] def maxMessagesPerPartition(
       offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, Long]] = {
-    val estimatedRateLimit = rateController.map(_.getLatestRate())
+
+    val estimatedRateLimit = rateController.map(x => {
+      val lr = x.getLatestRate()
+      if (lr > 0) lr else initialRate
--- End diff --

Same applies here.
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/19431#discussion_r166953420

--- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala ---
@@ -551,6 +551,76 @@ class DirectKafkaStreamSuite
       Map(new TopicPartition(topic, 0) -> 5L, new TopicPartition(topic, 1) -> 10L))
   }
+  test("use backpressure.initialRate with backpressure") {
+    val topic = "backpressureInitialRate"
+    val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.backpressure.enabled", "true")
+      .set("spark.streaming.kafka.maxRatePerPartition", "1000")
+      .set("spark.streaming.backpressure.initialRate", "500")
+
+    val messages = Map("foo" -> 5000)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      new DirectKafkaInputDStream[String, String](
+        ssc,
+        preferredHosts,
+        ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
+        new DefaultPerPartitionConfig(sparkConf)
+      )
+    }
+    kafkaStream.start()
+
+    val input = Map(new TopicPartition(topic, 0) -> 1000L)
+
+    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
--- End diff --

Just noticed that this could be simplified a bit:

> assert(kafkaStream.maxMessagesPerPartition(input).get ==
>   Map(new TopicPartition(topic, 0) -> 250)) // we run for half a second
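The suggested form is stricter as well as shorter. `flatMap(_.headOption).contains(...)` looks only at the first entry of the returned map, and which entry comes first depends on the map's iteration order. `.get == Map(...)` compares every partition. Below is a minimal sketch with partitions modeled as plain strings; this is an assumption, since the tests use TopicPartition.

```scala
object AssertionStyles {
  // Partitions modeled as strings instead of TopicPartition (assumption for the sketch).
  type Limits = Map[String, Long]

  // Style used in the PR: passes if the *first* entry matches, ignoring any others.
  def firstEntryMatches(result: Option[Limits], expected: (String, Long)): Boolean =
    result.flatMap(_.headOption).contains(expected)

  // Suggested style: the whole map must match, so wrong or extra partitions fail too.
  // Note that .get throws NoSuchElementException when the result is None.
  def wholeMapMatches(result: Option[Limits], expected: Limits): Boolean =
    result.get == expected
}
```

With a single-partition result both checks agree. If the result also held an unexpected second partition, the first-entry check could still pass while the whole-map check would fail.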
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/19431#discussion_r166953742

--- Diff: external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---
@@ -387,6 +387,89 @@ class DirectKafkaStreamSuite
       Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L))
   }
+  test("use backpressure.initialRate with backpressure") {
+    val topic = "backpressureInitialRate"
+    val topicPartitions = Set(TopicAndPartition(topic, 0))
+    kafkaTestUtils.createTopic(topic, 1)
+    val kafkaParams = Map(
+      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+      "auto.offset.reset" -> "smallest"
+    )
+
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.backpressure.enabled", "true")
+      .set("spark.streaming.backpressure.initialRate", "500")
+
+    val messages = Map("foo" -> 5000)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      val kc = new KafkaCluster(kafkaParams)
+      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
+      val m = kc.getEarliestLeaderOffsets(topicPartitions)
+        .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
+
+      new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
+        ssc, kafkaParams, m, messageHandler)
+    }
+    kafkaStream.start()
+
+    val input = Map(new TopicAndPartition(topic, 0) -> 1000L)
+
+    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
+      new TopicAndPartition(topic, 0) -> 250)) // we run for half a second
+
+    kafkaStream.stop()
+  }
+
+  test("backpressure.initialRate should honor maxRatePerPartition") {
+    val topic = "backpressureInitialRate2"
+    val topicPartitions = Set(TopicAndPartition(topic, 0))
+    kafkaTestUtils.createTopic(topic, 1)
+    val kafkaParams = Map(
+      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+      "auto.offset.reset" -> "smallest"
+    )
+
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.backpressure.enabled", "true")
+      .set("spark.streaming.kafka.maxRatePerPartition", "300")
+      .set("spark.streaming.backpressure.initialRate", "1000")
+
+    val messages = Map("foo" -> 5000)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      val kc = new KafkaCluster(kafkaParams)
+      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
+      val m = kc.getEarliestLeaderOffsets(topicPartitions)
+        .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
+
+      new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
+        ssc, kafkaParams, m, messageHandler)
+    }
+    kafkaStream.start()
+
+    val input = Map(new TopicAndPartition(topic, 0) -> 1000L)
+
+    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
--- End diff --

Same simplification here.
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/19431#discussion_r166953894

--- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala ---
@@ -551,6 +551,76 @@ class DirectKafkaStreamSuite
       Map(new TopicPartition(topic, 0) -> 5L, new TopicPartition(topic, 1) -> 10L))
   }
+  test("use backpressure.initialRate with backpressure") {
+    val topic = "backpressureInitialRate"
+    val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.backpressure.enabled", "true")
+      .set("spark.streaming.kafka.maxRatePerPartition", "1000")
+      .set("spark.streaming.backpressure.initialRate", "500")
+
+    val messages = Map("foo" -> 5000)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      new DirectKafkaInputDStream[String, String](
+        ssc,
+        preferredHosts,
+        ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
+        new DefaultPerPartitionConfig(sparkConf)
+      )
+    }
+    kafkaStream.start()
+
+    val input = Map(new TopicPartition(topic, 0) -> 1000L)
+
+    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
+      new TopicPartition(topic, 0) -> 250)) // we run for half a second
+
+    kafkaStream.stop()
+  }
+
+  test("backpressure.initialRate should honor maxRatePerPartition") {
+    val topic = "backpressureInitialRate"
+    val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.backpressure.enabled", "true")
+      .set("spark.streaming.kafka.maxRatePerPartition", "300")
+      .set("spark.streaming.backpressure.initialRate", "1000")
+
+    val messages = Map("foo" -> 5000)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      new DirectKafkaInputDStream[String, String](
+        ssc,
+        preferredHosts,
+        ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
+        new DefaultPerPartitionConfig(sparkConf)
+      )
+    }
+    kafkaStream.start()
+
+    val input = Map(new TopicPartition(topic, 0) -> 1000L)
+
+    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
--- End diff --

Same simplification here.
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/19431#discussion_r166953800

--- Diff: external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---
@@ -387,6 +387,89 @@ class DirectKafkaStreamSuite
       Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L))
   }
+  test("use backpressure.initialRate with backpressure") {
+    val topic = "backpressureInitialRate"
+    val topicPartitions = Set(TopicAndPartition(topic, 0))
+    kafkaTestUtils.createTopic(topic, 1)
+    val kafkaParams = Map(
+      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+      "auto.offset.reset" -> "smallest"
+    )
+
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.backpressure.enabled", "true")
+      .set("spark.streaming.backpressure.initialRate", "500")
+
+    val messages = Map("foo" -> 5000)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      val kc = new KafkaCluster(kafkaParams)
+      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
+      val m = kc.getEarliestLeaderOffsets(topicPartitions)
+        .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
+
+      new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
+        ssc, kafkaParams, m, messageHandler)
+    }
+    kafkaStream.start()
+
+    val input = Map(new TopicAndPartition(topic, 0) -> 1000L)
+
+    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
--- End diff --

Same simplification here.
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/19431#discussion_r166952263

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
@@ -126,7 +129,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
   protected[streaming] def maxMessagesPerPartition(
       offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
-    val estimatedRateLimit = rateController.map(_.getLatestRate())
+    val estimatedRateLimit = rateController.map(x => {
+      val lr = x.getLatestRate()
+      if (lr > 0) lr else initialRate
--- End diff --

Thanks for the info. My concern was the `LatestRate = 0` case, where the limit can be lost. In the meantime I have taken a look at `PIDRateEstimator`, which cannot produce a rate of 0 because of this:

    val newRate = (latestRate - proportional * error - integral * historicalError - derivative * dError).max(minRate)

and minRate is bounded:

    require(
      minRate > 0,
      s"Minimum rate in PIDRateEstimator should be > 0")

I'm fine with this.
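The clamp quoted in the comment above can be checked on its own. Below is a minimal sketch that assumes the PID gains and error terms are passed in directly. In PIDRateEstimator they are derived from batch statistics. Only the final `.max(minRate)` and the `require` are taken from the quoted code.

```scala
object PidClampSketch {
  // Assumption: gains and error terms are supplied by the caller; only the final
  // clamp and the minRate check mirror the PIDRateEstimator code quoted above.
  def newRate(
      latestRate: Double,
      error: Double,
      historicalError: Double,
      dError: Double,
      proportional: Double,
      integral: Double,
      derivative: Double,
      minRate: Double): Double = {
    require(minRate > 0, "Minimum rate in PIDRateEstimator should be > 0")
    // However negative the PID correction gets, the result never drops below minRate.
    (latestRate - proportional * error - integral * historicalError - derivative * dError)
      .max(minRate)
  }
}
```

For example, a very large error drives the raw value far below zero, but the returned rate is still `minRate`. A `minRate` of 0 is rejected up front.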
[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r166606906

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
@@ -126,7 +129,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
   protected[streaming] def maxMessagesPerPartition(
     offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
-    val estimatedRateLimit = rateController.map(_.getLatestRate())
+    val estimatedRateLimit = rateController.map(x => {
+      val lr = x.getLatestRate()
+      if (lr > 0) lr else initialRate
--- End diff --

If the cluster were somehow so heavily loaded by other processes that Spark Streaming could process 0 events, we might have a huge backlog afterwards. That means that, without this fix, the system has a high chance of overflowing.
[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r166605640

--- Diff: external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
@@ -91,9 +91,16 @@ class DirectKafkaInputDStream[
   private val maxRateLimitPerPartition: Long = context.sparkContext.getConf.getLong(
       "spark.streaming.kafka.maxRatePerPartition", 0)
+  private val initialRate = context.sparkContext.getConf.getLong(
+    "spark.streaming.backpressure.initialRate", 0)
+
   protected[streaming] def maxMessagesPerPartition(
       offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, Long]] = {
-    val estimatedRateLimit = rateController.map(_.getLatestRate())
+
+    val estimatedRateLimit = rateController.map(x => {
+      val lr = x.getLatestRate()
+      if (lr > 0) lr else initialRate
--- End diff --

The latest rate is the rate of the previous batch. Is it possible that 0 events were processed in a live system? Only if there was no backlog and no new events arrived during the last batch, which is entirely possible. It also happens during the first run, and this parameter is meant to limit the rate during the first run. Quote from the docs: `This is the initial maximum receiving rate at which each receiver will receive data for the first batch when the backpressure mechanism is enabled.` If it happens while the system is running, for example when there is no backlog and no new events arrive, we still need to limit the rate, since `LatestRate = 0` results in no limit at all, which risks overflowing the system.
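The fallback argued for here can be modelled with plain `Option`s. This is a simplified sketch, not the PR's code: the rate controller is replaced by an `Option[Long]` holding its latest rate (`None` meaning backpressure is off), and `effectiveLimit` assumes, as the comment states, that a non-positive rate imposes no limit at all. All names are hypothetical. With `initialRate` left at its default of 0, a latest rate of 0 still leaves the stream unbounded; with `initialRate = 500` the batch is capped.

```scala
// Simplified model of the initialRate fallback; names are hypothetical.
object RateFallbackSketch {
  // latestRate: None when there is no rate controller (backpressure disabled),
  // otherwise the rate of the previous batch.
  def estimatedRateLimit(latestRate: Option[Long], initialRate: Long): Option[Long] =
    latestRate.map { lr =>
      // A non-positive latest rate (e.g. no data yet) falls back to initialRate.
      if (lr > 0) lr else initialRate
    }

  // Assumption taken from the comment: a rate <= 0 imposes no limit (None).
  def effectiveLimit(rate: Option[Long]): Option[Long] = rate.filter(_ > 0)

  def main(args: Array[String]): Unit = {
    println(effectiveLimit(estimatedRateLimit(Some(0L), initialRate = 0L)))   // prints None
    println(effectiveLimit(estimatedRateLimit(Some(0L), initialRate = 500L))) // prints Some(500)
  }
}
```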
[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r166605671

--- Diff: external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
@@ -108,7 +115,9 @@ class DirectKafkaInputDStream[
           tp -> (if (maxRateLimitPerPartition > 0) {
             Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
         }
-      case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition }
+      case None => offsets.map { case (tp, offset) => tp -> {
--- End diff --

Fixed
[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r166605578

--- Diff: external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
@@ -91,9 +91,16 @@ class DirectKafkaInputDStream[
   private val maxRateLimitPerPartition: Long = context.sparkContext.getConf.getLong(
       "spark.streaming.kafka.maxRatePerPartition", 0)
+  private val initialRate = context.sparkContext.getConf.getLong(
+    "spark.streaming.backpressure.initialRate", 0)
+
   protected[streaming] def maxMessagesPerPartition(
       offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, Long]] = {
-    val estimatedRateLimit = rateController.map(_.getLatestRate())
+
+    val estimatedRateLimit = rateController.map(x => {
--- End diff --

Fixed
[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r166605547

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
@@ -126,7 +129,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
   protected[streaming] def maxMessagesPerPartition(
     offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
-    val estimatedRateLimit = rateController.map(_.getLatestRate())
+    val estimatedRateLimit = rateController.map(x => {
+      val lr = x.getLatestRate()
+      if (lr > 0) lr else initialRate
--- End diff --

The latest rate is the rate of the previous batch. Is it possible that 0 events were processed in a live system? Only if there was no backlog and no new events arrived during the last batch, which is entirely possible. It also happens during the first run, and this parameter is meant to limit the rate during the first run. Quote from the docs: `This is the initial maximum receiving rate at which each receiver will receive data for the first batch when the backpressure mechanism is enabled.` If it happens while the system is running, for example when there is no backlog and no new events arrive, we still need to limit the rate, since `LatestRate = 0` results in no limit at all, which risks overflowing the system.
[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r166603416

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
@@ -126,7 +129,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
   protected[streaming] def maxMessagesPerPartition(
     offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
-    val estimatedRateLimit = rateController.map(_.getLatestRate())
+    val estimatedRateLimit = rateController.map(x => {
--- End diff --

Fixed
[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r158375331

--- Diff: external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
@@ -108,7 +115,9 @@ class DirectKafkaInputDStream[
           tp -> (if (maxRateLimitPerPartition > 0) {
             Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
         }
-      case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition }
+      case None => offsets.map { case (tp, offset) => tp -> {
--- End diff --

What is the intention here?
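For context on the `case None` branch being asked about: in the surrounding code, the `Some(rate)` branch splits the backpressure rate across partitions in proportion to their lag and caps each share at `maxRateLimitPerPartition`, while `None` (no rate controller) uses `maxRateLimitPerPartition` alone. Below is a standalone sketch of those two branches, assuming partitions are plain strings and lag is already computed; the names are hypothetical, the exact rounding in Spark may differ, and the zero-total-lag guard is an addition of this sketch. It keeps the pre-patch `None` behaviour, because the body of the new `tp -> {` is cut off in the diff.

```scala
// Hypothetical, simplified model of the two branches around `case None`.
object PerPartitionLimitSketch {
  type Partition = String // stand-in for TopicAndPartition

  def perPartitionRate(
      lagPerPartition: Map[Partition, Long],
      estimatedRate: Option[Double],
      maxRateLimitPerPartition: Long): Map[Partition, Double] =
    estimatedRate match {
      case Some(rate) =>
        val totalLag = lagPerPartition.values.sum
        lagPerPartition.map { case (tp, lag) =>
          // Share of the overall rate proportional to this partition's lag
          // (guarding against a total lag of 0 is this sketch's addition).
          val share = if (totalLag > 0) lag.toDouble / totalLag else 0.0
          val backpressureRate = share * rate
          tp -> (if (maxRateLimitPerPartition > 0) {
            math.min(backpressureRate, maxRateLimitPerPartition.toDouble)
          } else backpressureRate)
        }
      case None =>
        // No rate controller: every partition gets the static per-partition cap.
        lagPerPartition.map { case (tp, _) => tp -> maxRateLimitPerPartition.toDouble }
    }

  def main(args: Array[String]): Unit = {
    val lag = Map("p0" -> 300L, "p1" -> 100L)
    println(perPartitionRate(lag, Some(1000.0), maxRateLimitPerPartition = 500L))
    println(perPartitionRate(lag, None, maxRateLimitPerPartition = 1000L))
  }
}
```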
[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r158374124

--- Diff: external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
@@ -91,9 +91,16 @@ class DirectKafkaInputDStream[
   private val maxRateLimitPerPartition: Long = context.sparkContext.getConf.getLong(
       "spark.streaming.kafka.maxRatePerPartition", 0)
+  private val initialRate = context.sparkContext.getConf.getLong(
+    "spark.streaming.backpressure.initialRate", 0)
+
   protected[streaming] def maxMessagesPerPartition(
       offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, Long]] = {
-    val estimatedRateLimit = rateController.map(_.getLatestRate())
+
+    val estimatedRateLimit = rateController.map(x => {
--- End diff --

Style: `val estimatedRateLimit = rateController.map { x => {`. Please take a look at https://spark.apache.org/contributing.html
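The suggested brace form changes only the layout, not the behaviour; braces are the usual choice for a multi-line lambda in Scala. A minimal comparison, simplified by passing the latest rate in directly instead of calling `getLatestRate()` on a rate controller (`ClosureStyleSketch` and its members are hypothetical names):

```scala
// Hypothetical names; latestRate stands in for rateController's latest rate.
object ClosureStyleSketch {
  val initialRate = 500L

  // Form used in the original patch: parentheses around a block lambda.
  def parenStyle(latestRate: Option[Long]): Option[Long] =
    latestRate.map(x => {
      val lr = x // stands in for x.getLatestRate()
      if (lr > 0) lr else initialRate
    })

  // Form suggested in review: braces around the lambda.
  def braceStyle(latestRate: Option[Long]): Option[Long] =
    latestRate.map { x => {
      val lr = x // stands in for x.getLatestRate()
      if (lr > 0) lr else initialRate
    }}
}
```

Both methods return the same result for every input; only the bracketing differs.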
[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r158375540

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
@@ -126,7 +129,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
   protected[streaming] def maxMessagesPerPartition(
     offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
-    val estimatedRateLimit = rateController.map(_.getLatestRate())
+    val estimatedRateLimit = rateController.map(x => {
--- End diff --

Style: `val estimatedRateLimit = rateController.map { x => {`. Please take a look at https://spark.apache.org/contributing.html
[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r158382365

--- Diff: external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
@@ -91,9 +91,16 @@ class DirectKafkaInputDStream[
   private val maxRateLimitPerPartition: Long = context.sparkContext.getConf.getLong(
       "spark.streaming.kafka.maxRatePerPartition", 0)
+  private val initialRate = context.sparkContext.getConf.getLong(
+    "spark.streaming.backpressure.initialRate", 0)
+
   protected[streaming] def maxMessagesPerPartition(
       offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, Long]] = {
-    val estimatedRateLimit = rateController.map(_.getLatestRate())
+
+    val estimatedRateLimit = rateController.map(x => {
+      val lr = x.getLatestRate()
+      if (lr > 0) lr else initialRate
--- End diff --

Is it possible that the server is so heavily loaded that the current rate limit drops to 0?
[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r158382300

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
@@ -126,7 +129,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
   protected[streaming] def maxMessagesPerPartition(
     offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
-    val estimatedRateLimit = rateController.map(_.getLatestRate())
+    val estimatedRateLimit = rateController.map(x => {
+      val lr = x.getLatestRate()
+      if (lr > 0) lr else initialRate
--- End diff --

Is it possible that the server is so heavily loaded that the current rate limit drops to 0?