[GitHub] spark pull request #22223: [SPARK-25233][Streaming] Give the user the option...

2018-08-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/3


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22223: [SPARK-25233][Streaming] Give the user the option...

2018-08-29 Thread rezasafi
Github user rezasafi commented on a diff in the pull request:

https://github.com/apache/spark/pull/3#discussion_r213829668
  
--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
@@ -154,7 +153,8 @@ private[spark] class DirectKafkaInputDStream[K, V](
     if (effectiveRateLimitPerPartition.values.sum > 0) {
       val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
       Some(effectiveRateLimitPerPartition.map {
-        case (tp, limit) => tp -> Math.max((secsPerBatch * limit).toLong, 1L)
+        case (tp, limit) => tp -> Math.max((secsPerBatch * limit).toLong,
+          Math.max(ppc.minRatePerPartition(tp), 1L))
--- End diff --

I kept the inner Math.max only so as not to break the reasoning behind
SPARK-18371, which is to always have at least 1. I had no other reason for it.
I can change it to give the user that freedom.
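
For illustration, the floor of 1 matters when the batch is short or the rate is low, because converting the product to Long truncates toward zero. A minimal sketch, assuming plain values in place of Spark's batch duration and per-partition limits (the object and method names are hypothetical, not Spark API):

```scala
// Sketch only: plain values stand in for Spark's batch duration and rate limits.
object FloorSketch {
  // Messages allowed per partition per batch, floored at 1 as in the removed diff line.
  def maxMessages(batchMillis: Long, limitPerSec: Long): Long = {
    val secsPerBatch = batchMillis.toDouble / 1000
    Math.max((secsPerBatch * limitPerSec).toLong, 1L)
  }

  def main(args: Array[String]): Unit = {
    // 0.5 s batch at 1 msg/s: 0.5 truncates to 0, so the floor of 1 applies.
    println(maxMessages(500L, 1L))
    // 2 s batch at 100 msg/s: the product (200) already exceeds the floor.
    println(maxMessages(2000L, 100L))
  }
}
```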


---




[GitHub] spark pull request #22223: [SPARK-25233][Streaming] Give the user the option...

2018-08-29 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/3#discussion_r213825892
  
--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---
@@ -154,7 +153,8 @@ private[spark] class DirectKafkaInputDStream[K, V](
     if (effectiveRateLimitPerPartition.values.sum > 0) {
       val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
       Some(effectiveRateLimitPerPartition.map {
-        case (tp, limit) => tp -> Math.max((secsPerBatch * limit).toLong, 1L)
+        case (tp, limit) => tp -> Math.max((secsPerBatch * limit).toLong,
+          Math.max(ppc.minRatePerPartition(tp), 1L))
--- End diff --

Is the second Math.max actually necessary?
The default implementation of minRatePerPartition will be 1 anyway.
If someone writes a custom implementation that returns, e.g., zero, should
they get what they asked for?
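
To make the question concrete, here is a small sketch comparing the nested form from the diff with a single Math.max. It assumes the per-partition minimum is passed in as a plain Long; `nested` and `single` are hypothetical helper names, not part of Spark:

```scala
// Sketch only: compares the two ways of applying a per-partition minimum.
object MinRateSketch {
  // Form in the diff: the result is never below 1, whatever minimum is supplied.
  def nested(computed: Long, minRate: Long): Long =
    Math.max(computed, Math.max(minRate, 1L))

  // Form the question points to: the supplied minimum is the only floor.
  def single(computed: Long, minRate: Long): Long =
    Math.max(computed, minRate)

  def main(args: Array[String]): Unit = {
    // With a minimum of 1 (the default described above) both forms agree.
    println(nested(0L, 1L) == single(0L, 1L))
    // With a custom minimum of 0 and a computed value of 0 they differ:
    // nested still yields 1, single yields the 0 that was asked for.
    println((nested(0L, 0L), single(0L, 0L)))
  }
}
```

The two forms only diverge when the supplied minimum is below 1, which is exactly the custom-implementation case raised here.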


---
