[GitHub] spark pull request #15132: [SPARK-17510][STREAMING][KAFKA] config max rate o...

2016-11-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/15132


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15132: [SPARK-17510][STREAMING][KAFKA] config max rate o...

2016-11-07 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15132#discussion_r86810750
  
--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkConf
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Interface for user-supplied configurations that can't otherwise be set via Spark properties,
+ * because they need tweaking on a per-partition basis.
+ */
+@Experimental
+abstract class PerPartitionConfig() extends Serializable {
+  /**
+   *  Maximum rate (number of records per second) at which data will be read
+   *  from each Kafka partition.
+   */
+  def maxRatePerPartition(topicPartition: TopicPartition): Int
--- End diff --

That's a reasonable question. 2 billion messages per second on a single thread would be pretty extreme, but I don't think it hurts us to change this to a long.
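
The following is a minimal sketch of the interface with the `Long` return type discussed above. It is not the merged Spark source. To let the sketch compile without kafka-clients or Spark on the classpath, it uses two stand-ins: a local `TopicPartition` case class replaces Kafka's `TopicPartition`, and a plain `Map` replaces `SparkConf`. The `DefaultPerPartitionConfig` body is an assumption that applies one global limit read from `spark.streaming.kafka.maxRatePerPartition`.

```scala
// Stand-in for org.apache.kafka.common.TopicPartition, defined only so the sketch is self-contained.
final case class TopicPartition(topic: String, partition: Int)

/** Sketch of the per-partition config with a Long return type, as proposed in review. */
abstract class PerPartitionConfig extends Serializable {
  /** Maximum rate (number of records per second) at which data will be read from the partition. */
  def maxRatePerPartition(topicPartition: TopicPartition): Long
}

// Hypothetical default: every partition gets the same global limit.
// A plain Map stands in for SparkConf; 0 is assumed here to mean "no limit".
class DefaultPerPartitionConfig(conf: Map[String, String]) extends PerPartitionConfig {
  private val maxRate: Long =
    conf.getOrElse("spark.streaming.kafka.maxRatePerPartition", "0").toLong

  def maxRatePerPartition(topicPartition: TopicPartition): Long = maxRate
}
```

Widening to `Long` does not affect callers in practice. It only raises the ceiling a user-supplied implementation can return.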





[GitHub] spark pull request #15132: [SPARK-17510][STREAMING][KAFKA] config max rate o...

2016-11-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15132#discussion_r86659680
  
--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkConf
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Interface for user-supplied configurations that can't otherwise be set via Spark properties,
+ * because they need tweaking on a per-partition basis.
+ */
+@Experimental
+abstract class PerPartitionConfig() extends Serializable {
+  /**
+   *  Maximum rate (number of records per second) at which data will be read
+   *  from each Kafka partition.
+   */
+  def maxRatePerPartition(topicPartition: TopicPartition): Int
--- End diff --

dumb question - can this ever overflow?
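
An `Int` rate only overflows beyond `Int.MaxValue` records per second. The more plausible risk is arithmetic derived from it. For example, a per-batch record cap is roughly rate times batch duration, and that product could overflow if it were computed in `Int` arithmetic.

The following minimal sketch uses hypothetical helper names (not Spark code) to show the product wrapping around in `Int` but not after widening to `Long`:

```scala
object RateOverflowSketch {
  // Hypothetical helper: per-batch record cap computed entirely in Int arithmetic.
  def batchCapInt(ratePerSec: Int, batchSeconds: Int): Int = ratePerSec * batchSeconds

  // Same computation, widened to Long before multiplying.
  def batchCapLong(ratePerSec: Int, batchSeconds: Int): Long = ratePerSec.toLong * batchSeconds

  def main(args: Array[String]): Unit = {
    println(batchCapInt(Int.MaxValue, 2))  // -2 (wrapped around)
    println(batchCapLong(Int.MaxValue, 2)) // 4294967294
  }
}
```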






[GitHub] spark pull request #15132: [SPARK-17510][STREAMING][KAFKA] config max rate o...

2016-11-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15132#discussion_r86659642
  
--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala ---
@@ -123,7 +123,31 @@ object KafkaUtils extends Logging {
   locationStrategy: LocationStrategy,
   consumerStrategy: ConsumerStrategy[K, V]
 ): InputDStream[ConsumerRecord[K, V]] = {
-new DirectKafkaInputDStream[K, V](ssc, locationStrategy, consumerStrategy)
+val ppc = new DefaultPerPartitionConfig(ssc.sparkContext.getConf)
+createDirectStream[K, V](ssc, locationStrategy, consumerStrategy, ppc)
+  }
+
+/**
--- End diff --

indentation is off here
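
For reference, here is the delegation in this hunk laid out with conventional Scala indentation: two-space bodies and four-space continuation for wrapped parameter lists. In this pattern the existing `createDirectStream` signature builds a default config and forwards to the new overload, so existing callers are unaffected. This is a hypothetical sketch. `RateConfig` and `StreamHandle` are stand-ins for `PerPartitionConfig` and `InputDStream`, and a `Map` stands in for `SparkConf`.

```scala
// Hypothetical stand-in for PerPartitionConfig; a single global rate is enough here.
final case class RateConfig(maxRatePerPartition: Long)

// Simplified stand-in for the stream returned by createDirectStream.
final case class StreamHandle(topics: Seq[String], config: RateConfig)

object KafkaUtilsSketch {
  /** Existing signature: builds the default config from settings and delegates. */
  def createDirectStream(settings: Map[String, String], topics: Seq[String]): StreamHandle = {
    val rate = settings.getOrElse("spark.streaming.kafka.maxRatePerPartition", "0").toLong
    val ppc = RateConfig(rate)
    createDirectStream(settings, topics, ppc)
  }

  /** New overload: the caller supplies the per-partition config explicitly. */
  def createDirectStream(
      settings: Map[String, String],
      topics: Seq[String],
      ppc: RateConfig): StreamHandle =
    StreamHandle(topics, ppc)
}
```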





[GitHub] spark pull request #15132: [SPARK-17510][STREAMING][KAFKA] config max rate o...

2016-11-05 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15132#discussion_r86659652
  
--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkConf
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Interface for user-supplied configurations that can't otherwise be set via Spark properties,
+ * because they need tweaking on a per-partition basis.
+ */
+@Experimental
+abstract class PerPartitionConfig() extends Serializable {
--- End diff --

you don't need the () here
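
In Scala, `abstract class A()` and `abstract class A` both declare a public no-argument primary constructor, so the empty parameter list is redundant. The small sketch below shows that subclasses are written the same way against either form. The class names are made up for illustration.

```scala
// Both declarations define an abstract class with a public no-arg constructor.
abstract class WithParens() extends Serializable {
  def rate: Long
}

abstract class WithoutParens extends Serializable {
  def rate: Long
}

// Anonymous subclasses are written identically against either form.
val a = new WithParens { def rate: Long = 1L }
val b = new WithoutParens { def rate: Long = 2L }
```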





[GitHub] spark pull request #15132: [SPARK-17510][STREAMING][KAFKA] config max rate o...

2016-11-04 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15132#discussion_r86643388
  
--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkConf
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Interface for user-supplied configurations that can't otherwise be set via Spark properties,
+ * because they need tweaking on a per-partition basis.
+ */
+@Experimental
+trait PerPartitionConfig extends Serializable {
--- End diff --

if you want to avoid breaking apis, i'd make this an abstract class rather than a trait
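
The usual reasoning is about binary compatibility. With the Scala 2.11-era trait encoding, adding a concrete method to a trait generally forces every implementing class to be recompiled. Adding a concrete method with a default body to an abstract class does not. An abstract class is also straightforward for Java users to extend.

The following sketch shows the evolution pattern this enables. A user class written only against `maxRatePerPartition` keeps working after a method with a default body is added. Here `minRatePerPartition` is a hypothetical later addition used purely for illustration, and `TopicPartition` is a local stand-in for Kafka's class. A single-file example can only show the source-level side; the binary-compatibility benefit depends on separate compilation.

```scala
// Stand-in for org.apache.kafka.common.TopicPartition.
final case class TopicPartition(topic: String, partition: Int)

// Imagined later version of the API: a new method is added with a default body.
abstract class PerPartitionConfig extends Serializable {
  def maxRatePerPartition(topicPartition: TopicPartition): Long

  // Hypothetical addition for illustration; existing subclasses inherit this default.
  def minRatePerPartition(topicPartition: TopicPartition): Long = 1L
}

// User code written against the original API, which only declared maxRatePerPartition.
class UserConfig extends PerPartitionConfig {
  def maxRatePerPartition(topicPartition: TopicPartition): Long = 1000L
}
```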





[GitHub] spark pull request #15132: [SPARK-17510][STREAMING][KAFKA] config max rate o...

2016-09-17 Thread koeninger
GitHub user koeninger opened a pull request:

https://github.com/apache/spark/pull/15132

[SPARK-17510][STREAMING][KAFKA] config max rate on a per-partition basis

## What changes were proposed in this pull request?

Allow configuration of max rate on a per-topicpartition basis.
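
The following is a hypothetical sketch of how a user-supplied config might set different limits per topic, with a fallback for everything else. An instance would then be passed as the extra config argument to the `createDirectStream` overload that this PR adds.

Several details are assumptions:
- The sketch uses the `Long` return type proposed later in review.
- The topic names are invented.
- `TopicPartition` is a local stand-in for Kafka's class.
- `maxRecordsPerBatch` is a made-up helper illustrating how a rate translates into a per-batch cap. It is not Spark's internal computation.

```scala
// Stand-in for org.apache.kafka.common.TopicPartition.
final case class TopicPartition(topic: String, partition: Int)

abstract class PerPartitionConfig extends Serializable {
  def maxRatePerPartition(topicPartition: TopicPartition): Long
}

// Hypothetical user config: listed topics get their own limit, all others the fallback.
class PerTopicRateConfig(rates: Map[String, Long], fallback: Long) extends PerPartitionConfig {
  def maxRatePerPartition(topicPartition: TopicPartition): Long =
    rates.getOrElse(topicPartition.topic, fallback)
}

object PerTopicRateExample {
  // Illustrative per-batch cap: records/sec times batch length in seconds, in Long arithmetic.
  def maxRecordsPerBatch(cfg: PerPartitionConfig, tp: TopicPartition, batchSeconds: Long): Long =
    cfg.maxRatePerPartition(tp) * batchSeconds
}
```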

## How was this patch tested?

Unit tests.

The reporter (Jeff Nadler) said he could test on his workload, so let's wait on that report.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/koeninger/spark-1 SPARK-17510

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15132.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15132


commit b282fe1ba1245170e426b62fe7c543b2a26a6488
Author: cody koeninger 
Date:   2016-09-17T16:32:41Z

[SPARK-17510][STREAMING][KAFKA] allow max rate on a per-partition basis

commit 9fa1a4f8c8d1027b9c39d087299eeac1ffa11348
Author: cody koeninger 
Date:   2016-09-17T16:45:58Z

[SPARK-17510][STREAMING][KAFKA] test max rate on a per-partition basis



