[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-03-12 Thread akonopko
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r173719300
  
--- Diff: 
external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
 ---
@@ -456,6 +455,60 @@ class DirectKafkaStreamSuite
 ssc.stop()
   }
 
+  test("backpressure.initialRate should honor maxRatePerPartition") {
+    backpressureTest(maxRatePerPartition = 1000, initialRate = 500,
+      maxMessagesPerPartition = 250)
+  }
+
+  test("use backpressure.initialRate with backpressure") {
--- End diff --

Right, thank you. I will correct this.


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-03-10 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r173641331
  
--- Diff: 
external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
 ---
@@ -456,6 +455,60 @@ class DirectKafkaStreamSuite
 ssc.stop()
   }
 
+  test("backpressure.initialRate should honor maxRatePerPartition") {
+    backpressureTest(maxRatePerPartition = 1000, initialRate = 500,
+      maxMessagesPerPartition = 250)
+  }
+
+  test("use backpressure.initialRate with backpressure") {
--- End diff --

Aren't the descriptions of these tests backwards? I.e., isn't this the one testing 
that maxRatePerPartition is honored?


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-10 Thread akonopko
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r167395493
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
 ---
@@ -22,6 +22,7 @@ import java.lang.{ Long => JLong }
 import java.util.{ Arrays, HashMap => JHashMap, Map => JMap }
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicLong
+import java.util.UUID
--- End diff --

Ah, the Scalastyle checks were disabled for test files. Fixed


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-10 Thread akonopko
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r167395482
  
--- Diff: 
external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
 ---
@@ -539,6 +456,58 @@ class DirectKafkaStreamSuite
 ssc.stop()
   }
 
+  test("backpressure.initialRate should honor maxRatePerPartition") {
+    backpressureTest(maxRatePerPartition = 1000, initialRate = 500,
+      maxMessagesPerPartition = 250)
+  }
+
+  test("use backpressure.initialRate with backpressure") {
+    backpressureTest(maxRatePerPartition = 300, initialRate = 1000,
+      maxMessagesPerPartition = 150)
+  }
+
+  private def backpressureTest(maxRatePerPartition: Int,
+   initialRate: Int,
--- End diff --

Fixed


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-10 Thread akonopko
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r167395492
  
--- Diff: 
external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
 ---
@@ -21,6 +21,7 @@ import java.io.File
 import java.util.Arrays
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicLong
+import java.util.UUID
--- End diff --

Ah, the Scalastyle checks were disabled for test files. Fixed


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-10 Thread akonopko
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r167395487
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
 ---
@@ -687,6 +618,51 @@ class DirectKafkaStreamSuite
 ssc.stop()
   }
 
+  test("backpressure.initialRate should honor maxRatePerPartition") {
+    backpressureTest(maxRatePerPartition = 1000, initialRate = 500,
+      maxMessagesPerPartition = 250)
+  }
+
+  test("use backpressure.initialRate with backpressure") {
+    backpressureTest(maxRatePerPartition = 300, initialRate = 1000,
+      maxMessagesPerPartition = 150)
+  }
+
+  private def backpressureTest(maxRatePerPartition: Int,
+   initialRate: Int,
--- End diff --

Fixed


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-09 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r167281427
  
--- Diff: 
external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
 ---
@@ -21,6 +21,7 @@ import java.io.File
 import java.util.Arrays
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicLong
+import java.util.UUID
--- End diff --

[error] /Users/gaborsomogyi/spark_review/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala:24:0: java.util.UUID is in wrong order relative to java.util.concurrent.atomic.AtomicLong.

Please execute Scalastyle checks.
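
For reference, a sketch of an import block that satisfies the checker — java.util.UUID 
sorts before the java.util.concurrent subpackage, so the new import has to move up 
(an illustrative ordering, not the exact final diff):

    import java.io.File
    import java.util.Arrays
    import java.util.UUID
    import java.util.concurrent.ConcurrentLinkedQueue
    import java.util.concurrent.atomic.AtomicLong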


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-09 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r167281176
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
 ---
@@ -22,6 +22,7 @@ import java.lang.{ Long => JLong }
 import java.util.{ Arrays, HashMap => JHashMap, Map => JMap }
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicLong
+import java.util.UUID
--- End diff --

[error] /Users/gaborsomogyi/spark_review/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala:25:0: java.util.UUID is in wrong order relative to java.util.concurrent.atomic.AtomicLong.

Please execute Scalastyle checks.


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-09 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r167281995
  
--- Diff: 
external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
 ---
@@ -539,6 +456,58 @@ class DirectKafkaStreamSuite
 ssc.stop()
   }
 
+  test("backpressure.initialRate should honor maxRatePerPartition") {
+    backpressureTest(maxRatePerPartition = 1000, initialRate = 500,
+      maxMessagesPerPartition = 250)
+  }
+
+  test("use backpressure.initialRate with backpressure") {
+    backpressureTest(maxRatePerPartition = 300, initialRate = 1000,
+      maxMessagesPerPartition = 150)
+  }
+
+  private def backpressureTest(maxRatePerPartition: Int,
+   initialRate: Int,
--- End diff --

Indentation is wrong. Please check the Code Style Guide.
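
For reference, a minimal sketch of the layout the style guide asks for — each parameter 
of a multi-line method declaration on its own line with a 4-space continuation indent 
(the wrapper object and body are illustrative, not the exact final diff):

    object IndentationSketch {
      // Parameters indented 4 spaces past the start of "private def".
      private def backpressureTest(
          maxRatePerPartition: Int,
          initialRate: Int,
          maxMessagesPerPartition: Int): Unit = {
        println(s"$maxRatePerPartition $initialRate $maxMessagesPerPartition")
      }

      def main(args: Array[String]): Unit = backpressureTest(1000, 500, 250)
    }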


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-09 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r167281937
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
 ---
@@ -687,6 +618,51 @@ class DirectKafkaStreamSuite
 ssc.stop()
   }
 
+  test("backpressure.initialRate should honor maxRatePerPartition") {
+    backpressureTest(maxRatePerPartition = 1000, initialRate = 500,
+      maxMessagesPerPartition = 250)
+  }
+
+  test("use backpressure.initialRate with backpressure") {
+    backpressureTest(maxRatePerPartition = 300, initialRate = 1000,
+      maxMessagesPerPartition = 150)
+  }
+
+  private def backpressureTest(maxRatePerPartition: Int,
+   initialRate: Int,
--- End diff --

Indentation is wrong. Please check the Code Style Guide.


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-09 Thread akonopko
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r167242320
  
--- Diff: 
external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
 ---
@@ -387,6 +387,89 @@ class DirectKafkaStreamSuite
       Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L))
   }
 
+  test("use backpressure.initialRate with backpressure") {
+    val topic = "backpressureInitialRate"
+    val topicPartitions = Set(TopicAndPartition(topic, 0))
+    kafkaTestUtils.createTopic(topic, 1)
+    val kafkaParams = Map(
+      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+      "auto.offset.reset" -> "smallest"
+    )
+
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.backpressure.enabled", "true")
+      .set("spark.streaming.backpressure.initialRate", "500")
+
+    val messages = Map("foo" -> 5000)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      val kc = new KafkaCluster(kafkaParams)
+      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
+      val m = kc.getEarliestLeaderOffsets(topicPartitions)
+        .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
+
+      new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
+        ssc, kafkaParams, m, messageHandler)
+    }
+    kafkaStream.start()
+
+    val input = Map(new TopicAndPartition(topic, 0) -> 1000L)
+
+    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
--- End diff --

Fixed


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-09 Thread akonopko
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r167242353
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
 ---
@@ -551,6 +551,76 @@ class DirectKafkaStreamSuite
       Map(new TopicPartition(topic, 0) -> 5L, new TopicPartition(topic, 1) -> 10L))
   }
 
+  test("use backpressure.initialRate with backpressure") {
+    val topic = "backpressureInitialRate"
+    val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.backpressure.enabled", "true")
+      .set("spark.streaming.kafka.maxRatePerPartition", "1000")
+      .set("spark.streaming.backpressure.initialRate", "500")
+
+    val messages = Map("foo" -> 5000)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      new DirectKafkaInputDStream[String, String](
+        ssc,
+        preferredHosts,
+        ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
+        new DefaultPerPartitionConfig(sparkConf)
+      )
+    }
+    kafkaStream.start()
+
+    val input = Map(new TopicPartition(topic, 0) -> 1000L)
+
+    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
--- End diff --

Fixed


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-09 Thread akonopko
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r167242329
  
--- Diff: 
external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
 ---
@@ -387,6 +387,89 @@ class DirectKafkaStreamSuite
       Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L))
   }
 
+  test("use backpressure.initialRate with backpressure") {
+    val topic = "backpressureInitialRate"
+    val topicPartitions = Set(TopicAndPartition(topic, 0))
+    kafkaTestUtils.createTopic(topic, 1)
+    val kafkaParams = Map(
+      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+      "auto.offset.reset" -> "smallest"
+    )
+
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.backpressure.enabled", "true")
+      .set("spark.streaming.backpressure.initialRate", "500")
+
+    val messages = Map("foo" -> 5000)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      val kc = new KafkaCluster(kafkaParams)
+      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
+      val m = kc.getEarliestLeaderOffsets(topicPartitions)
+        .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
+
+      new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
+        ssc, kafkaParams, m, messageHandler)
+    }
+    kafkaStream.start()
+
+    val input = Map(new TopicAndPartition(topic, 0) -> 1000L)
+
+    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
+      new TopicAndPartition(topic, 0) -> 250)) // we run for half a second
+
+    kafkaStream.stop()
+  }
+
+  test("backpressure.initialRate should honor maxRatePerPartition") {
+    val topic = "backpressureInitialRate2"
+    val topicPartitions = Set(TopicAndPartition(topic, 0))
+    kafkaTestUtils.createTopic(topic, 1)
+    val kafkaParams = Map(
+      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+      "auto.offset.reset" -> "smallest"
+    )
+
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.backpressure.enabled", "true")
+      .set("spark.streaming.kafka.maxRatePerPartition", "300")
+      .set("spark.streaming.backpressure.initialRate", "1000")
+
+    val messages = Map("foo" -> 5000)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      val kc = new KafkaCluster(kafkaParams)
+      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
+      val m = kc.getEarliestLeaderOffsets(topicPartitions)
+        .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
+
+      new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
+        ssc, kafkaParams, m, messageHandler)
+    }
+    kafkaStream.start()
+
+    val input = Map(new TopicAndPartition(topic, 0) -> 1000L)
+
+    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
--- End diff --

Fixed


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-09 Thread akonopko
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r167242186
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
 ---
@@ -551,6 +551,76 @@ class DirectKafkaStreamSuite
       Map(new TopicPartition(topic, 0) -> 5L, new TopicPartition(topic, 1) -> 10L))
   }
 
+  test("use backpressure.initialRate with backpressure") {
+    val topic = "backpressureInitialRate"
+    val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.backpressure.enabled", "true")
+      .set("spark.streaming.kafka.maxRatePerPartition", "1000")
+      .set("spark.streaming.backpressure.initialRate", "500")
+
+    val messages = Map("foo" -> 5000)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      new DirectKafkaInputDStream[String, String](
+        ssc,
+        preferredHosts,
+        ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
+        new DefaultPerPartitionConfig(sparkConf)
+      )
+    }
+    kafkaStream.start()
+
+    val input = Map(new TopicPartition(topic, 0) -> 1000L)
+
+    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
+      new TopicPartition(topic, 0) -> 250)) // we run for half a second
+
+    kafkaStream.stop()
+  }
+
+  test("backpressure.initialRate should honor maxRatePerPartition") {
+    val topic = "backpressureInitialRate"
+    val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.backpressure.enabled", "true")
+      .set("spark.streaming.kafka.maxRatePerPartition", "300")
+      .set("spark.streaming.backpressure.initialRate", "1000")
+
+    val messages = Map("foo" -> 5000)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      new DirectKafkaInputDStream[String, String](
+        ssc,
+        preferredHosts,
+        ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
+        new DefaultPerPartitionConfig(sparkConf)
+      )
+    }
+    kafkaStream.start()
+
+    val input = Map(new TopicPartition(topic, 0) -> 1000L)
+
+    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
--- End diff --

Fixed


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-08 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r166952412
  
--- Diff: 
external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 ---
@@ -91,9 +91,16 @@ class DirectKafkaInputDStream[
   private val maxRateLimitPerPartition: Long = context.sparkContext.getConf.getLong(
     "spark.streaming.kafka.maxRatePerPartition", 0)
 
+  private val initialRate = context.sparkContext.getConf.getLong(
+    "spark.streaming.backpressure.initialRate", 0)
+
   protected[streaming] def maxMessagesPerPartition(
       offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, Long]] = {
-    val estimatedRateLimit = rateController.map(_.getLatestRate())
+
+    val estimatedRateLimit = rateController.map(x => {
+      val lr = x.getLatestRate()
+      if (lr > 0) lr else initialRate
--- End diff --

Same applies here.


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-08 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r166953420
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
 ---
@@ -551,6 +551,76 @@ class DirectKafkaStreamSuite
       Map(new TopicPartition(topic, 0) -> 5L, new TopicPartition(topic, 1) -> 10L))
   }
 
+  test("use backpressure.initialRate with backpressure") {
+    val topic = "backpressureInitialRate"
+    val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.backpressure.enabled", "true")
+      .set("spark.streaming.kafka.maxRatePerPartition", "1000")
+      .set("spark.streaming.backpressure.initialRate", "500")
+
+    val messages = Map("foo" -> 5000)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      new DirectKafkaInputDStream[String, String](
+        ssc,
+        preferredHosts,
+        ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
+        new DefaultPerPartitionConfig(sparkConf)
+      )
+    }
+    kafkaStream.start()
+
+    val input = Map(new TopicPartition(topic, 0) -> 1000L)
+
+    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
--- End diff --

Just caught that this could be simplified a bit:

> assert(kafkaStream.maxMessagesPerPartition(input).get ==
>  Map(new TopicPartition(topic, 0) -> 250)) // we run for half a second



---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-08 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r166953742
  
--- Diff: 
external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
 ---
@@ -387,6 +387,89 @@ class DirectKafkaStreamSuite
       Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L))
   }
 
+  test("use backpressure.initialRate with backpressure") {
+    val topic = "backpressureInitialRate"
+    val topicPartitions = Set(TopicAndPartition(topic, 0))
+    kafkaTestUtils.createTopic(topic, 1)
+    val kafkaParams = Map(
+      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+      "auto.offset.reset" -> "smallest"
+    )
+
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.backpressure.enabled", "true")
+      .set("spark.streaming.backpressure.initialRate", "500")
+
+    val messages = Map("foo" -> 5000)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      val kc = new KafkaCluster(kafkaParams)
+      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
+      val m = kc.getEarliestLeaderOffsets(topicPartitions)
+        .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
+
+      new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
+        ssc, kafkaParams, m, messageHandler)
+    }
+    kafkaStream.start()
+
+    val input = Map(new TopicAndPartition(topic, 0) -> 1000L)
+
+    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
+      new TopicAndPartition(topic, 0) -> 250)) // we run for half a second
+
+    kafkaStream.stop()
+  }
+
+  test("backpressure.initialRate should honor maxRatePerPartition") {
+    val topic = "backpressureInitialRate2"
+    val topicPartitions = Set(TopicAndPartition(topic, 0))
+    kafkaTestUtils.createTopic(topic, 1)
+    val kafkaParams = Map(
+      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+      "auto.offset.reset" -> "smallest"
+    )
+
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.backpressure.enabled", "true")
+      .set("spark.streaming.kafka.maxRatePerPartition", "300")
+      .set("spark.streaming.backpressure.initialRate", "1000")
+
+    val messages = Map("foo" -> 5000)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      val kc = new KafkaCluster(kafkaParams)
+      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
+      val m = kc.getEarliestLeaderOffsets(topicPartitions)
+        .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
+
+      new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
+        ssc, kafkaParams, m, messageHandler)
+    }
+    kafkaStream.start()
+
+    val input = Map(new TopicAndPartition(topic, 0) -> 1000L)
+
+    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
--- End diff --

Same simplification here.


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-08 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r166953894
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
 ---
@@ -551,6 +551,76 @@ class DirectKafkaStreamSuite
       Map(new TopicPartition(topic, 0) -> 5L, new TopicPartition(topic, 1) -> 10L))
   }
 
+  test("use backpressure.initialRate with backpressure") {
+    val topic = "backpressureInitialRate"
+    val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.backpressure.enabled", "true")
+      .set("spark.streaming.kafka.maxRatePerPartition", "1000")
+      .set("spark.streaming.backpressure.initialRate", "500")
+
+    val messages = Map("foo" -> 5000)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      new DirectKafkaInputDStream[String, String](
+        ssc,
+        preferredHosts,
+        ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
+        new DefaultPerPartitionConfig(sparkConf)
+      )
+    }
+    kafkaStream.start()
+
+    val input = Map(new TopicPartition(topic, 0) -> 1000L)
+
+    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
+      new TopicPartition(topic, 0) -> 250)) // we run for half a second
+
+    kafkaStream.stop()
+  }
+
+  test("backpressure.initialRate should honor maxRatePerPartition") {
+    val topic = "backpressureInitialRate"
+    val kafkaParams = getKafkaParams("auto.offset.reset" -> "earliest")
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.backpressure.enabled", "true")
+      .set("spark.streaming.kafka.maxRatePerPartition", "300")
+      .set("spark.streaming.backpressure.initialRate", "1000")
+
+    val messages = Map("foo" -> 5000)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      new DirectKafkaInputDStream[String, String](
+        ssc,
+        preferredHosts,
+        ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
+        new DefaultPerPartitionConfig(sparkConf)
+      )
+    }
+    kafkaStream.start()
+
+    val input = Map(new TopicPartition(topic, 0) -> 1000L)
+
+    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
--- End diff --

Same simplification here.


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-08 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r166953800
  
--- Diff: 
external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
 ---
@@ -387,6 +387,89 @@ class DirectKafkaStreamSuite
       Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L))
   }
 
+  test("use backpressure.initialRate with backpressure") {
+    val topic = "backpressureInitialRate"
+    val topicPartitions = Set(TopicAndPartition(topic, 0))
+    kafkaTestUtils.createTopic(topic, 1)
+    val kafkaParams = Map(
+      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+      "auto.offset.reset" -> "smallest"
+    )
+
+    val sparkConf = new SparkConf()
+      // Safe, even with streaming, because we're using the direct API.
+      // Using 1 core is useful to make the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.backpressure.enabled", "true")
+      .set("spark.streaming.backpressure.initialRate", "500")
+
+    val messages = Map("foo" -> 5000)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      val kc = new KafkaCluster(kafkaParams)
+      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
+      val m = kc.getEarliestLeaderOffsets(topicPartitions)
+        .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
+
+      new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
+        ssc, kafkaParams, m, messageHandler)
+    }
+    kafkaStream.start()
+
+    val input = Map(new TopicAndPartition(topic, 0) -> 1000L)
+
+    assert(kafkaStream.maxMessagesPerPartition(input).flatMap(_.headOption).contains(
--- End diff --

Same simplification here.


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-08 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r166952263
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
 ---
@@ -126,7 +129,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
 
   protected[streaming] def maxMessagesPerPartition(
       offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
-    val estimatedRateLimit = rateController.map(_.getLatestRate())
+    val estimatedRateLimit = rateController.map(x => {
+      val lr = x.getLatestRate()
+      if (lr > 0) lr else initialRate
--- End diff --

Thanks for the info. My concern was the `LatestRate = 0` case, where the limit 
can be lost. In the meantime I have taken a look at the `PIDRateEstimator`, which 
cannot produce a 0 rate because of this:

    val newRate = (latestRate - proportional * error -
      integral * historicalError -
      derivative * dError).max(minRate)

and minRate is limited:

    require(
      minRate > 0,
      s"Minimum rate in PIDRateEstimator should be > 0")

I'm fine with this.
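
A self-contained sketch of that clamp (the correction term below is a made-up 
number; minRate mirrors the documented default of 
spark.streaming.backpressure.pid.minRate, 100 records/sec):

    object PidClampSketch extends App {
      val minRate = 100.0

      // Whatever the PID terms compute, .max(minRate) keeps the estimate positive.
      def newRate(latestRate: Double, correction: Double): Double =
        (latestRate - correction).max(minRate)

      // Even a huge correction (a heavily loaded cluster) cannot push the rate to 0.
      assert(newRate(latestRate = 500.0, correction = 10000.0) == minRate)
      println(newRate(500.0, 10000.0)) // 100.0
    }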



---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-07 Thread akonopko
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r166606906
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
 ---
@@ -126,7 +129,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
 
   protected[streaming] def maxMessagesPerPartition(
       offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
-    val estimatedRateLimit = rateController.map(_.getLatestRate())
+    val estimatedRateLimit = rateController.map(x => {
+      val lr = x.getLatestRate()
+      if (lr > 0) lr else initialRate
--- End diff --

If the cluster were somehow so heavily loaded with other processes that Spark 
Streaming processed 0 events, we might have a huge backlog after that, which means 
that without this fix the system has a big chance of overflowing.


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-07 Thread akonopko
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r166605640
  
--- Diff: 
external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 ---
@@ -91,9 +91,16 @@ class DirectKafkaInputDStream[
   private val maxRateLimitPerPartition: Long = context.sparkContext.getConf.getLong(
     "spark.streaming.kafka.maxRatePerPartition", 0)
 
+  private val initialRate = context.sparkContext.getConf.getLong(
+    "spark.streaming.backpressure.initialRate", 0)
+
   protected[streaming] def maxMessagesPerPartition(
       offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, Long]] = {
-    val estimatedRateLimit = rateController.map(_.getLatestRate())
+
+    val estimatedRateLimit = rateController.map(x => {
+      val lr = x.getLatestRate()
+      if (lr > 0) lr else initialRate
--- End diff --

Latest rate means the rate of the previous batch. Is it possible that a live 
system processed 0 events? Only if there was no backlog and no new events 
arrived during the last batch. Completely possible.

This happens during the first run, and this parameter should limit the rate 
during that first run. Quote from the docs:

This is the initial maximum receiving rate at which each receiver will 
receive data for the first batch when the backpressure mechanism is enabled.

If it happens during a later run, for example when there is no backlog and no 
new events arrived, we still need to limit the system rate, since LatestRate = 0 
would result in no limit at all, risking overflowing the system.
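
A self-contained sketch of the fallback plus the batch-size arithmetic the tests 
rely on (variable names are illustrative, the numbers are those used in the 
tests): a rate of 500 msgs/sec over a 500 ms batch caps the partition at 250 
messages.

    object InitialRateSketch extends App {
      val initialRate = 500L
      val latestRate = 0L // nothing published yet: first batch, or a 0-event batch

      // The fallback under discussion: a positive estimate wins, otherwise initialRate.
      val effectiveRate = if (latestRate > 0) latestRate else initialRate

      val batchIntervalMs = 500L
      val maxMessages = effectiveRate * batchIntervalMs / 1000
      println(maxMessages) // 250
    }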


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-07 Thread akonopko
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r166605671
  
--- Diff: 
external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 ---
@@ -108,7 +115,9 @@ class DirectKafkaInputDStream[
       tp -> (if (maxRateLimitPerPartition > 0) {
         Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
     }
-  case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition }
+  case None => offsets.map { case (tp, offset) => tp -> {
--- End diff --

Fixed


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-07 Thread akonopko
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r166605578
  
--- Diff: 
external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 ---
@@ -91,9 +91,16 @@ class DirectKafkaInputDStream[
   private val maxRateLimitPerPartition: Long = context.sparkContext.getConf.getLong(
     "spark.streaming.kafka.maxRatePerPartition", 0)
 
+  private val initialRate = context.sparkContext.getConf.getLong(
+    "spark.streaming.backpressure.initialRate", 0)
+
   protected[streaming] def maxMessagesPerPartition(
       offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, Long]] = {
-    val estimatedRateLimit = rateController.map(_.getLatestRate())
+
+    val estimatedRateLimit = rateController.map(x => {
--- End diff --

Fixed


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-07 Thread akonopko
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r166605547
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
 ---
@@ -126,7 +129,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
 
   protected[streaming] def maxMessagesPerPartition(
       offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
-    val estimatedRateLimit = rateController.map(_.getLatestRate())
+    val estimatedRateLimit = rateController.map(x => {
+      val lr = x.getLatestRate()
+      if (lr > 0) lr else initialRate
--- End diff --

Latest rate means the rate of the previous batch. Is it possible that a live 
system processed 0 events? Only if there was no backlog and no new events 
arrived during the last batch. Completely possible.

This happens during the first run, and this parameter should limit the rate 
during that first run. Quote from the docs:

`This is the initial maximum receiving rate at which each receiver will 
receive data for the first batch when the backpressure mechanism is enabled.`

If it happens during a later run, for example when there is no backlog and no 
new events arrived, we still need to limit the system rate, since LatestRate = 0 
would result in no limit at all, risking overflowing the system.


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2018-02-07 Thread akonopko
Github user akonopko commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r166603416
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
 ---
@@ -126,7 +129,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
 
   protected[streaming] def maxMessagesPerPartition(
       offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
-    val estimatedRateLimit = rateController.map(_.getLatestRate())
+    val estimatedRateLimit = rateController.map(x => {
--- End diff --

Fixed


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2017-12-21 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r158375331
  
--- Diff: 
external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 ---
@@ -108,7 +115,9 @@ class DirectKafkaInputDStream[
       tp -> (if (maxRateLimitPerPartition > 0) {
         Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
     }
-  case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition }
+  case None => offsets.map { case (tp, offset) => tp -> {
--- End diff --

What is the intention here?


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2017-12-21 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r158374124
  
--- Diff: 
external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 ---
@@ -91,9 +91,16 @@ class DirectKafkaInputDStream[
   private val maxRateLimitPerPartition: Long = context.sparkContext.getConf.getLong(
     "spark.streaming.kafka.maxRatePerPartition", 0)
 
+  private val initialRate = context.sparkContext.getConf.getLong(
+    "spark.streaming.backpressure.initialRate", 0)
+
   protected[streaming] def maxMessagesPerPartition(
       offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, Long]] = {
-    val estimatedRateLimit = rateController.map(_.getLatestRate())
+
+    val estimatedRateLimit = rateController.map(x => {
--- End diff --

style: `val estimatedRateLimit = rateController.map { x => {`

Please take a look at https://spark.apache.org/contributing.html



---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2017-12-21 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r158375540
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
 ---
@@ -126,7 +129,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
 
   protected[streaming] def maxMessagesPerPartition(
       offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
-    val estimatedRateLimit = rateController.map(_.getLatestRate())
+    val estimatedRateLimit = rateController.map(x => {
--- End diff --

style: `val estimatedRateLimit = rateController.map { x => {`

Please take a look at https://spark.apache.org/contributing.html



---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2017-12-21 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r158382365
  
--- Diff: 
external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 ---
@@ -91,9 +91,16 @@ class DirectKafkaInputDStream[
   private val maxRateLimitPerPartition: Long = context.sparkContext.getConf.getLong(
     "spark.streaming.kafka.maxRatePerPartition", 0)
 
+  private val initialRate = context.sparkContext.getConf.getLong(
+    "spark.streaming.backpressure.initialRate", 0)
+
   protected[streaming] def maxMessagesPerPartition(
       offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, Long]] = {
-    val estimatedRateLimit = rateController.map(_.getLatestRate())
+
+    val estimatedRateLimit = rateController.map(x => {
+      val lr = x.getLatestRate()
+      if (lr > 0) lr else initialRate
--- End diff --

Is it possible that the server is so heavily loaded that the current rate limit 
drops to 0?


---




[GitHub] spark pull request #19431: [SPARK-18580] [DStreams] [external/kafka-0-10][ex...

2017-12-21 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/19431#discussion_r158382300
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
 ---
@@ -126,7 +129,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
 
   protected[streaming] def maxMessagesPerPartition(
       offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
-    val estimatedRateLimit = rateController.map(_.getLatestRate())
+    val estimatedRateLimit = rateController.map(x => {
+      val lr = x.getLatestRate()
+      if (lr > 0) lr else initialRate
--- End diff --

Is it possible that the server is so heavily loaded that the current rate limit 
drops to 0?


---
