[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-09 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4384#issuecomment-73653296
  
[Test build #27178 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27178/consoleFull) for PR 4384 at commit [`7c931c3`](https://github.com/apache/spark/commit/7c931c3fd174376fc04a436a64ec414dbe8eac46).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4384#issuecomment-73653301
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27178/


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-09 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4384#issuecomment-73647726
  
[Test build #27178 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27178/consoleFull) for PR 4384 at commit [`7c931c3`](https://github.com/apache/spark/commit/7c931c3fd174376fc04a436a64ec414dbe8eac46).
 * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/4384


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-09 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/4384#discussion_r24386248
  
--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -179,121 +182,194 @@ object KafkaUtils {
       errs => throw new SparkException(errs.mkString("\n")),
       ok => ok
     )
-    new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
+    new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
   }
 
-  /** A batch-oriented interface for consuming from Kafka.
-   * Starting and ending offsets are specified in advance,
-   * so that you can control exactly-once semantics.
+  /**
+   * :: Experimental ::
+   * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you
+   * specify the Kafka leader to connect to (to optimize fetching) and access the message as well
+   * as the metadata.
+   *
    * @param sc SparkContext object
    * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
-   * configuration parameters</a>.
-   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
-   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+   *    host1:port1,host2:port2 form.
    * @param offsetRanges Each OffsetRange in the batch corresponds to a
    *   range of offsets for a given Kafka topic/partition
    * @param leaders Kafka leaders for each offset range in batch
-   * @param messageHandler function for translating each message into the desired type
+   * @param messageHandler Function for translating each message and metadata into the desired type
    */
   @Experimental
   def createRDD[
     K: ClassTag,
     V: ClassTag,
-    U <: Decoder[_]: ClassTag,
-    T <: Decoder[_]: ClassTag,
-    R: ClassTag] (
+    KD <: Decoder[K]: ClassTag,
+    VD <: Decoder[V]: ClassTag,
+    R: ClassTag](
       sc: SparkContext,
       kafkaParams: Map[String, String],
       offsetRanges: Array[OffsetRange],
       leaders: Array[Leader],
       messageHandler: MessageAndMetadata[K, V] => R
-  ): RDD[R] = {
-
+    ): RDD[R] = {
     val leaderMap = leaders
       .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
       .toMap
-    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
+    new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
   }
 
+
   /**
-   * This stream can guarantee that each message from Kafka is included in transformations
-   * (as opposed to output actions) exactly once, even in most failure situations.
+   * Create a RDD from Kafka using offset ranges for each topic and partition.
    *
-   * Points to note:
-   *
-   * Failure Recovery - You must checkpoint this stream, or save offsets yourself and provide them
-   * as the fromOffsets parameter on restart.
-   * Kafka must have sufficient log retention to obtain messages after failure.
-   *
-   * Getting offsets from the stream - see programming guide
+   * @param jsc JavaSparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+   *    host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]](
+      jsc: JavaSparkContext,
+      keyClass: Class[K],
+      valueClass: Class[V],
+      keyDecoderClass: Class[KD],
+      valueDecoderClass: Class[VD],
+      kafkaParams: JMap[String, String],
+      offsetRanges: Array[OffsetRange]
+    ): JavaPairRDD[K, V] = {
+    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
+    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
+    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
+    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
+    new JavaPairRDD(createRDD[K, V, KD, VD](
+      jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges))
+  }
+
+  /**
+   * :: Experimental ::
+   * Create a RDD from Kafka using offset ranges for each
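
For orientation, a minimal sketch of how the simple Scala overload earlier in this hunk (the one returning an RDD of (K, V) pairs) might be driven; the topic name, offsets, and broker address are invented for illustration:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkContext
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

val sc = new SparkContext("local[2]", "kafka-rdd-sketch")

// Broker address, not a zookeeper address (see the scaladoc above).
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")

// Read exactly offsets 0 (inclusive) to 100 (exclusive) of partition 0.
val offsetRanges = Array(OffsetRange("events", 0, 0L, 100L))

val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, offsetRanges)
println(rdd.count())
```

Because the offsets are fixed up front, the same call always reads the same slice of the log, which is what gives the batch API its exactly-once character.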

[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-09 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/4384#discussion_r24386265
  

[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4384#issuecomment-73335178
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26966/


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4384#issuecomment-73335167
  
[Test build #26966 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26966/consoleFull) for PR 4384 at commit [`26df23c`](https://github.com/apache/spark/commit/26df23c5578e52dc594557cfaf2170b1e0d49169).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4384#issuecomment-73326747
  
[Test build #26966 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26966/consoleFull) for PR 4384 at commit [`26df23c`](https://github.com/apache/spark/commit/26df23c5578e52dc594557cfaf2170b1e0d49169).
 * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/4384#discussion_r24272277
  

[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4384#issuecomment-73455353
  
[Test build #27078 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27078/consoleFull) for PR 4384 at commit [`83d0402`](https://github.com/apache/spark/commit/83d04025da1cac3d1ec8565015dbe492f17c3b79).
 * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-08 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/4384#issuecomment-73448717
  
@pwendell Could you take a look at the Scala docs?


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4384#issuecomment-73459269
  
[Test build #27078 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27078/consoleFull) for PR 4384 at commit [`83d0402`](https://github.com/apache/spark/commit/83d04025da1cac3d1ec8565015dbe492f17c3b79).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
   * `public final class JavaDirectKafkaWordCount`



[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4384#issuecomment-73459273
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27078/


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-06 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/4384#discussion_r24267609
  

[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-06 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/4384#discussion_r24265028
  
--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala ---
@@ -36,14 +36,12 @@ import kafka.utils.VerifiableProperties
  * Starting and ending offsets are specified in advance,
  * so that you can control exactly-once semantics.
  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>.
- *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
- *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
- * @param batch Each KafkaRDDPartition in the batch corresponds to a
- *   range of offsets for a given Kafka topic/partition
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
--- End diff --

The only reason we were writing "not zookeeper servers" is to make the difference from the earlier stream clear, for people who want to switch from the old API to the new one. That applies to the public API; this is internal, private API. I can add it back, no issues.
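
For readers following the old-versus-new distinction, a minimal sketch of the two configurations (hostnames and ports are invented):

```scala
// New direct/batch API: kafkaParams names the Kafka brokers themselves.
val directParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")

// Older receiver-based stream (KafkaUtils.createStream): configured via
// ZooKeeper instead, hence the "NOT zookeeper servers" note in the docs.
val receiverParams = Map(
  "zookeeper.connect" -> "zk1:2181,zk2:2181",
  "group.id" -> "example-consumer-group")
```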


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-06 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/4384#discussion_r24268021
  
--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala ---
@@ -19,16 +19,35 @@ package org.apache.spark.streaming.kafka
 
 import kafka.common.TopicAndPartition
 
-/** Something that has a collection of OffsetRanges */
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the
+ * offset ranges in RDDs generated by the direct Kafka DStream (see
+ * [[KafkaUtils.createDirectStream()]]).
--- End diff --

Good call. Let me add the references.


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-06 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/4384#discussion_r24268035
  

[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-06 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/4384#discussion_r24267569
  

[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-06 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4384#issuecomment-73207199
  
[Test build #589 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/589/consoleFull) for PR 4384 at commit [`e4abf69`](https://github.com/apache/spark/commit/e4abf69b63bb6bfa94823bcefd27bcbe821b1f2e).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-06 Thread koeninger
Github user koeninger commented on the pull request:

https://github.com/apache/spark/pull/4384#issuecomment-73269084
  
Thanks for adding the Java-friendly KafkaUtils methods.

Your original reason for wanting Array[Leader] rather than Map[TopicAndPartition, Broker] was Java compatibility.

But since there are now separate Scala- and Java-specific KafkaUtils.createRDD methods, couldn't the Scala one take a Map and the Java one take a JMap?

As it stands, what is the meaning if someone passes in an array with multiple Leader objects that have the same topic and partition but different brokers?

The first thing we do with the array of leaders is convert it to a map... it seems better to just take a map and avoid both the possibility of confusion and the extra conversion.
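
To make the ambiguity concrete, here is a minimal sketch (the `Leader` case class is a local stand-in defined only for illustration, and the duplicate entries are invented):

```scala
import kafka.common.TopicAndPartition

// Stand-in for the PR's Leader class, defined locally for illustration.
case class Leader(topic: String, partition: Int, host: String, port: Int)

// Two leaders claiming the same topic/partition but different brokers.
val leaders = Array(
  Leader("events", 0, "broker-a", 9092),
  Leader("events", 0, "broker-b", 9092))

// The conversion inside createRDD keeps only the last entry per key,
// so broker-a is silently dropped:
val leaderMap = leaders
  .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
  .toMap
// leaderMap == Map(TopicAndPartition("events", 0) -> ("broker-b", 9092))
```

A `Map[TopicAndPartition, Broker]` parameter would make that duplicate unrepresentable in the first place.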


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-06 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/4384#discussion_r24252680
  

[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-06 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/4384#discussion_r24252952
  

[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-06 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/4384#discussion_r24253794
  
--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala ---
@@ -19,16 +19,35 @@ package org.apache.spark.streaming.kafka
 
 import kafka.common.TopicAndPartition
 
-/** Something that has a collection of OffsetRanges */
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the
+ * offset ranges in RDDs generated by the direct Kafka DStream (see
+ * [[KafkaUtils.createDirectStream()]]).
--- End diff --

It's probably good for the createDirectStream doc to link to here as well, in addition to the other way around.
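
A minimal sketch of the access pattern this trait enables, assuming the direct stream's RDDs mix in HasOffsetRanges as described in the diff above:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.HasOffsetRanges

// Recover the Kafka offset ranges backing an RDD produced by the
// direct stream; the cast is the pattern this trait exists to support.
def logOffsets(rdd: RDD[_]): Unit = {
  for (r <- rdd.asInstanceOf[HasOffsetRanges].offsetRanges) {
    println(s"${r.topic}/${r.partition}: ${r.fromOffset} -> ${r.untilOffset}")
  }
}
// Typical use: directStream.foreachRDD(rdd => logOffsets(rdd))
```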


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-06 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/4384#discussion_r24251761
  
--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala ---
@@ -36,14 +36,12 @@ import kafka.utils.VerifiableProperties
  * Starting and ending offsets are specified in advance,
  * so that you can control exactly-once semantics.
  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>.
- *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
- *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
- * @param batch Each KafkaRDDPartition in the batch corresponds to a
- *   range of offsets for a given Kafka topic/partition
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
--- End diff --

You removed the "not zookeeper servers" note here, but left it in for the other methods that take kafkaParams.


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4384#issuecomment-73180459
  
[Test build #26888 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26888/consoleFull) for PR 4384 at commit [`bb65232`](https://github.com/apache/spark/commit/bb65232c008d66c7895e83e9736353881b5d719e).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4384#issuecomment-73180462
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26888/


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4384#issuecomment-73182394
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26891/


[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4384#issuecomment-73182388
  
  [Test build #26891 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26891/consoleFull) for PR 4384 at commit [`e4abf69`](https://github.com/apache/spark/commit/e4abf69b63bb6bfa94823bcefd27bcbe821b1f2e).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4384#issuecomment-73178166
  
  [Test build #26891 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26891/consoleFull) for PR 4384 at commit [`e4abf69`](https://github.com/apache/spark/commit/e4abf69b63bb6bfa94823bcefd27bcbe821b1f2e).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4384#issuecomment-73195603
  
  [Test build #589 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/589/consoleFull) for PR 4384 at commit [`e4abf69`](https://github.com/apache/spark/commit/e4abf69b63bb6bfa94823bcefd27bcbe821b1f2e).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/4384#discussion_r24216559
  
--- Diff: examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import kafka.serializer.StringDecoder
+
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.kafka._
+import org.apache.spark.SparkConf
+
+/**
+ * Consumes messages from one or more topics in Kafka and does wordcount.
+ * Usage: DirectKafkaWordCount <brokers> <topics>
+ *   <brokers> is a list of one or more zookeeper servers that make quorum
--- End diff --

Right. Thanks!





[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4384#issuecomment-73164892
  
  [Test build #26878 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26878/consoleFull) for PR 4384 at commit [`50f2b56`](https://github.com/apache/spark/commit/50f2b56f57b00845d006361049dd2c4bac89957c).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4384#issuecomment-73170929
  
  [Test build #26878 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26878/consoleFull) for PR 4384 at commit [`50f2b56`](https://github.com/apache/spark/commit/50f2b56f57b00845d006361049dd2c4bac89957c).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4384#issuecomment-73170936
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26878/
Test FAILed.





[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4384#issuecomment-73175650
  
  [Test build #26888 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26888/consoleFull) for PR 4384 at commit [`bb65232`](https://github.com/apache/spark/commit/bb65232c008d66c7895e83e9736353881b5d719e).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-04 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/4384#discussion_r24142354
  
--- Diff: examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import kafka.serializer.StringDecoder
+
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.kafka._
+import org.apache.spark.SparkConf
+
+/**
+ * Consumes messages from one or more topics in Kafka and does wordcount.
+ * Usage: DirectKafkaWordCount <brokers> <topics>
+ *   <brokers> is a list of one or more zookeeper servers that make quorum
--- End diff --

These are Kafka servers, not ZooKeeper servers.
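
For context, a minimal sketch of how the corrected example would wire those brokers into the direct stream (hedged: the broker and topic names are invented, and this assumes the 1.3-era createDirectStream signature this PR series targets):

    import kafka.serializer.StringDecoder

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCountSketch")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // <brokers> are Kafka servers in host1:port1,host2:port2 form, not ZooKeeper.
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("topic1", "topic2"))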





[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/4384#discussion_r24142682
  
--- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.io.File
+
+import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import kafka.serializer.StringDecoder
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
+import org.scalatest.concurrent.{Eventually, Timeouts}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
+import org.apache.spark.streaming.dstream.{DStream, InputDStream}
+import org.apache.spark.util.Utils
+
+class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
--- End diff --

Just renamed the file and test suite class from KafkaDirectStreamSuite to
DirectKafkaStreamSuite, but Git/GitHub considered it to be a move. The first
unit test is unmodified.





[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4384#issuecomment-73003057
  
  [Test build #586 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/586/consoleFull) for PR 4384 at commit [`4986784`](https://github.com/apache/spark/commit/49867846e72329c849002706441d8165825f4a6b).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4384#issuecomment-72997883
  
  [Test build #586 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/586/consoleFull) for PR 4384 at commit [`4986784`](https://github.com/apache/spark/commit/49867846e72329c849002706441d8165825f4a6b).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4384#issuecomment-72991452
  
  [Test build #26822 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26822/consoleFull) for PR 4384 at commit [`4986784`](https://github.com/apache/spark/commit/49867846e72329c849002706441d8165825f4a6b).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-04 Thread tdas
Github user tdas commented on the pull request:

https://github.com/apache/spark/pull/4384#issuecomment-72992469
  
@koeninger Can you please take a look?
@pwendell (Optional) Take a look if you are interested.





[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-04 Thread tdas
GitHub user tdas opened a pull request:

https://github.com/apache/spark/pull/4384

[SPARK-4964][Streaming][Kafka] More updates to Exactly-once Kafka stream

Changes
- Added example
- Added a critical unit test that verifies that offset ranges can be
recovered through checkpoints (see the sketch after this list)
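
A hedged sketch of the behavior that test pins down (HasOffsetRanges and OffsetRange are the public API from this PR series; `stream` stands in for an assumed direct stream created with KafkaUtils.createDirectStream, and everything else is illustrative):

    import org.apache.spark.streaming.kafka.HasOffsetRanges

    stream.foreachRDD { rdd =>
      // Each partition of a direct-stream RDD carries its Kafka offset range;
      // these ranges are part of the DStream metadata, so they survive
      // checkpoint-based recovery of the StreamingContext.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach { o =>
        println(s"${o.topic} ${o.partition} ${o.fromOffset} -> ${o.untilOffset}")
      }
    }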

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tdas/spark new-kafka-fixes

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/4384.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4384


commit 6a91cab3172701e1c286dbcdadee825a33230913
Author: Tathagata Das tathagata.das1...@gmail.com
Date:   2015-02-05T03:56:46Z

Added example

commit 49867846e72329c849002706441d8165825f4a6b
Author: Tathagata Das tathagata.das1...@gmail.com
Date:   2015-02-05T04:01:17Z

Added unit test to kafka offset recovery







[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-04 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4384#issuecomment-72994795
  
  [Test build #26822 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26822/consoleFull) for PR 4384 at commit [`4986784`](https://github.com/apache/spark/commit/49867846e72329c849002706441d8165825f4a6b).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...

2015-02-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4384#issuecomment-72994801
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26822/
Test FAILed.

