[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4384#issuecomment-73653296

[Test build #27178 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27178/consoleFull) for PR 4384 at commit [`7c931c3`](https://github.com/apache/spark/commit/7c931c3fd174376fc04a436a64ec414dbe8eac46).

* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds no public classes.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4384#issuecomment-73653301

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27178/
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4384#issuecomment-73647726

[Test build #27178 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27178/consoleFull) for PR 4384 at commit [`7c931c3`](https://github.com/apache/spark/commit/7c931c3fd174376fc04a436a64ec414dbe8eac46).

* This patch merges cleanly.
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/4384
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/4384#discussion_r24386248

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---

```diff
@@ -179,121 +182,194 @@ object KafkaUtils {
       errs => throw new SparkException(errs.mkString("\n")),
       ok => ok
     )
-    new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
+    new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
   }
 
-  /** A batch-oriented interface for consuming from Kafka.
-   * Starting and ending offsets are specified in advance,
-   * so that you can control exactly-once semantics.
+  /**
+   * :: Experimental ::
+   * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you
+   * specify the Kafka leader to connect to (to optimize fetching) and access the message as well
+   * as the metadata.
+   *
    * @param sc SparkContext object
    * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
-   * configuration parameters</a>.
-   * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
-   * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   *   configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+   *   to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+   *   host1:port1,host2:port2 form.
    * @param offsetRanges Each OffsetRange in the batch corresponds to a
    *   range of offsets for a given Kafka topic/partition
    * @param leaders Kafka leaders for each offset range in batch
-   * @param messageHandler function for translating each message into the desired type
+   * @param messageHandler Function for translating each message and metadata into the desired type
    */
   @Experimental
   def createRDD[
     K: ClassTag,
     V: ClassTag,
-    U <: Decoder[_]: ClassTag,
-    T <: Decoder[_]: ClassTag,
-    R: ClassTag] (
+    KD <: Decoder[K]: ClassTag,
+    VD <: Decoder[V]: ClassTag,
+    R: ClassTag](
       sc: SparkContext,
       kafkaParams: Map[String, String],
       offsetRanges: Array[OffsetRange],
       leaders: Array[Leader],
       messageHandler: MessageAndMetadata[K, V] => R
-  ): RDD[R] = {
-
+    ): RDD[R] = {
     val leaderMap = leaders
       .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
       .toMap
-    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
+    new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
   }
 
+  /**
-   * This stream can guarantee that each message from Kafka is included in transformations
-   * (as opposed to output actions) exactly once, even in most failure situations.
+   * Create a RDD from Kafka using offset ranges for each topic and partition.
    *
-   * Points to note:
-   *
-   * Failure Recovery - You must checkpoint this stream, or save offsets yourself and provide them
-   * as the fromOffsets parameter on restart.
-   * Kafka must have sufficient log retention to obtain messages after failure.
-   *
-   * Getting offsets from the stream - see programming guide
+   * @param jsc JavaSparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   *   configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+   *   to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+   *   host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]](
+      jsc: JavaSparkContext,
+      keyClass: Class[K],
+      valueClass: Class[V],
+      keyDecoderClass: Class[KD],
+      valueDecoderClass: Class[VD],
+      kafkaParams: JMap[String, String],
+      offsetRanges: Array[OffsetRange]
+    ): JavaPairRDD[K, V] = {
+    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
+    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
+    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
+    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
+    new JavaPairRDD(createRDD[K, V, KD, VD](
+      jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges))
+  }
+
+  /**
+   * :: Experimental ::
+   * Create a RDD from Kafka using offset ranges for each
```
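The Java-friendly `createRDD` overload at the end of this hunk bridges the `Class` objects a Java caller can supply to the `ClassTag`s the Scala API demands, by materializing one implicit `ClassTag` per class. A minimal, Spark-free sketch of that pattern (the `makeArray` helper is made up for illustration and is not part of the PR):

```scala
import scala.reflect.ClassTag

// A Java caller can only hand over Class objects; materializing an implicit
// ClassTag from each one lets us call ClassTag-bound Scala APIs, exactly as
// the Java-friendly createRDD wrapper does with keyClass/valueClass.
def makeArray[T](clazz: Class[T], n: Int): Array[T] = {
  implicit val ct: ClassTag[T] = ClassTag(clazz)
  new Array[T](n) // needs the ClassTag in scope to build a properly-typed array
}

val a = makeArray(classOf[String], 3)
println(a.getClass.getComponentType) // class java.lang.String
```

Without the implicit `ClassTag`, `new Array[T](n)` would not compile, which is why the wrapper declares one `implicit val` per type parameter before delegating to the Scala `createRDD`.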
Github user pwendell commented on a diff in the pull request (quoting the same KafkaUtils.scala `createRDD` hunk as above): https://github.com/apache/spark/pull/4384#discussion_r24386265
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4384#issuecomment-73335178

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26966/
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4384#issuecomment-73335167

[Test build #26966 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26966/consoleFull) for PR 4384 at commit [`26df23c`](https://github.com/apache/spark/commit/26df23c5578e52dc594557cfaf2170b1e0d49169).

* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds no public classes.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4384#issuecomment-73326747

[Test build #26966 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26966/consoleFull) for PR 4384 at commit [`26df23c`](https://github.com/apache/spark/commit/26df23c5578e52dc594557cfaf2170b1e0d49169).

* This patch merges cleanly.
Github user tdas commented on a diff in the pull request (quoting the same KafkaUtils.scala `createRDD` hunk as above): https://github.com/apache/spark/pull/4384#discussion_r24272277
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4384#issuecomment-73455353

[Test build #27078 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27078/consoleFull) for PR 4384 at commit [`83d0402`](https://github.com/apache/spark/commit/83d04025da1cac3d1ec8565015dbe492f17c3b79).

* This patch merges cleanly.
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/4384#issuecomment-73448717

@pwendell Could you take a look at the scala docs?
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4384#issuecomment-73459269

[Test build #27078 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27078/consoleFull) for PR 4384 at commit [`83d0402`](https://github.com/apache/spark/commit/83d04025da1cac3d1ec8565015dbe492f17c3b79).

* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `public final class JavaDirectKafkaWordCount`
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4384#issuecomment-73459273

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27078/
Github user tdas commented on a diff in the pull request (quoting the same KafkaUtils.scala `createRDD` hunk as above): https://github.com/apache/spark/pull/4384#discussion_r24267609
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/4384#discussion_r24265028

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala ---

```diff
@@ -36,14 +36,12 @@ import kafka.utils.VerifiableProperties
  * Starting and ending offsets are specified in advance,
  * so that you can control exactly-once semantics.
  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>.
- * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
- * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
- * @param batch Each KafkaRDDPartition in the batch corresponds to a
- * range of offsets for a given Kafka topic/partition
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
```
--- End diff ---

The only reason we were writing "not zookeepers" was to make the difference from the earlier stream clear, for people who want to switch from the old one to the new. That applies to the public API; this is internal private API. I can add it back, no issues.
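The broker list being discussed takes the same form in both the public and internal APIs. A small sketch of the expected `kafkaParams` shape (host names are placeholders):

```scala
// "metadata.broker.list" (or "bootstrap.servers") names Kafka brokers, not
// ZooKeeper nodes, as a comma-separated host:port list.
val kafkaParams: Map[String, String] = Map(
  "metadata.broker.list" -> "broker1:9092,broker2:9092"
)

val brokers = kafkaParams("metadata.broker.list").split(",").toSeq
// brokers == Seq("broker1:9092", "broker2:9092")
```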
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/4384#discussion_r24268021

--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala ---

```diff
@@ -19,16 +19,35 @@ package org.apache.spark.streaming.kafka
 
 import kafka.common.TopicAndPartition
 
-/** Something that has a collection of OffsetRanges */
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the
+ * offset ranges in RDDs generated by the direct Kafka DStream (see
+ * [[KafkaUtils.createDirectStream()]]).
```
--- End diff ---

Good call. Let me add the references.
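The scaladoc under review describes the `HasOffsetRanges` access pattern: RDDs from the direct Kafka stream mix in the trait, and callers downcast to recover the offsets each batch covers. A runnable sketch with simplified, locally re-declared stand-ins for the real classes (which live in `org.apache.spark.streaming.kafka`):

```scala
// Simplified stand-ins, re-declared here so the sketch runs without Spark;
// fields follow the PR's OffsetRange (topic, partition, fromOffset, untilOffset).
final case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

trait HasOffsetRanges {
  def offsetRanges: Array[OffsetRange]
}

// An RDD produced by the direct Kafka stream mixes in HasOffsetRanges, so a
// caller holding it under a plain RDD type can downcast to read the offsets.
// FakeKafkaRDD is a made-up placeholder for that RDD.
class FakeKafkaRDD(ranges: Array[OffsetRange]) extends HasOffsetRanges {
  def offsetRanges: Array[OffsetRange] = ranges
}

val rdd: AnyRef = new FakeKafkaRDD(Array(OffsetRange("events", 0, 100L, 200L)))

// The cast is the documented access path: the static type doesn't expose
// offsetRanges, but the runtime object does.
val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// ranges.head.untilOffset == 200L
```

Saving these ranges externally (and replaying them through `createRDD`) is what enables the exactly-once recovery story discussed elsewhere in this thread.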
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/4384#discussion_r24268035

```diff
--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---
@@ -179,121 +182,190 @@ object KafkaUtils {
       errs => throw new SparkException(errs.mkString("\n")),
       ok => ok
     )
-    new KafkaRDD[K, V, U, T, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
+    new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
   }
 
-  /** A batch-oriented interface for consuming from Kafka.
-   * Starting and ending offsets are specified in advance,
-   * so that you can control exactly-once semantics.
+  /**
+   * :: Experimental ::
+   * Create a RDD from Kafka using offset ranges for each topic and partition. This allows you
+   * specify the Kafka leader to connect to (to optimize fetching) and access the message as well
+   * as the metadata.
+   *
    * @param sc SparkContext object
    * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
-   * configuration parameters</a>.
-   * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
-   * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   *   configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+   *   to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+   *   host1:port1,host2:port2 form.
    * @param offsetRanges Each OffsetRange in the batch corresponds to a
    *   range of offsets for a given Kafka topic/partition
    * @param leaders Kafka leaders for each offset range in batch
-   * @param messageHandler function for translating each message into the desired type
+   * @param messageHandler function for translating each message and metadata into the desired type
    */
   @Experimental
   def createRDD[
     K: ClassTag,
     V: ClassTag,
-    U <: Decoder[_]: ClassTag,
-    T <: Decoder[_]: ClassTag,
-    R: ClassTag] (
+    KD <: Decoder[K]: ClassTag,
+    VD <: Decoder[V]: ClassTag,
+    R: ClassTag](
       sc: SparkContext,
       kafkaParams: Map[String, String],
       offsetRanges: Array[OffsetRange],
       leaders: Array[Leader],
       messageHandler: MessageAndMetadata[K, V] => R
-  ): RDD[R] = {
-
+  ): RDD[R] = {
     val leaderMap = leaders
       .map(l => TopicAndPartition(l.topic, l.partition) -> (l.host, l.port))
       .toMap
-    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
+    new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
   }
 
   /**
-   * This stream can guarantee that each message from Kafka is included in transformations
-   * (as opposed to output actions) exactly once, even in most failure situations.
+   * Create a RDD from Kafka using offset ranges for each topic and partition.
    *
-   * Points to note:
-   *
-   * Failure Recovery - You must checkpoint this stream, or save offsets yourself and provide them
-   * as the fromOffsets parameter on restart.
-   * Kafka must have sufficient log retention to obtain messages after failure.
-   *
-   * Getting offsets from the stream - see programming guide
+   * @param jsc JavaSparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   *   configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+   *   to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+   *   host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   */
+  @Experimental
+  def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]](
+      jsc: JavaSparkContext,
+      keyClass: Class[K],
+      valueClass: Class[V],
+      keyDecoderClass: Class[KD],
+      valueDecoderClass: Class[VD],
+      kafkaParams: JMap[String, String],
+      offsetRanges: Array[OffsetRange]
+  ): JavaPairRDD[K, V] = {
+    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
+    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
+    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
+    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
+    new JavaPairRDD(createRDD[K, V, KD, VD](
+      jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges))
+  }
+
+  /**
+   * :: Experimental ::
+   * Create a RDD from Kafka using offset ranges for each
```
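The Java-friendly `createRDD` in the diff above cannot rely on the Scala compiler to infer `ClassTag`s, so it materializes them from the `Class` objects the Java caller passes in (`implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)`). A minimal standalone sketch of that pattern; the `materialize` helper here is hypothetical, for illustration only:

```scala
import scala.reflect.{classTag, ClassTag}

// Hypothetical helper mirroring the wrapper's pattern: turn a runtime
// Class object into the implicit ClassTag the Scala API requires.
def materialize[K](keyClass: Class[K]): ClassTag[K] = ClassTag(keyClass)

val fromClass: ClassTag[String] = materialize(classOf[String])

// The materialized tag is equivalent to the compiler-derived one.
assert(fromClass.runtimeClass == classTag[String].runtimeClass)
assert(fromClass == classTag[String])
```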
[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/4384#discussion_r24267569 --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala --- (quotes the same `createRDD` hunk of KafkaUtils.scala shown above)
[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4384#issuecomment-73207199 [Test build #589 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/589/consoleFull) for PR 4384 at commit [`e4abf69`](https://github.com/apache/spark/commit/e4abf69b63bb6bfa94823bcefd27bcbe821b1f2e). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...
Github user koeninger commented on the pull request: https://github.com/apache/spark/pull/4384#issuecomment-73269084 Thanks for adding the java friendly kafka utils methods. Your original reason for wanting Array[Leader] rather than Map[TopicAndPartition, Broker] was for java compatibility. But since there are separate scala and java-specific KafkaUtils.createRDD methods now, couldn't the scala one take a Map and the java one take a JMap? As it stands currently, what is the meaning if someone passes in an array with multiple Leader objects that have the same topic and partition but a different broker? The first thing we do with the array of leaders is convert it to a map... it seems better to just take a map and avoid both the possibility of confusion and the extra conversion.
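The ambiguity koeninger raises can be reproduced with plain collections: converting an array of leaders containing a duplicate topic/partition key into a `Map` silently keeps only one of the conflicting brokers. A standalone sketch; the `Leader` case class here is a simplified stand-in, not the PR's class:

```scala
// Simplified stand-in for the PR's Leader (topic, partition, host, port).
final case class Leader(topic: String, partition: Int, host: String, port: Int)

val leaders = Array(
  Leader("events", 0, "broker-a", 9092),
  Leader("events", 0, "broker-b", 9092) // same topic/partition, different broker
)

// Mirrors the first thing createRDD does with the array: build a leader map.
val leaderMap = leaders
  .map(l => (l.topic, l.partition) -> (l.host, l.port))
  .toMap

// toMap keeps the last value for a duplicate key, so broker-a is silently
// dropped -- exactly the confusion a Map-typed parameter would rule out.
assert(leaderMap.size == 1)
assert(leaderMap(("events", 0)) == ("broker-b", 9092))
```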
[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/4384#discussion_r24252680 --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala --- (quotes the same `createRDD` hunk of KafkaUtils.scala shown above)
[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/4384#discussion_r24252952 --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala --- (quotes the same `createRDD` hunk of KafkaUtils.scala shown above)
[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/4384#discussion_r24253794

```diff
--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala ---
@@ -19,16 +19,35 @@ package org.apache.spark.streaming.kafka
 
 import kafka.common.TopicAndPartition
 
-/** Something that has a collection of OffsetRanges */
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Represents any object that has a collection of [[OffsetRange]]s. This can be used access the
+ * offset ranges in RDDs generated by the direct Kafka DStream (see
+ * [[KafkaUtils.createDirectStream()]]).
```
--- End diff --
It's probably good for the doc for createDirectStream to link to here, in addition to the other way around
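The `HasOffsetRanges` idea discussed above can be sketched independently of Spark. The field names follow the PR's `OffsetRange` (topic, partition, fromOffset, untilOffset), but these classes are simplified stand-ins rather than the real implementations:

```scala
// Simplified stand-in: a half-open range of offsets for one topic/partition.
final case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
  def count: Long = untilOffset - fromOffset
}

// Anything that carries a collection of offset ranges.
trait HasOffsetRanges {
  def offsetRanges: Array[OffsetRange]
}

// A batch result can report exactly which slice of each partition it covers.
final class Batch(val offsetRanges: Array[OffsetRange]) extends HasOffsetRanges

val batch = new Batch(Array(
  OffsetRange("events", 0, 10L, 20L),
  OffsetRange("events", 1, 0L, 5L)
))
assert(batch.offsetRanges.map(_.count).sum == 15L)
```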
[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/4384#discussion_r24251761

```diff
--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala ---
@@ -36,14 +36,12 @@ import kafka.utils.VerifiableProperties
  * Starting and ending offsets are specified in advance,
  * so that you can control exactly-once semantics.
  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>.
- * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
- * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
- * @param batch Each KafkaRDDPartition in the batch corresponds to a
- * range of offsets for a given Kafka topic/partition
+ * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
```
--- End diff --
You removed the "not zookeeper servers" note here, but left it in for the other methods that take kafkaParams
[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4384#issuecomment-73180459 [Test build #26888 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26888/consoleFull) for PR 4384 at commit [`bb65232`](https://github.com/apache/spark/commit/bb65232c008d66c7895e83e9736353881b5d719e). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4384#issuecomment-73180462 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26888/ Test PASSed.
[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4384#issuecomment-73182394 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26891/ Test PASSed.
[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4384#issuecomment-73182388 [Test build #26891 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26891/consoleFull) for PR 4384 at commit [`e4abf69`](https://github.com/apache/spark/commit/e4abf69b63bb6bfa94823bcefd27bcbe821b1f2e). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4384#issuecomment-73178166 [Test build #26891 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26891/consoleFull) for PR 4384 at commit [`e4abf69`](https://github.com/apache/spark/commit/e4abf69b63bb6bfa94823bcefd27bcbe821b1f2e). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4384#issuecomment-73195603 [Test build #589 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/589/consoleFull) for PR 4384 at commit [`e4abf69`](https://github.com/apache/spark/commit/e4abf69b63bb6bfa94823bcefd27bcbe821b1f2e). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/4384#discussion_r24216559 --- Diff: examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala --- (new file; the quoted context is the Apache license header, the imports, and this usage doc:)

```diff
+/**
+ * Consumes messages from one or more topics in Kafka and does wordcount.
+ * Usage: DirectKafkaWordCount <brokers> <topics>
+ *   <brokers> is a list of one or more zookeeper servers that make quorum
```
--- End diff --
Right. Thanks!
[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4384#issuecomment-73164892 [Test build #26878 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26878/consoleFull) for PR 4384 at commit [`50f2b56`](https://github.com/apache/spark/commit/50f2b56f57b00845d006361049dd2c4bac89957c). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4384#issuecomment-73170929 [Test build #26878 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26878/consoleFull) for PR 4384 at commit [`50f2b56`](https://github.com/apache/spark/commit/50f2b56f57b00845d006361049dd2c4bac89957c). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4384#issuecomment-73170936 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26878/ Test FAILed.
[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4384#issuecomment-73175650 [Test build #26888 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26888/consoleFull) for PR 4384 at commit [`bb65232`](https://github.com/apache/spark/commit/bb65232c008d66c7895e83e9736353881b5d719e). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-4964][Streaming][Kafka] More updates to...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/4384#discussion_r24142354 --- Diff: examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala --- (new file; the quoted context is the Apache license header, the imports, and this usage doc:)

```diff
+/**
+ * Consumes messages from one or more topics in Kafka and does wordcount.
+ * Usage: DirectKafkaWordCount <brokers> <topics>
+ *   <brokers> is a list of one or more zookeeper servers that make quorum
```
--- End diff --
These are kafka servers, not zookeeper servers
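The `<brokers>` argument being corrected above uses the same `host1:port1,host2:port2` form as `metadata.broker.list`. A small parser makes the expected shape concrete; `parseBrokers` is a hypothetical helper for illustration, not part of the PR:

```scala
// Sketch: split a Kafka broker list (NOT zookeeper servers) of the form
// "host1:port1,host2:port2" into (host, port) pairs.
def parseBrokers(brokers: String): Seq[(String, Int)] =
  brokers.split(",").toSeq.map { hostPort =>
    val Array(host, port) = hostPort.split(":")
    (host, port.toInt)
  }

assert(parseBrokers("kafka1:9092,kafka2:9092") == Seq(("kafka1", 9092), ("kafka2", 9092)))
```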
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/4384#discussion_r24142682

--- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---

@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.io.File
+
+import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import kafka.serializer.StringDecoder
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
+import org.scalatest.concurrent.{Eventually, Timeouts}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
+import org.apache.spark.streaming.dstream.{DStream, InputDStream}
+import org.apache.spark.util.Utils
+
+class DirectKafkaStreamSuite extends KafkaStreamSuiteBase

--- End diff --

Just renamed the file and test suite class from KafkaDirectStreamSuite to DirectKafkaStreamSuite, but Git/GitHub considered it to be a move. The first unit test is unmodified.
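The suite imports ScalaTest's `Eventually`, which it uses to poll asynchronous stream state until an assertion holds or a timeout expires. A rough self-contained sketch of that polling pattern in plain Scala (the `eventually` helper here is a hypothetical stand-in, not ScalaTest's actual implementation):

```scala
object EventuallySketch {
  // Repeatedly evaluate a by-name condition until it is true or the
  // timeout elapses, sleeping between attempts; returns the final result.
  def eventually(timeoutMs: Long, intervalMs: Long)(cond: => Boolean): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutMs
    while (System.currentTimeMillis() < deadline) {
      if (cond) return true
      Thread.sleep(intervalMs)
    }
    cond // one final check at the deadline
  }
}
```

ScalaTest's real `eventually` additionally rethrows the last assertion failure rather than returning a Boolean, but the retry-until-deadline structure is the same idea.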
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4384#issuecomment-73003057

[Test build #586 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/586/consoleFull) for PR 4384 at commit [`4986784`](https://github.com/apache/spark/commit/49867846e72329c849002706441d8165825f4a6b).

* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds no public classes.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4384#issuecomment-72997883

[Test build #586 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/586/consoleFull) for PR 4384 at commit [`4986784`](https://github.com/apache/spark/commit/49867846e72329c849002706441d8165825f4a6b).

* This patch merges cleanly.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4384#issuecomment-72991452

[Test build #26822 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26822/consoleFull) for PR 4384 at commit [`4986784`](https://github.com/apache/spark/commit/49867846e72329c849002706441d8165825f4a6b).

* This patch merges cleanly.
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/4384#issuecomment-72992469

@koeninger Can you please take a look? @pwendell (Optional) Take a look if you are interested.
GitHub user tdas opened a pull request: https://github.com/apache/spark/pull/4384

[SPARK-4964][Streaming][Kafka] More updates to Exactly-once Kafka stream

Changes
- Added example
- Added a critical unit test that verifies that offset ranges can be recovered through checkpoints

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark new-kafka-fixes

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/4384.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4384

commit 6a91cab3172701e1c286dbcdadee825a33230913
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date: 2015-02-05T03:56:46Z

    Added example

commit 49867846e72329c849002706441d8165825f4a6b
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date: 2015-02-05T04:01:17Z

    Added unit test to kafka offset recovery
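The new unit test checks that offset ranges survive a checkpoint-and-restart cycle. The core idea — persist the consumed (topic, partition, fromOffset, untilOffset) ranges durably and read them back on recovery — can be sketched in plain Scala with Java serialization. Everything here (`OffsetRange` as a local case class, `CheckpointSketch`) is a hypothetical stand-in for illustration, not Spark's checkpointing code:

```scala
import java.io.{FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}

// Stand-in for the (topic, partition, fromOffset, untilOffset) quadruple
// that the direct Kafka stream tracks per batch.
case class OffsetRange(topic: String, partition: Int,
                       fromOffset: Long, untilOffset: Long) extends Serializable

object CheckpointSketch {
  // Persist a batch's offset ranges so they can be recovered after a restart.
  def save(path: String, ranges: Seq[OffsetRange]): Unit = {
    val out = new ObjectOutputStream(new FileOutputStream(path))
    try out.writeObject(ranges.toList) finally out.close()
  }

  // Read the offset ranges back, as a driver would on recovery from checkpoint.
  def restore(path: String): Seq[OffsetRange] = {
    val in = new ObjectInputStream(new FileInputStream(path))
    try in.readObject().asInstanceOf[List[OffsetRange]] finally in.close()
  }
}
```

Spark's real checkpointing serializes the whole DStream graph (including the generated KafkaRDDs and their offset ranges) rather than the ranges alone, but the round-trip property the test asserts is the same: what was written before the restart must equal what is read after it.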
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4384#issuecomment-72994795

[Test build #26822 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26822/consoleFull) for PR 4384 at commit [`4986784`](https://github.com/apache/spark/commit/49867846e72329c849002706441d8165825f4a6b).

* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4384#issuecomment-72994801

Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26822/