[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r69116416 --- Diff: external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java --- @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.kafka010; + +import java.io.Serializable; +import java.util.*; + +import scala.collection.JavaConverters; + +import org.apache.kafka.common.TopicPartition; + +import org.junit.Assert; +import org.junit.Test; + +public class JavaConsumerStrategySuite implements Serializable { + + @Test + public void testConsumerStrategyConstructors() { +final String topic1 = "topic1"; +final Collection topics = Arrays.asList(topic1); +final scala.collection.Iterable sTopics = + JavaConverters.collectionAsScalaIterableConverter(topics).asScala(); +final TopicPartition tp1 = new TopicPartition(topic1, 0); +final TopicPartition tp2 = new TopicPartition(topic1, 1); +final Collection parts = Arrays.asList(tp1, tp2); +final scala.collection.Iterable sParts = + JavaConverters.collectionAsScalaIterableConverter(parts).asScala(); +final MapkafkaParams = new HashMap (); +kafkaParams.put("bootstrap.servers", "not used"); +final scala.collection.Map sKafkaParams = + JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala(); +final Map offsets = new HashMap<>(); +offsets.put(tp1, 23L); +final scala.collection.Map sOffsets = + JavaConverters.mapAsScalaMapConverter(offsets).asScala(); + +// make sure constructors can be called from java +final ConsumerStrategy sub0 = + Subscribe. apply(topics, kafkaParams, offsets); --- End diff -- I refactored the API to avoid case classes and minimize publicly visible classes - #13996 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
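tdas's refactor (#13996) drops case classes in favor of plain classes and a small public surface. The Java sketch below illustrates that general pattern under our own assumptions. `StrategyFactorySketch`, `Strategy`, `SubscribeImpl` and `subscribe` are hypothetical names; this is not the code from #13996 and not the Spark API. Because the factory methods are written by hand and accept `java.util` types, a Java caller does not depend on which `apply` overloads a particular Scala compiler version decided to generate.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch (names invented for illustration): callers see only the abstract
// Strategy type and hand-written factory methods; the concrete class stays private.
public final class StrategyFactorySketch {

  /** Public base type that callers program against. */
  public abstract static class Strategy {
    public abstract List<String> topics();
    public abstract Map<String, Object> params();
  }

  /** Hidden implementation: a plain class, analogous to replacing a Scala case class. */
  private static final class SubscribeImpl extends Strategy {
    private final List<String> topics;
    private final Map<String, Object> params;

    SubscribeImpl(Collection<String> topics, Map<String, Object> params) {
      // Defensive, read-only copies so later changes by the caller do not leak in.
      this.topics = Collections.unmodifiableList(new ArrayList<>(topics));
      this.params = Collections.unmodifiableMap(new HashMap<>(params));
    }

    @Override public List<String> topics() { return topics; }
    @Override public Map<String, Object> params() { return params; }
  }

  private StrategyFactorySketch() {}

  /** Factory taking plain java.util types, so Java callers need no Scala converters. */
  public static Strategy subscribe(Collection<String> topics, Map<String, Object> params) {
    return new SubscribeImpl(topics, params);
  }

  /** Convenience overload with no extra parameters. */
  public static Strategy subscribe(Collection<String> topics) {
    return subscribe(topics, Collections.<String, Object>emptyMap());
  }

  public static void main(String[] args) {
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "not used");
    Strategy s = subscribe(Arrays.asList("topic1"), kafkaParams);
    System.out.println(s.topics() + " " + s.params());
  }
}
```

Keeping the implementation class private means it can later be changed or split without touching callers, which is the "minimize publicly visible classes" half of the comment.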
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r69087197 --- Diff: external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java --- @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.kafka010; + +import java.io.Serializable; +import java.util.*; + +import scala.collection.JavaConverters; + +import org.apache.kafka.common.TopicPartition; + +import org.junit.Assert; +import org.junit.Test; + +public class JavaConsumerStrategySuite implements Serializable { + + @Test + public void testConsumerStrategyConstructors() { +final String topic1 = "topic1"; +final Collection topics = Arrays.asList(topic1); +final scala.collection.Iterable sTopics = + JavaConverters.collectionAsScalaIterableConverter(topics).asScala(); +final TopicPartition tp1 = new TopicPartition(topic1, 0); +final TopicPartition tp2 = new TopicPartition(topic1, 1); +final Collection parts = Arrays.asList(tp1, tp2); +final scala.collection.Iterable sParts = + JavaConverters.collectionAsScalaIterableConverter(parts).asScala(); +final MapkafkaParams = new HashMap (); +kafkaParams.put("bootstrap.servers", "not used"); +final scala.collection.Map sKafkaParams = + JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala(); +final Map offsets = new HashMap<>(); +offsets.put(tp1, 23L); +final scala.collection.Map sOffsets = + JavaConverters.mapAsScalaMapConverter(offsets).asScala(); + +// make sure constructors can be called from java +final ConsumerStrategy sub0 = + Subscribe. apply(topics, kafkaParams, offsets); --- End diff -- Okay, found the issue. In Scala 2.10, if the companion object of a case class explicitly defines apply(), the implicit apply method is not generated; in Scala 2.11 it is. I remember now: this type of stuff is why we avoid using case classes in the public API. Do you mind if I convert these to simple classes?
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r69083303 --- Diff: external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java --- @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.kafka010; + +import java.io.Serializable; +import java.util.*; + +import scala.collection.JavaConverters; + +import org.apache.kafka.common.TopicPartition; + +import org.junit.Assert; +import org.junit.Test; + +public class JavaConsumerStrategySuite implements Serializable { + + @Test + public void testConsumerStrategyConstructors() { +final String topic1 = "topic1"; +final Collection topics = Arrays.asList(topic1); +final scala.collection.Iterable sTopics = + JavaConverters.collectionAsScalaIterableConverter(topics).asScala(); +final TopicPartition tp1 = new TopicPartition(topic1, 0); +final TopicPartition tp2 = new TopicPartition(topic1, 1); +final Collection parts = Arrays.asList(tp1, tp2); +final scala.collection.Iterable sParts = + JavaConverters.collectionAsScalaIterableConverter(parts).asScala(); +final MapkafkaParams = new HashMap (); +kafkaParams.put("bootstrap.servers", "not used"); +final scala.collection.Map sKafkaParams = + JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala(); +final Map offsets = new HashMap<>(); +offsets.put(tp1, 23L); +final scala.collection.Map sOffsets = + JavaConverters.mapAsScalaMapConverter(offsets).asScala(); + +// make sure constructors can be called from java +final ConsumerStrategy sub0 = + Subscribe. apply(topics, kafkaParams, offsets); --- End diff -- We should figure out a way to fix scala 2.10. I don't think we need to revert this though since 2.10 is no longer the default build and it does not fail PRs.
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r69082952 --- Diff: external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java --- @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.kafka010; + +import java.io.Serializable; +import java.util.*; + +import scala.collection.JavaConverters; + +import org.apache.kafka.common.TopicPartition; + +import org.junit.Assert; +import org.junit.Test; + +public class JavaConsumerStrategySuite implements Serializable { + + @Test + public void testConsumerStrategyConstructors() { +final String topic1 = "topic1"; +final Collection topics = Arrays.asList(topic1); +final scala.collection.Iterable sTopics = + JavaConverters.collectionAsScalaIterableConverter(topics).asScala(); +final TopicPartition tp1 = new TopicPartition(topic1, 0); +final TopicPartition tp2 = new TopicPartition(topic1, 1); +final Collection parts = Arrays.asList(tp1, tp2); +final scala.collection.Iterable sParts = + JavaConverters.collectionAsScalaIterableConverter(parts).asScala(); +final MapkafkaParams = new HashMap (); +kafkaParams.put("bootstrap.servers", "not used"); +final scala.collection.Map sKafkaParams = + JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala(); +final Map offsets = new HashMap<>(); +offsets.put(tp1, 23L); +final scala.collection.Map sOffsets = + JavaConverters.mapAsScalaMapConverter(offsets).asScala(); + +// make sure constructors can be called from java +final ConsumerStrategy sub0 = + Subscribe. apply(topics, kafkaParams, offsets); --- End diff -- This seems to break in Scala 2.10 but not in Scala 2.11, which is very weird. Merging this PR broke the 2.10 builds - https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Compile/job/spark-master-compile-sbt-scala-2.10/1947/console
```
[error] /home/jenkins/workspace/spark-master-compile-sbt-scala-2.10/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java:54: error: incompatible types: Collection cannot be converted to Iterable
[error] Subscribe. apply(topics, kafkaParams, offsets);
[error] ^
[error] /home/jenkins/workspace/spark-master-compile-sbt-scala-2.10/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java:69: error: incompatible types: Collection cannot be converted to Iterable
[error] Assign. apply(parts, kafkaParams, offsets);
[error]^
```
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/11863
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r69049921 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import org.apache.kafka.clients.consumer.OffsetCommitCallback +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.annotation.Experimental + +/** + * Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the + * offset ranges in RDDs generated by the direct Kafka DStream (see + * [[KafkaUtils.createDirectStream]]). + * {{{ + * KafkaUtils.createDirectStream(...).foreachRDD { rdd => + * val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges + * ... + * } + * }}} + */ +trait HasOffsetRanges { + def offsetRanges: Array[OffsetRange] +} + +/** + * :: Experimental :: + * Represents any object that can commit a collection of [[OffsetRange]]s. + * The direct Kafka DStream implements this interface (see + * [[KafkaUtils.createDirectStream]]). + * {{{ + * val stream = KafkaUtils.createDirectStream(...) + * ... 
+ * stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets, new OffsetCommitCallback() { + * def onComplete(m: java.util.Map[TopicPartition, OffsetAndMetadata], e: Exception) { + *if (null != e) { + * // error + *} else { + * // success + * } + * } + * }) + * }}} + */ +@Experimental +trait CanCommitOffsets { --- End diff -- @koeninger I don't think this is needed any more since we return InputDStream.
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r69047117 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala --- @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.atomic.AtomicReference + +import scala.annotation.tailrec +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.kafka.clients.consumer._ +import org.apache.kafka.common.{ PartitionInfo, TopicPartition } + +import org.apache.spark.SparkException +import org.apache.spark.internal.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{StreamingContext, Time} +import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo} +import org.apache.spark.streaming.scheduler.rate.RateEstimator + +/** + * A stream of [[KafkaRDD]] where + * each given Kafka topic/partition corresponds to an RDD partition. 
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number + * of messages + * per second that each '''partition''' will accept. + * @param locationStrategy In most cases, pass in [[PreferConsistent]], + * see [[LocationStrategy]] for more details. + * @param executorKafkaParams Kafka + * http://kafka.apache.org/documentation.html#newconsumerconfigs;> + * configuration parameters. + * Requires "bootstrap.servers" to be set with Kafka broker(s), + * NOT zookeeper servers, specified in host1:port1,host2:port2 form. + * @param driverConsumer zero-argument function for you to construct a Kafka Consumer, --- End diff -- incorrect
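The Scaladoc quoted above describes `spark.streaming.kafka.maxRatePerPartition` as a cap on messages per second for each partition. Multiplying that cap by the batch interval gives a rough upper bound on how many records one batch can pull. The sketch below does only that arithmetic; `RateLimitSketch` and its methods are hypothetical, and it deliberately ignores backpressure rate estimation and the case where the setting is left unset.

```java
// Hypothetical sketch: how a per-partition rate cap translates into a per-batch cap.
public final class RateLimitSketch {
  private RateLimitSketch() {}

  /**
   * Upper bound on records fetched from ONE partition in ONE batch, given a cap in
   * records/second and the batch interval in milliseconds.
   */
  public static long maxMessagesPerPartition(long maxRatePerSecond, long batchIntervalMs) {
    if (maxRatePerSecond <= 0 || batchIntervalMs <= 0) {
      throw new IllegalArgumentException("rate and batch interval must be positive");
    }
    double secondsPerBatch = batchIntervalMs / 1000.0;
    return (long) (secondsPerBatch * maxRatePerSecond);
  }

  /** Upper bound on records in one batch across all partitions. */
  public static long maxMessagesPerBatch(long maxRatePerSecond, long batchIntervalMs, int partitions) {
    return maxMessagesPerPartition(maxRatePerSecond, batchIntervalMs) * partitions;
  }

  public static void main(String[] args) {
    // 1000 records/s per partition, 2 s batches, 4 partitions
    System.out.println(maxMessagesPerPartition(1000, 2000)); // 2000
    System.out.println(maxMessagesPerBatch(1000, 2000, 4));  // 8000
  }
}
```

Because the cap is per partition, adding partitions to a topic raises the per-batch ceiling proportionally even when the configured rate is unchanged.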
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r69035725 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala --- @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.atomic.AtomicReference + +import scala.annotation.tailrec +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.kafka.clients.consumer._ +import org.apache.kafka.common.{ PartitionInfo, TopicPartition } + +import org.apache.spark.annotation.Experimental +import org.apache.spark.SparkException +import org.apache.spark.internal.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{StreamingContext, Time} +import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo} +import org.apache.spark.streaming.scheduler.rate.RateEstimator + +/** + * A stream of [[KafkaRDD]] where + * each given Kafka topic/partition corresponds to an RDD partition. + * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number + * of messages + * per second that each '''partition''' will accept. + * @param locationStrategy In most cases, pass in [[PreferConsistent]], + * see [[LocationStrategy]] for more details. + * @param executorKafkaParams Kafka + * http://kafka.apache.org/documentation.html#newconsumerconfigs;> + * configuration parameters. + * Requires "bootstrap.servers" to be set with Kafka broker(s), + * NOT zookeeper servers, specified in host1:port1,host2:port2 form. + * @param driverConsumer zero-argument function for you to construct a Kafka Consumer, + * and subscribe topics or assign partitions. + * This consumer will be used on the driver to query for offsets only, not messages. 
+ * See http://kafka.apache.org/documentation.html#newconsumerapi;>Consumer doc + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ +private[spark] class DirectKafkaInputDStream[K, V]( +_ssc: StreamingContext, +locationStrategy: LocationStrategy, +consumerStrategy: ConsumerStrategy[K, V] + ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with CanCommitOffsets { + + val executorKafkaParams = { +val ekp = new ju.HashMap[String, Object](consumerStrategy.executorKafkaParams) +KafkaUtils.fixKafkaParams(ekp) +ekp + } + + protected var currentOffsets = Map[TopicPartition, Long]() + + @transient private var kc: Consumer[K, V] = null + def consumer(): Consumer[K, V] = this.synchronized { +if (null == kc) { + kc = consumerStrategy.onStart(currentOffsets) +} +kc + } + + override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, V]] = { +logError("Kafka ConsumerRecord is not serializable. " + + "Use .map to extract fields before calling .persist or .window") +super.persist(newLevel) + } + + protected def getBrokers = { +val c = consumer +val result = new ju.HashMap[TopicPartition, String]() +val hosts = new ju.HashMap[TopicPartition, String]() +val assignments = c.assignment().iterator() +while (assignments.hasNext()) { + val tp: TopicPartition = assignments.next() + if (null == hosts.get(tp)) { +val infos = c.partitionsFor(tp.topic).iterator() +while (infos.hasNext()) { + val i = infos.next() + hosts.put(new TopicPartition(i.topic(), i.partition()), i.leader.host()) +} + } + result.put(tp, hosts.get(tp)) +} +result + }
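The `getBrokers` method quoted above resolves each assigned partition to its leader's host, calling `partitionsFor` once per topic rather than once per partition. Below is a Kafka-free Java sketch of the same lookup so the caching is easy to test. `TP`, `Info`, and `leaderHosts` are hypothetical stand-ins for `TopicPartition`, `PartitionInfo`, and the method itself, and the explicit null-leader check is our own addition rather than something taken from the PR.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;

public final class LeaderLookupSketch {
  private LeaderLookupSketch() {}

  /** Hypothetical stand-in for TopicPartition: a value type usable as a map key. */
  public static final class TP {
    final String topic;
    final int partition;

    public TP(String topic, int partition) {
      this.topic = topic;
      this.partition = partition;
    }

    @Override public boolean equals(Object o) {
      if (!(o instanceof TP)) return false;
      TP other = (TP) o;
      return partition == other.partition && topic.equals(other.topic);
    }

    @Override public int hashCode() { return Objects.hash(topic, partition); }
  }

  /** Hypothetical stand-in for PartitionInfo: a partition and its leader host (null if none). */
  public static final class Info {
    final String topic;
    final int partition;
    final String leaderHost;

    public Info(String topic, int partition, String leaderHost) {
      this.topic = topic;
      this.partition = partition;
      this.leaderHost = leaderHost;
    }
  }

  /**
   * Maps each assigned partition to its leader host. partitionsFor stands in for a
   * per-topic metadata call; it is invoked at most once per topic because one call
   * describes every partition of that topic.
   */
  public static Map<TP, String> leaderHosts(List<TP> assignment,
                                            Function<String, List<Info>> partitionsFor) {
    Map<TP, String> result = new HashMap<>();
    Map<TP, String> hosts = new HashMap<>();
    Set<String> fetchedTopics = new HashSet<>();
    for (TP tp : assignment) {
      if (fetchedTopics.add(tp.topic)) { // true only the first time we see this topic
        for (Info i : partitionsFor.apply(tp.topic)) {
          // Assumption: skip partitions without a known leader rather than dereference null.
          if (i.leaderHost != null) {
            hosts.put(new TP(i.topic, i.partition), i.leaderHost);
          }
        }
      }
      result.put(tp, hosts.get(tp)); // value is null when no leader is known
    }
    return result;
  }
}
```

Unlike the quoted code, which re-queries whenever a partition's host is still missing from the map, this version remembers which topics it has already fetched, so a leaderless partition does not trigger repeated metadata calls.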
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r69035388 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala --- @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.atomic.AtomicReference + +import scala.annotation.tailrec +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.kafka.clients.consumer._ +import org.apache.kafka.common.{ PartitionInfo, TopicPartition } + +import org.apache.spark.annotation.Experimental +import org.apache.spark.SparkException +import org.apache.spark.internal.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{StreamingContext, Time} +import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo} +import org.apache.spark.streaming.scheduler.rate.RateEstimator + +/** + * A stream of [[KafkaRDD]] where + * each given Kafka topic/partition corresponds to an RDD partition. + * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number + * of messages + * per second that each '''partition''' will accept. + * @param locationStrategy In most cases, pass in [[PreferConsistent]], + * see [[LocationStrategy]] for more details. + * @param executorKafkaParams Kafka + * http://kafka.apache.org/documentation.html#newconsumerconfigs;> + * configuration parameters. + * Requires "bootstrap.servers" to be set with Kafka broker(s), + * NOT zookeeper servers, specified in host1:port1,host2:port2 form. + * @param driverConsumer zero-argument function for you to construct a Kafka Consumer, + * and subscribe topics or assign partitions. + * This consumer will be used on the driver to query for offsets only, not messages. 
+ * See http://kafka.apache.org/documentation.html#newconsumerapi;>Consumer doc + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ +private[spark] class DirectKafkaInputDStream[K, V]( +_ssc: StreamingContext, +locationStrategy: LocationStrategy, +consumerStrategy: ConsumerStrategy[K, V] + ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with CanCommitOffsets { + + val executorKafkaParams = { +val ekp = new ju.HashMap[String, Object](consumerStrategy.executorKafkaParams) +KafkaUtils.fixKafkaParams(ekp) +ekp + } + + protected var currentOffsets = Map[TopicPartition, Long]() + + @transient private var kc: Consumer[K, V] = null + def consumer(): Consumer[K, V] = this.synchronized { +if (null == kc) { + kc = consumerStrategy.onStart(currentOffsets) +} +kc + } + + override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, V]] = { +logError("Kafka ConsumerRecord is not serializable. " + + "Use .map to extract fields before calling .persist or .window") +super.persist(newLevel) + } + + protected def getBrokers = { +val c = consumer +val result = new ju.HashMap[TopicPartition, String]() +val hosts = new ju.HashMap[TopicPartition, String]() +val assignments = c.assignment().iterator() +while (assignments.hasNext()) { + val tp: TopicPartition = assignments.next() + if (null == hosts.get(tp)) { +val infos = c.partitionsFor(tp.topic).iterator() +while (infos.hasNext()) { + val i = infos.next() + hosts.put(new TopicPartition(i.topic(), i.partition()), i.leader.host()) +} + } + result.put(tp, hosts.get(tp)) +} +result + }
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r69034310 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/LocationStrategy.scala --- @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } + +import scala.collection.JavaConverters._ + +import org.apache.kafka.common.TopicPartition + + +/** + * Choice of how to schedule consumers for a given TopicPartition on an executor. + * Kafka 0.10 consumers prefetch messages, so it's important for performance + * to keep cached consumers on appropriate executors, not recreate them for every partition. + * Choice of location is only a preference, not an absolute; partitions may be scheduled elsewhere. + */ +sealed trait LocationStrategy --- End diff -- nit: add `@Experimental` for classes and methods in this file.
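The `LocationStrategy` doc says cached consumers should stay on appropriate executors instead of being recreated, and that location is only a preference. One way to get "consistent" placement is to sort the executor list and index it by a hash of the partition, so the same partition maps to the same executor across batches while the executor set is unchanged. The Java sketch below illustrates that idea under our own assumptions; it is not the PR's implementation, `ConsistentPlacementSketch` and `preferredExecutor` are hypothetical, and partitions are identified by hypothetical "topic-partition" string keys.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of "consistent" placement for a PreferConsistent-style strategy.
public final class ConsistentPlacementSketch {
  private ConsistentPlacementSketch() {}

  /**
   * Picks a preferred executor for a partition key. The executor list is sorted first,
   * so the answer depends only on the key and the set of executors, not on the order
   * in which executors were reported. Math.floorMod keeps the index non-negative even
   * when hashCode() is negative.
   */
  public static Optional<String> preferredExecutor(String partitionKey, List<String> executors) {
    if (executors.isEmpty()) {
      return Optional.empty(); // no preference; the scheduler may place the task anywhere
    }
    List<String> sorted = new ArrayList<>(executors);
    Collections.sort(sorted);
    int index = Math.floorMod(partitionKey.hashCode(), sorted.size());
    return Optional.of(sorted.get(index));
  }
}
```

If executors join or leave, indexing into a list of a different size can move many partitions at once; that trade-off is tolerable here because the result is only a scheduling preference, not an absolute.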
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r69034246 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/ConsumerStrategy.scala --- @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } + +import scala.collection.JavaConverters._ + +import org.apache.kafka.clients.consumer._ +import org.apache.kafka.common.TopicPartition + +/** + * Choice of how to create and configure underlying Kafka Consumers on driver and executors. + * Kafka 0.10 consumers can require additional, sometimes complex, setup after object + * instantiation. This interface encapsulates that process, and allows it to be checkpointed. + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ +trait ConsumerStrategy[K, V] { --- End diff -- nit: add `@Experimental` for classes and methods in this file. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
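The `ConsumerStrategy` doc describes a strategy object that owns consumer construction and the setup that follows it, so that only the strategy (not a live consumer) has to be checkpointed. The Java sketch below shows that shape under stated assumptions: `StrategySketch`, `SubscribeSketch` and `FakeConsumer` are hypothetical stand-ins, not the Spark or Kafka API, and partitions are keyed by plain `"topic-partition"` strings instead of `TopicPartition`.

```java
import java.io.Serializable;
import java.util.*;

// Hypothetical stand-in for a Kafka consumer: it only records subscriptions and seek positions.
final class FakeConsumer {
  final Map<String, Object> params;
  final List<String> subscribed = new ArrayList<>();
  final Map<String, Long> positions = new HashMap<>();

  FakeConsumer(Map<String, Object> params) {
    this.params = params;
  }

  void subscribe(Collection<String> topics) {
    subscribed.addAll(topics);
  }

  void seek(String topicPartition, long offset) {
    positions.put(topicPartition, offset);
  }
}

// Strategy shape: serializable configuration plus a factory that builds and
// configures the consumer when the stream starts or recovers from a checkpoint.
interface StrategySketch extends Serializable {
  Map<String, Object> executorKafkaParams();

  FakeConsumer onStart(Map<String, Long> currentOffsets);
}

// Hypothetical Subscribe-style strategy: only topic names and params are stored,
// so the strategy stays serializable while the consumer is rebuilt on demand.
final class SubscribeSketch implements StrategySketch {
  private final ArrayList<String> topics;
  private final HashMap<String, Object> kafkaParams;

  SubscribeSketch(Collection<String> topics, Map<String, Object> kafkaParams) {
    this.topics = new ArrayList<>(topics);
    this.kafkaParams = new HashMap<>(kafkaParams);
  }

  @Override
  public Map<String, Object> executorKafkaParams() {
    return Collections.unmodifiableMap(kafkaParams);
  }

  @Override
  public FakeConsumer onStart(Map<String, Long> currentOffsets) {
    FakeConsumer c = new FakeConsumer(kafkaParams);
    c.subscribe(topics);
    // When recovering, resume from the offsets restored from the checkpoint.
    for (Map.Entry<String, Long> e : currentOffsets.entrySet()) {
      c.seek(e.getKey(), e.getValue());
    }
    return c;
  }
}
```

A real `KafkaConsumer` only accepts `seek` for partitions currently assigned to it, which this fake consumer does not model.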
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r69034026 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala --- @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } + +import org.apache.kafka.clients.consumer._ +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.SparkContext +import org.apache.spark.annotation.Experimental +import org.apache.spark.api.java.{ JavaRDD, JavaSparkContext } +import org.apache.spark.api.java.function.{ Function0 => JFunction0 } +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.{ JavaInputDStream, JavaStreamingContext } +import org.apache.spark.streaming.dstream._ + +@Experimental +object KafkaUtils extends Logging { + /** + * Scala constructor for a batch-oriented interface for consuming from Kafka. --- End diff -- Please add `:: Experimental ::` at the beginning of comments. 
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r69033637 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala --- @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.atomic.AtomicReference + +import scala.annotation.tailrec +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.kafka.clients.consumer._ +import org.apache.kafka.common.{ PartitionInfo, TopicPartition } + +import org.apache.spark.annotation.Experimental +import org.apache.spark.SparkException +import org.apache.spark.internal.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{StreamingContext, Time} +import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo} +import org.apache.spark.streaming.scheduler.rate.RateEstimator + +/** + * A stream of [[KafkaRDD]] where + * each given Kafka topic/partition corresponds to an RDD partition. + * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number + * of messages + * per second that each '''partition''' will accept. + * @param locationStrategy In most cases, pass in [[PreferConsistent]], + * see [[LocationStrategy]] for more details. + * @param executorKafkaParams Kafka + * http://kafka.apache.org/documentation.html#newconsumerconfigs;> + * configuration parameters. + * Requires "bootstrap.servers" to be set with Kafka broker(s), + * NOT zookeeper servers, specified in host1:port1,host2:port2 form. + * @param driverConsumer zero-argument function for you to construct a Kafka Consumer, + * and subscribe topics or assign partitions. + * This consumer will be used on the driver to query for offsets only, not messages. 
+ * See http://kafka.apache.org/documentation.html#newconsumerapi;>Consumer doc + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ +private[spark] class DirectKafkaInputDStream[K, V]( +_ssc: StreamingContext, +locationStrategy: LocationStrategy, +consumerStrategy: ConsumerStrategy[K, V] + ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with CanCommitOffsets { + + val executorKafkaParams = { +val ekp = new ju.HashMap[String, Object](consumerStrategy.executorKafkaParams) +KafkaUtils.fixKafkaParams(ekp) +ekp + } + + protected var currentOffsets = Map[TopicPartition, Long]() + + @transient private var kc: Consumer[K, V] = null + def consumer(): Consumer[K, V] = this.synchronized { +if (null == kc) { + kc = consumerStrategy.onStart(currentOffsets) +} +kc + } + + override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, V]] = { +logError("Kafka ConsumerRecord is not serializable. " + + "Use .map to extract fields before calling .persist or .window") +super.persist(newLevel) + } + + protected def getBrokers = { +val c = consumer +val result = new ju.HashMap[TopicPartition, String]() +val hosts = new ju.HashMap[TopicPartition, String]() +val assignments = c.assignment().iterator() +while (assignments.hasNext()) { + val tp: TopicPartition = assignments.next() + if (null == hosts.get(tp)) { +val infos = c.partitionsFor(tp.topic).iterator() +while (infos.hasNext()) { + val i = infos.next() + hosts.put(new TopicPartition(i.topic(), i.partition()), i.leader.host()) +} + } + result.put(tp, hosts.get(tp)) +} +result + }
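The `getBrokers` loop fills a `hosts` map from `partitionsFor(tp.topic)`, so partition metadata is normally fetched once per topic rather than once per assigned partition. A minimal Java sketch of that caching, assuming hypothetical `TP` and `LeaderInfo` classes in place of Kafka's `TopicPartition` and `PartitionInfo`, and a plain function in place of `Consumer.partitionsFor`:

```java
import java.util.*;
import java.util.function.Function;

// Hypothetical stand-in for Kafka's TopicPartition (value equality on topic + partition).
final class TP {
  final String topic;
  final int partition;

  TP(String topic, int partition) {
    this.topic = topic;
    this.partition = partition;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof TP)) return false;
    TP other = (TP) o;
    return topic.equals(other.topic) && partition == other.partition;
  }

  @Override
  public int hashCode() {
    return Objects.hash(topic, partition);
  }
}

// Hypothetical stand-in for PartitionInfo: a partition and its leader's host.
final class LeaderInfo {
  final TP tp;
  final String host;

  LeaderInfo(TP tp, String host) {
    this.tp = tp;
    this.host = host;
  }
}

final class BrokerLookup {
  // Maps each assigned partition to its leader host, querying partitionsFor
  // only for topics whose partitions are not cached yet.
  static Map<TP, String> leaderHosts(
      Collection<TP> assignment, Function<String, List<LeaderInfo>> partitionsFor) {
    Map<TP, String> result = new HashMap<>();
    Map<TP, String> hosts = new HashMap<>(); // every partition of every topic looked up so far
    for (TP tp : assignment) {
      if (!hosts.containsKey(tp)) {
        // One metadata lookup fills in all partitions of this topic.
        for (LeaderInfo info : partitionsFor.apply(tp.topic)) {
          hosts.put(info.tp, info.host);
        }
      }
      result.put(tp, hosts.get(tp));
    }
    return result;
  }
}
```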
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r69007407 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala --- @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } + +import scala.collection.mutable.ArrayBuffer + +import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord } +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext} +import org.apache.spark.annotation.Experimental +import org.apache.spark.internal.Logging +import org.apache.spark.partial.{BoundedDouble, PartialResult} +import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler.ExecutorCacheTaskLocation +import org.apache.spark.storage.StorageLevel + +/** + * A batch-oriented interface for consuming from Kafka. + * Starting and ending offsets are specified in advance, + * so that you can control exactly-once semantics. + * @param kafkaParams Kafka + * http://kafka.apache.org/documentation.htmll#newconsumerconfigs;> + * configuration parameters. 
Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. + * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD + * @param preferredHosts map from TopicPartition to preferred host for processing that partition. + * In most cases, use [[DirectKafkaInputDStream.preferConsistent]] + * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on same nodes as brokers. + * @param useConsumerCache whether to use a consumer from a per-jvm cache + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ +private[spark] class KafkaRDD[K, V]( +sc: SparkContext, +val kafkaParams: ju.Map[String, Object], +val offsetRanges: Array[OffsetRange], +val preferredHosts: ju.Map[TopicPartition, String], +useConsumerCache: Boolean +) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with HasOffsetRanges { + + assert("none" == + kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).asInstanceOf[String], +ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + + " must be set to none for executor kafka params, else messages may not match offsetRange") + + assert(false == + kafkaParams.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).asInstanceOf[Boolean], +ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + + " must be set to false for executor kafka params, else offsets may commit before processing") + + // TODO is it necessary to have separate configs for initial poll time vs ongoing poll time? 
+ private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 256) + private val cacheInitialCapacity = +conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16) + private val cacheMaxCapacity = +conf.getInt("spark.streaming.kafka.consumer.cache.maxCapacity", 64) + private val cacheLoadFactor = +conf.getDouble("spark.streaming.kafka.consumer.cache.loadFactor", 0.75).toFloat + + override def persist(newLevel: StorageLevel): this.type = { +logError("Kafka ConsumerRecord is not serializable. " + + "Use .map to extract fields before calling .persist or .window") +super.persist(newLevel) + } + + override def getPartitions: Array[Partition] = { +offsetRanges.zipWithIndex.map { case (o, i) => +new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset) +}.toArray + } + + override def count(): Long = offsetRanges.map(_.count).sum + + override def countApprox( + timeout: Long, + confidence: Double = 0.95 + ): PartialResult[BoundedDouble] =
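The two assertions at the top of `KafkaRDD` require executor params with `auto.offset.reset` set to `none` and `enable.auto.commit` set to `false`, and the `DirectKafkaInputDStream` excerpt copies the user's params before calling `KafkaUtils.fixKafkaParams`, whose body is not shown in this thread. The sketch below is a hypothetical helper (`ExecutorParams` is not a Spark class) pairing a copy-and-override step with the same two checks; it does not claim to match what `fixKafkaParams` actually sets.

```java
import java.util.*;

final class ExecutorParams {
  // Hypothetical helper: copy the caller's params and force the settings KafkaRDD asserts on.
  static Map<String, Object> forExecutors(Map<String, Object> userParams) {
    Map<String, Object> p = new HashMap<>(userParams);
    p.put("enable.auto.commit", false); // offsets must not be committed before processing
    p.put("auto.offset.reset", "none"); // fail rather than silently read from a different offset
    return p;
  }

  // Same two checks as the KafkaRDD constructor, but throwing instead of using assert.
  static void validate(Map<String, Object> p) {
    if (!"none".equals(p.get("auto.offset.reset"))) {
      throw new IllegalArgumentException(
          "auto.offset.reset must be set to none for executor kafka params");
    }
    Object commit = p.get("enable.auto.commit");
    // Accept either a Boolean or its String form, since Kafka config maps may carry both.
    if (!(Boolean.FALSE.equals(commit) || "false".equals(commit))) {
      throw new IllegalArgumentException(
          "enable.auto.commit must be set to false for executor kafka params");
    }
  }
}
```

The choice to accept both `false` and `"false"` is deliberate: a bare `asInstanceOf[Boolean]` cast handles only the `Boolean` form.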
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68856277 --- Diff: external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java --- @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.kafka; + +import java.io.Serializable; +import java.util.*; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.VoidFunction; +import org.apache.spark.streaming.Durations; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaInputDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; + +public class JavaDirectKafkaStreamSuite implements Serializable { + private transient JavaStreamingContext ssc = null; + private transient KafkaTestUtils kafkaTestUtils = null; + + @Before + public void setUp() { +kafkaTestUtils = new KafkaTestUtils(); +kafkaTestUtils.setup(); +SparkConf sparkConf = new SparkConf() + .setMaster("local[4]").setAppName(this.getClass().getSimpleName()); +ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(200)); + } + + @After + public void tearDown() { +if (ssc != null) { + ssc.stop(); + ssc = null; +} + +if (kafkaTestUtils != null) { + kafkaTestUtils.teardown(); + kafkaTestUtils = null; +} + } + + @Test + public void testKafkaStream() throws InterruptedException { +final String topic1 = "topic1"; +final String topic2 = "topic2"; +// hold a reference to the current offset ranges, so it can be used downstream +final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>(); + +String[] topic1data = createTopicAndSendData(topic1); +String[] topic2data = createTopicAndSendData(topic2); + +Set<String> sent = new HashSet<>(); +sent.addAll(Arrays.asList(topic1data)); +sent.addAll(Arrays.asList(topic2data)); + +Random random = new Random(); + 
+final Map<String, Object> kafkaParams = new HashMap<>(); +kafkaParams.put("bootstrap.servers", kafkaTestUtils.brokerAddress()); +kafkaParams.put("key.deserializer", StringDeserializer.class); +kafkaParams.put("value.deserializer", StringDeserializer.class); +kafkaParams.put("auto.offset.reset", "earliest"); +kafkaParams.put("group.id", "java-test-consumer-" + random.nextInt()); + +JavaInputDStream<ConsumerRecord<String, String>> istream1 = KafkaUtils.createDirectStream( +ssc, +PreferConsistent.create(), +Subscribe.<String, String>create(Arrays.asList(topic1), kafkaParams) +); + +JavaDStream<String> stream1 = istream1.transform( + // Make sure you can get offset ranges from the rdd + new Function<JavaRDD<ConsumerRecord<String, String>>, +JavaRDD<ConsumerRecord<String, String>>>() { + @Override + public JavaRDD<ConsumerRecord<String, String>> call( +JavaRDD<ConsumerRecord<String, String>> rdd + ) { +OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); +offsetRanges.set(offsets); +Assert.assertEquals(topic1, offsets[0].topic()); +return rdd; + } +} +).map( +new Function<ConsumerRecord<String, String>, String>() { +
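The test keeps the latest batch's offset ranges in an `AtomicReference` that the `transform` callback sets on every batch. This works because the function passed to `transform` runs on the driver as each batch is generated, so the reference is visible to the rest of the driver-side test code. Stripped of Spark, it is a pass-through function with a side channel; `OffsetRangeStub`, `Batch` and `OffsetCapture` below are hypothetical stand-ins for `OffsetRange`, an RDD implementing `HasOffsetRanges`, and the anonymous `Function`.

```java
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for Spark's OffsetRange.
final class OffsetRangeStub {
  final String topic;
  final int partition;
  final long fromOffset;
  final long untilOffset;

  OffsetRangeStub(String topic, int partition, long fromOffset, long untilOffset) {
    this.topic = topic;
    this.partition = partition;
    this.fromOffset = fromOffset;
    this.untilOffset = untilOffset;
  }
}

// Hypothetical stand-in for an RDD that also exposes its offset ranges.
final class Batch {
  final OffsetRangeStub[] offsetRanges;
  final List<String> values;

  Batch(OffsetRangeStub[] offsetRanges, List<String> values) {
    this.offsetRanges = offsetRanges;
    this.values = values;
  }
}

final class OffsetCapture {
  // Returns a pass-through transform that records each batch's offset ranges in `holder`.
  static UnaryOperator<Batch> capturing(AtomicReference<OffsetRangeStub[]> holder) {
    return batch -> {
      holder.set(batch.offsetRanges);
      return batch; // the data itself passes through unchanged
    };
  }
}
```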
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68855971 --- Diff: external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java --- @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.kafka; + +import java.io.Serializable; +import java.util.*; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.VoidFunction; +import org.apache.spark.streaming.Durations; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaInputDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; + +public class JavaDirectKafkaStreamSuite implements Serializable { + private transient JavaStreamingContext ssc = null; + private transient KafkaTestUtils kafkaTestUtils = null; + + @Before + public void setUp() { +kafkaTestUtils = new KafkaTestUtils(); +kafkaTestUtils.setup(); +SparkConf sparkConf = new SparkConf() + .setMaster("local[4]").setAppName(this.getClass().getSimpleName()); +ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(200)); + } + + @After + public void tearDown() { +if (ssc != null) { + ssc.stop(); + ssc = null; +} + +if (kafkaTestUtils != null) { + kafkaTestUtils.teardown(); + kafkaTestUtils = null; +} + } + + @Test + public void testKafkaStream() throws InterruptedException { +final String topic1 = "topic1"; +final String topic2 = "topic2"; +// hold a reference to the current offset ranges, so it can be used downstream +final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>(); + +String[] topic1data = createTopicAndSendData(topic1); +String[] topic2data = createTopicAndSendData(topic2); + +Set<String> sent = new HashSet<>(); +sent.addAll(Arrays.asList(topic1data)); +sent.addAll(Arrays.asList(topic2data)); + +Random random = new Random(); + 
+final Map<String, Object> kafkaParams = new HashMap<>(); +kafkaParams.put("bootstrap.servers", kafkaTestUtils.brokerAddress()); +kafkaParams.put("key.deserializer", StringDeserializer.class); +kafkaParams.put("value.deserializer", StringDeserializer.class); +kafkaParams.put("auto.offset.reset", "earliest"); +kafkaParams.put("group.id", "java-test-consumer-" + random.nextInt()); + +JavaInputDStream<ConsumerRecord<String, String>> istream1 = KafkaUtils.createDirectStream( +ssc, +PreferConsistent.create(), +Subscribe.<String, String>create(Arrays.asList(topic1), kafkaParams) +); + +JavaDStream<String> stream1 = istream1.transform( + // Make sure you can get offset ranges from the rdd + new Function<JavaRDD<ConsumerRecord<String, String>>, +JavaRDD<ConsumerRecord<String, String>>>() { + @Override + public JavaRDD<ConsumerRecord<String, String>> call( +JavaRDD<ConsumerRecord<String, String>> rdd + ) { +OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); +offsetRanges.set(offsets); +Assert.assertEquals(topic1, offsets[0].topic()); +return rdd; + } +} +).map( +new Function<ConsumerRecord<String, String>, String>() { +
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68851801 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala --- @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } + +import scala.collection.mutable.ArrayBuffer + +import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord } +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext} +import org.apache.spark.annotation.Experimental +import org.apache.spark.internal.Logging +import org.apache.spark.partial.{BoundedDouble, PartialResult} +import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler.ExecutorCacheTaskLocation +import org.apache.spark.storage.StorageLevel + +/** + * A batch-oriented interface for consuming from Kafka. + * Starting and ending offsets are specified in advance, + * so that you can control exactly-once semantics. + * @param kafkaParams Kafka + * http://kafka.apache.org/documentation.htmll#newconsumerconfigs;> + * configuration parameters. 
Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. + * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD + * @param preferredHosts map from TopicPartition to preferred host for processing that partition. + * In most cases, use [[DirectKafkaInputDStream.preferConsistent]] + * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on same nodes as brokers. + * @param useConsumerCache whether to use a consumer from a per-jvm cache + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ +@Experimental --- End diff -- nit: remove `@Experimental` since it's not public
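Because a `KafkaRDD`'s starting and ending offsets are fixed when it is created, its partitions and its `count()` can be derived from the offset ranges alone, without reading any messages (the other `KafkaRDD` excerpt in this thread maps each range to one partition by index and overrides `count` as `offsetRanges.map(_.count).sum`). A Java sketch with hypothetical class names, treating `untilOffset` as exclusive:

```java
import java.util.*;

// Hypothetical stand-in for OffsetRange; untilOffset is exclusive.
final class OffsetRangeSketch {
  final String topic;
  final int partition;
  final long fromOffset;
  final long untilOffset;

  OffsetRangeSketch(String topic, int partition, long fromOffset, long untilOffset) {
    this.topic = topic;
    this.partition = partition;
    this.fromOffset = fromOffset;
    this.untilOffset = untilOffset;
  }

  long count() {
    return untilOffset - fromOffset;
  }
}

// One RDD partition per offset range, identified by its position in the array.
final class RangePartition {
  final int index;
  final OffsetRangeSketch range;

  RangePartition(int index, OffsetRangeSketch range) {
    this.index = index;
    this.range = range;
  }
}

final class KafkaRddSketch {
  static List<RangePartition> partitions(OffsetRangeSketch[] ranges) {
    List<RangePartition> out = new ArrayList<>();
    for (int i = 0; i < ranges.length; i++) {
      out.add(new RangePartition(i, ranges[i]));
    }
    return out;
  }

  // Total message count computed from offsets only; no records are fetched.
  static long count(OffsetRangeSketch[] ranges) {
    long total = 0L;
    for (OffsetRangeSketch r : ranges) {
      total += r.count();
    }
    return total;
  }
}
```

This assumes offsets within a range are contiguous. On compacted topics some offsets in a range may no longer exist, so an offset-derived count is only an upper bound there.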
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68850101 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/CachedKafkaConsumer.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } + +import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, KafkaConsumer } +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +/** + * Consumer of single topicpartition, intended for cached reuse. + * Underlying consumer is not threadsafe, so neither is this, + * but processing the same topicpartition and group id in multiple threads is usually bad anyway. 
+ */ +private[kafka] +class CachedKafkaConsumer[K, V] private( + val groupId: String, + val topic: String, + val partition: Int, + val kafkaParams: ju.Map[String, Object]) extends Logging { + + assert(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG), +"groupId used for cache key must match the groupId in kafkaParams") + + val topicPartition = new TopicPartition(topic, partition) + + protected val consumer = { +val c = new KafkaConsumer[K, V](kafkaParams) +val tps = new ju.ArrayList[TopicPartition]() +tps.add(topicPartition) +c.assign(tps) +c + } + + // TODO if the buffer was kept around as a random-access structure, + // could possibly optimize re-calculating of an RDD in the same batch + protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, V]]().iterator + protected var nextOffset = -2L + + def close(): Unit = consumer.close() + + /** + * Get the record for the given offset, waiting up to timeout ms if IO is necessary. + * Sequential forward access will use buffers, but random access will be horribly inefficient. 
+ */ + def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = { +log.debug(s"Get $groupId $topic $partition nextOffset $nextOffset requested $offset") +if (offset != nextOffset) { + log.info(s"Initial fetch for $groupId $topic $partition $offset") + seek(offset) + poll(timeout) +} + +if (!buffer.hasNext()) { poll(timeout) } +assert(buffer.hasNext(), + s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout") +var record = buffer.next() + +if (record.offset != offset) { + log.info(s"Buffer miss for $groupId $topic $partition $offset") + seek(offset) + poll(timeout) + assert(buffer.hasNext(), +s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout") + record = buffer.next() + assert(record.offset == offset, +s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset") +} + +nextOffset = offset + 1 +record + } + + private def seek(offset: Long): Unit = { +log.debug(s"Seeking to $topicPartition $offset") +consumer.seek(topicPartition, offset) + } + + private def poll(timeout: Long): Unit = { +val p = consumer.poll(timeout) +val r = p.records(topicPartition) +log.debug(s"Polled ${p.partitions()} ${r.size}") +buffer = r.iterator + } + +} + +private[kafka] +object CachedKafkaConsumer extends Logging { + + private case class CacheKey(groupId: String, topic: String, partition: Int) + + // Don't want to depend on guava, don't want a cleanup thread, use a simple LinkedHashMap + private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] = null + + /** Must be called before get, once per JVM, to configure the cache. Further calls are
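The `CachedKafkaConsumer` companion avoids Guava and a cleanup thread in favour of a plain `LinkedHashMap`. One common way to turn a `java.util.LinkedHashMap` into a bounded LRU cache is its access-order constructor plus an overridden `removeEldestEntry`. The sketch below assumes that approach, with constructor arguments modelled on the `spark.streaming.kafka.consumer.cache.*` settings read in the `KafkaRDD` excerpt (defaults 16, 64 and 0.75). `LruCache` and its eviction callback are hypothetical names.

```java
import java.util.*;
import java.util.function.Consumer;

// Bounded LRU cache: evicts the least-recently-used entry once maxCapacity is exceeded.
final class LruCache<K, V> extends LinkedHashMap<K, V> {
  private final int maxCapacity;
  private final Consumer<? super V> onEvict;

  LruCache(int initialCapacity, int maxCapacity, float loadFactor, Consumer<? super V> onEvict) {
    super(initialCapacity, loadFactor, true); // true = access order, so get() refreshes recency
    this.maxCapacity = maxCapacity;
    this.onEvict = onEvict;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    if (size() > maxCapacity) {
      onEvict.accept(eldest.getValue()); // e.g. close the evicted consumer
      return true;
    }
    return false;
  }
}
```

An access-ordered `LinkedHashMap` is structurally modified by `get` as well as `put`, so a cache shared between task threads needs external synchronization around every access.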
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68849552 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/CachedKafkaConsumer.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } + +import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, KafkaConsumer } +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +/** + * Consumer of single topicpartition, intended for cached reuse. + * Underlying consumer is not threadsafe, so neither is this, + * but processing the same topicpartition and group id in multiple threads is usually bad anyway. 
+ */ +private[kafka] +class CachedKafkaConsumer[K, V] private( + val groupId: String, + val topic: String, + val partition: Int, + val kafkaParams: ju.Map[String, Object]) extends Logging { + + assert(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG), +"groupId used for cache key must match the groupId in kafkaParams") + + val topicPartition = new TopicPartition(topic, partition) + + protected val consumer = { +val c = new KafkaConsumer[K, V](kafkaParams) +val tps = new ju.ArrayList[TopicPartition]() +tps.add(topicPartition) +c.assign(tps) +c + } + + // TODO if the buffer was kept around as a random-access structure, + // could possibly optimize re-calculating of an RDD in the same batch + protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, V]]().iterator + protected var nextOffset = -2L + + def close(): Unit = consumer.close() + + /** + * Get the record for the given offset, waiting up to timeout ms if IO is necessary. + * Sequential forward access will use buffers, but random access will be horribly inefficient. 
+ */ + def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = { +log.debug(s"Get $groupId $topic $partition nextOffset $nextOffset requested $offset") +if (offset != nextOffset) { + log.info(s"Initial fetch for $groupId $topic $partition $offset") + seek(offset) + poll(timeout) +} + +if (!buffer.hasNext()) { poll(timeout) } +assert(buffer.hasNext(), + s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout") +var record = buffer.next() + +if (record.offset != offset) { + log.info(s"Buffer miss for $groupId $topic $partition $offset") + seek(offset) + poll(timeout) + assert(buffer.hasNext(), +s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout") + record = buffer.next() + assert(record.offset == offset, +s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset") +} + +nextOffset = offset + 1 +record + } + + private def seek(offset: Long): Unit = { +log.debug(s"Seeking to $topicPartition $offset") +consumer.seek(topicPartition, offset) + } + + private def poll(timeout: Long): Unit = { +val p = consumer.poll(timeout) +val r = p.records(topicPartition) +log.debug(s"Polled ${p.partitions()} ${r.size}") +buffer = r.iterator + } + +} + +private[kafka] +object CachedKafkaConsumer extends Logging { + + private case class CacheKey(groupId: String, topic: String, partition: Int) + + // Don't want to depend on guava, don't want a cleanup thread, use a simple LinkedHashMap + private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] = null + + /** Must be called before get, once per JVM, to configure the cache. Further calls are
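The `get` doc notes that sequential forward access is served from the poll buffer while random access is expensive. The sketch below replays the same seek/poll/buffer-miss control flow against an in-memory list so the number of simulated fetches can be counted. `BufferedPartitionReader`, `Rec` and `maxPollRecords` are hypothetical, and offsets are assumed contiguous (list index equals offset).

```java
import java.util.*;

// A record at a given offset (hypothetical stand-in for ConsumerRecord).
final class Rec {
  final long offset;
  final String value;

  Rec(long offset, String value) {
    this.offset = offset;
    this.value = value;
  }
}

// Mimics CachedKafkaConsumer.get's seek/poll/buffer logic against an in-memory partition.
final class BufferedPartitionReader {
  private final List<String> log;   // stand-in for one topic-partition; index == offset
  private final int maxPollRecords; // records returned per simulated poll
  private long position = 0L;       // simulated consumer position
  private Iterator<Rec> buffer = Collections.emptyIterator();
  private long nextOffset = -2L;
  int polls = 0;                    // number of simulated fetches

  BufferedPartitionReader(List<String> log, int maxPollRecords) {
    this.log = log;
    this.maxPollRecords = maxPollRecords;
  }

  private void seek(long offset) {
    position = offset;
  }

  private void poll() {
    polls++;
    List<Rec> batch = new ArrayList<>();
    while (batch.size() < maxPollRecords && position < log.size()) {
      batch.add(new Rec(position, log.get((int) position)));
      position++;
    }
    buffer = batch.iterator();
  }

  String get(long offset) {
    if (offset != nextOffset) { // non-sequential request: reposition first
      seek(offset);
      poll();
    }
    if (!buffer.hasNext()) {    // sequential, but the buffer is exhausted
      poll();
    }
    if (!buffer.hasNext()) {
      throw new NoSuchElementException("no record at offset " + offset);
    }
    Rec r = buffer.next();
    if (r.offset != offset) {   // buffer miss: seek and fetch once more
      seek(offset);
      poll();
      if (!buffer.hasNext()) {
        throw new NoSuchElementException("no record at offset " + offset);
      }
      r = buffer.next();
      if (r.offset != offset) {
        throw new IllegalStateException("got wrong record even after seeking to " + offset);
      }
    }
    nextOffset = offset + 1;
    return r.value;
  }
}
```

With 10 records and 4 records per poll, reading offsets 0 to 9 in order costs three polls, while jumping back to offset 2 forces a fresh seek and poll.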
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68848064 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/CachedKafkaConsumer.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } + +import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, KafkaConsumer } +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +/** + * Consumer of single topicpartition, intended for cached reuse. + * Underlying consumer is not threadsafe, so neither is this, + * but processing the same topicpartition and group id in multiple threads is usually bad anyway. 
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68847741 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/CachedKafkaConsumer.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } + +import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, KafkaConsumer } +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +/** + * Consumer of single topicpartition, intended for cached reuse. + * Underlying consumer is not threadsafe, so neither is this, + * but processing the same topicpartition and group id in multiple threads is usually bad anyway. 
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68844319 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/package-info.java --- @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Kafka receiver for spark streaming. --- End diff -- nit: Kafka receiver -> Kafka InputDStream --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68844271 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/package.scala --- @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +/** + * Kafka receiver for spark streaming, --- End diff -- nit: Kafka receiver -> Kafka InputDStream
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68642373 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/ConsumerStrategy.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } + +import scala.collection.JavaConverters._ +import scala.reflect.ClassTag + +import org.apache.kafka.clients.consumer._ +import org.apache.kafka.common.TopicPartition + +/** + * Choice of how to create and configure underlying Kafka Consumers on driver and executors. + * Kafka 0.10 consumers can require additional, sometimes complex, setup after object + * instantiation. This interface encapsulates that process, and allows it to be checkpointed. + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ +trait ConsumerStrategy[K, V] { + /** + * Kafka http://kafka.apache.org/documentation.htmll#newconsumerconfigs;> + * configuration parameters to be used on executors. Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. 
+ */ + def executorKafkaParams: ju.Map[String, Object] + + /** + * Must return a fully configured Kafka Consumer, including subscribed or assigned topics. + * This consumer will be used on the driver to query for offsets only, not messages. + * @param currentOffsets A map from TopicPartition to offset, indicating how far the driver + * has successfully read. Will be empty on initial start, possibly non-empty on restart from + * checkpoint. + * TODO: is strategy or dstream responsible for seeking on checkpoint restart + */ + def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] +} + +/** + * Subscribe to a collection of topics. + * @param topics collection of topics to subscribe + * @param kafkaParams Kafka + * http://kafka.apache.org/documentation.htmll#newconsumerconfigs;> + * configuration parameters to be used on driver. The same parameters will be used on executors, + * with minor automatic modifications applied. + * Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. 
+ */ +case class Subscribe[K: ClassTag, V: ClassTag]( +topics: ju.Collection[java.lang.String], +kafkaParams: ju.Map[String, Object] + ) extends ConsumerStrategy[K, V] { + + def executorKafkaParams: ju.Map[String, Object] = kafkaParams + + def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = { +val consumer = new KafkaConsumer[K, V](kafkaParams) +consumer.subscribe(topics) +consumer + } +} + +object Subscribe { + def create[K, V]( + keyClass: Class[K], + valueClass: Class[V], + topics: ju.Collection[java.lang.String], + kafkaParams: ju.Map[String, Object] + ): Subscribe[K, V] = { +implicit val keyCmt: ClassTag[K] = ClassTag(keyClass) +implicit val valueCmt: ClassTag[V] = ClassTag(valueClass) +Subscribe[K, V](topics, kafkaParams) + } +} + +/** + * Assign a fixed collection of TopicPartitions + * @param topicPartitions collection of TopicPartitions to subscribe + * @param kafkaParams Kafka + * http://kafka.apache.org/documentation.htmll#newconsumerconfigs;> + * configuration parameters to be used on driver. The same parameters will be used on executors, + * with minor automatic modifications applied. + * Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. + */ +case class Assign[K: ClassTag, V: ClassTag]( +topicPartitions: ju.Collection[TopicPartition], +kafkaParams: ju.Map[String, Object] + ) extends ConsumerStrategy[K, V] { + + def
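The trait above separates executor-side Kafka parameters from a driver-side factory that must return an already subscribed or assigned consumer. A minimal Java sketch of that shape follows, under these assumptions: a hypothetical `FakeConsumer` only records calls, so no Kafka client is needed, and plain `"topic-partition"` strings stand in for `TopicPartition`. `StrategySketch`, `SubscribeSketch` and `AssignSketch` are illustrative names, not the PR's API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for KafkaConsumer<K, V>: it only records how it was configured.
class FakeConsumer<K, V> {
  final Map<String, Object> params;
  final List<String> subscribedTopics = new ArrayList<>();
  final List<String> assignedPartitions = new ArrayList<>();

  FakeConsumer(Map<String, Object> params) { this.params = params; }

  void subscribe(Collection<String> topics) { subscribedTopics.addAll(topics); }

  void assign(Collection<String> partitions) { assignedPartitions.addAll(partitions); }
}

// Same two responsibilities as the ConsumerStrategy trait in the diff.
interface StrategySketch<K, V> {
  Map<String, Object> executorKafkaParams();

  // Must return a fully configured consumer; currentOffsets is empty on a fresh start.
  FakeConsumer<K, V> onStart(Map<String, Long> currentOffsets);
}

final class SubscribeSketch<K, V> implements StrategySketch<K, V> {
  private final Collection<String> topics;
  private final Map<String, Object> kafkaParams;

  SubscribeSketch(Collection<String> topics, Map<String, Object> kafkaParams) {
    this.topics = topics;
    this.kafkaParams = kafkaParams;
  }

  @Override
  public Map<String, Object> executorKafkaParams() { return kafkaParams; }

  @Override
  public FakeConsumer<K, V> onStart(Map<String, Long> currentOffsets) {
    FakeConsumer<K, V> consumer = new FakeConsumer<>(kafkaParams);
    consumer.subscribe(topics); // group management decides which partitions it gets
    return consumer;
  }
}

final class AssignSketch<K, V> implements StrategySketch<K, V> {
  private final Collection<String> partitions;
  private final Map<String, Object> kafkaParams;

  AssignSketch(Collection<String> partitions, Map<String, Object> kafkaParams) {
    this.partitions = partitions;
    this.kafkaParams = kafkaParams;
  }

  @Override
  public Map<String, Object> executorKafkaParams() { return kafkaParams; }

  @Override
  public FakeConsumer<K, V> onStart(Map<String, Long> currentOffsets) {
    FakeConsumer<K, V> consumer = new FakeConsumer<>(kafkaParams);
    consumer.assign(partitions); // fixed partitions, no group rebalancing
    return consumer;
  }
}
```

The caller only ever invokes `onStart`. Whether partitions come from a group subscription or a fixed assignment stays inside the strategy object, which is what lets the doc comment describe the setup as something that can be checkpointed.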
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68629750 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/ConsumerStrategy.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } + +import scala.collection.JavaConverters._ +import scala.reflect.ClassTag + +import org.apache.kafka.clients.consumer._ +import org.apache.kafka.common.TopicPartition + +/** + * Choice of how to create and configure underlying Kafka Consumers on driver and executors. + * Kafka 0.10 consumers can require additional, sometimes complex, setup after object + * instantiation. This interface encapsulates that process, and allows it to be checkpointed. + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ +trait ConsumerStrategy[K, V] { + /** + * Kafka http://kafka.apache.org/documentation.htmll#newconsumerconfigs;> + * configuration parameters to be used on executors. Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. 
+ */ + def executorKafkaParams: ju.Map[String, Object] + + /** + * Must return a fully configured Kafka Consumer, including subscribed or assigned topics. + * This consumer will be used on the driver to query for offsets only, not messages. + * @param currentOffsets A map from TopicPartition to offset, indicating how far the driver + * has successfully read. Will be empty on initial start, possibly non-empty on restart from + * checkpoint. + * TODO: is strategy or dstream responsible for seeking on checkpoint restart + */ + def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] +} + +/** + * Subscribe to a collection of topics. + * @param topics collection of topics to subscribe + * @param kafkaParams Kafka + * http://kafka.apache.org/documentation.htmll#newconsumerconfigs;> + * configuration parameters to be used on driver. The same parameters will be used on executors, + * with minor automatic modifications applied. + * Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. + */ +case class Subscribe[K: ClassTag, V: ClassTag]( --- End diff -- The KafkaConsumer class and Consumer interface seem to be written in Java, so I don't think there is a real need for ClassTags. Please check it out; I may very well be missing something.
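The point about ClassTags rests on Java generics being erased. A generic factory can construct a parameterized Java class without any `Class<K>` or `Class<V>` tokens. Below is a minimal Java sketch of that, with a hypothetical `GenericConsumer<K, V>` in place of `KafkaConsumer<K, V>`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical generic Java class standing in for KafkaConsumer<K, V>.
class GenericConsumer<K, V> {
  private final Map<String, Object> config;

  GenericConsumer(Map<String, Object> config) { this.config = config; }

  Map<String, Object> config() { return config; }
}

final class ErasureDemo {
  // No Class<K> or Class<V> parameters: K and V are erased, so only the
  // Map argument has to exist at runtime.
  static <K, V> GenericConsumer<K, V> create(Map<String, Object> config) {
    return new GenericConsumer<>(config);
  }

  public static void main(String[] args) {
    Map<String, Object> config = new HashMap<>();
    config.put("bootstrap.servers", "localhost:9092");
    GenericConsumer<String, Long> a = create(config);
    GenericConsumer<byte[], byte[]> b = create(config);
    // Different type arguments, same runtime class.
    System.out.println(a.getClass() == b.getClass()); // prints true
  }
}
```

By the same reasoning, `new KafkaConsumer[K, V](kafkaParams)` in Scala should not by itself require ClassTag context bounds. The sketch does not check whether K and V are needed at runtime elsewhere in the module.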
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68579465 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/ConsumerStrategy.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } + +import scala.collection.JavaConverters._ +import scala.reflect.ClassTag + +import org.apache.kafka.clients.consumer._ +import org.apache.kafka.common.TopicPartition + +/** + * Choice of how to create and configure underlying Kafka Consumers on driver and executors. + * Kafka 0.10 consumers can require additional, sometimes complex, setup after object + * instantiation. This interface encapsulates that process, and allows it to be checkpointed. + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ +trait ConsumerStrategy[K, V] { + /** + * Kafka http://kafka.apache.org/documentation.htmll#newconsumerconfigs;> + * configuration parameters to be used on executors. Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. 
+ */ + def executorKafkaParams: ju.Map[String, Object] + + /** + * Must return a fully configured Kafka Consumer, including subscribed or assigned topics. + * This consumer will be used on the driver to query for offsets only, not messages. + * @param currentOffsets A map from TopicPartition to offset, indicating how far the driver + * has successfully read. Will be empty on initial start, possibly non-empty on restart from + * checkpoint. + * TODO: is strategy or dstream responsible for seeking on checkpoint restart + */ + def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] +} + +/** + * Subscribe to a collection of topics. + * @param topics collection of topics to subscribe + * @param kafkaParams Kafka + * http://kafka.apache.org/documentation.htmll#newconsumerconfigs;> + * configuration parameters to be used on driver. The same parameters will be used on executors, + * with minor automatic modifications applied. + * Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. + */ +case class Subscribe[K: ClassTag, V: ClassTag]( --- End diff -- Pretty sure this needs class tags because the underlying consumer that it's creating is parameterized on those classes, but I can test it out.
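For contrast, a runtime class token is needed when code must materialize the element type itself, for example when allocating an array. In Scala a `ClassTag` supplies that information. The Java analogue below passes a `Class<T>` explicitly. `TypedArrays.newArray` is a hypothetical helper for illustration.

```java
import java.lang.reflect.Array;

final class TypedArrays {
  // `new T[length]` does not compile because T is erased; the Class<T> token
  // provides the runtime element type. Intended for reference element types only.
  @SuppressWarnings("unchecked")
  static <T> T[] newArray(Class<T> elementType, int length) {
    return (T[]) Array.newInstance(elementType, length);
  }

  public static void main(String[] args) {
    String[] names = newArray(String.class, 3);
    System.out.println(names.getClass().getSimpleName() + " of length " + names.length);
  }
}
```

So whether the strategy classes need ClassTags depends on whether K and V are used in this way. A class being parameterized on K and V is not enough by itself to require them.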
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68578250 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/ConsumerStrategy.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } + +import scala.collection.JavaConverters._ +import scala.reflect.ClassTag + +import org.apache.kafka.clients.consumer._ +import org.apache.kafka.common.TopicPartition + +/** + * Choice of how to create and configure underlying Kafka Consumers on driver and executors. + * Kafka 0.10 consumers can require additional, sometimes complex, setup after object + * instantiation. This interface encapsulates that process, and allows it to be checkpointed. + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ +trait ConsumerStrategy[K, V] { + /** + * Kafka http://kafka.apache.org/documentation.htmll#newconsumerconfigs;> + * configuration parameters to be used on executors. Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. 
+ */ + def executorKafkaParams: ju.Map[String, Object] + + /** + * Must return a fully configured Kafka Consumer, including subscribed or assigned topics. + * This consumer will be used on the driver to query for offsets only, not messages. + * @param currentOffsets A map from TopicPartition to offset, indicating how far the driver + * has successfully read. Will be empty on initial start, possibly non-empty on restart from + * checkpoint. + * TODO: is strategy or dstream responsible for seeking on checkpoint restart --- End diff -- If they're expected to do additional work, it needs to be documented, but in retrospect I don't think it's necessary. Will remove.
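The reply suggests strategies should not need to seek on a checkpoint restart. One reading, sketched here under assumptions, is that the caller applies `currentOffsets` after `onStart` returns. `SeekableConsumer` and `DriverStartup.start` are hypothetical. Partitions are plain strings, and a `Function` stands in for the strategy's `onStart`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical consumer stub that only remembers the last seek per partition.
class SeekableConsumer {
  final Map<String, Long> positions = new HashMap<>();

  void seek(String partition, long offset) { positions.put(partition, offset); }
}

final class DriverStartup {
  // onStart builds and subscribes/assigns the consumer; the caller then restores
  // checkpointed offsets, so strategies carry no seeking logic of their own.
  static SeekableConsumer start(Function<Map<String, Long>, SeekableConsumer> onStart,
                                Map<String, Long> currentOffsets) {
    SeekableConsumer consumer = onStart.apply(currentOffsets);
    // Empty on a fresh start, so no seeks happen; non-empty when restarting from a checkpoint.
    for (Map.Entry<String, Long> e : currentOffsets.entrySet()) {
      consumer.seek(e.getKey(), e.getValue());
    }
    return consumer;
  }
}
```

A real `KafkaConsumer` throws `IllegalStateException` from `seek` for a partition that is not currently assigned. A `subscribe()`-based consumer only receives its assignment during `poll`, so real driver code would have to account for that. The stub above does not model it.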
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68577783 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/ConsumerStrategy.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } + +import scala.collection.JavaConverters._ +import scala.reflect.ClassTag + +import org.apache.kafka.clients.consumer._ +import org.apache.kafka.common.TopicPartition + +/** + * Choice of how to create and configure underlying Kafka Consumers on driver and executors. + * Kafka 0.10 consumers can require additional, sometimes complex, setup after object + * instantiation. This interface encapsulates that process, and allows it to be checkpointed. + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ +trait ConsumerStrategy[K, V] { + /** + * Kafka http://kafka.apache.org/documentation.htmll#newconsumerconfigs;> + * configuration parameters to be used on executors. Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. 
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68549882 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/ConsumerStrategy.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } + +import scala.collection.JavaConverters._ +import scala.reflect.ClassTag + +import org.apache.kafka.clients.consumer._ +import org.apache.kafka.common.TopicPartition + +/** + * Choice of how to create and configure underlying Kafka Consumers on driver and executors. + * Kafka 0.10 consumers can require additional, sometimes complex, setup after object + * instantiation. This interface encapsulates that process, and allows it to be checkpointed. + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ +trait ConsumerStrategy[K, V] { + /** + * Kafka http://kafka.apache.org/documentation.htmll#newconsumerconfigs;> + * configuration parameters to be used on executors. Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. 
+ */ + def executorKafkaParams: ju.Map[String, Object] + + /** + * Must return a fully configured Kafka Consumer, including subscribed or assigned topics. + * This consumer will be used on the driver to query for offsets only, not messages. + * @param currentOffsets A map from TopicPartition to offset, indicating how far the driver + * has successfully read. Will be empty on initial start, possibly non-empty on restart from + * checkpoint. + * TODO: is strategy or dstream responsible for seeking on checkpoint restart + */ + def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] +} + +/** + * Subscribe to a collection of topics. + * @param topics collection of topics to subscribe + * @param kafkaParams Kafka + * http://kafka.apache.org/documentation.htmll#newconsumerconfigs;> + * configuration parameters to be used on driver. The same parameters will be used on executors, + * with minor automatic modifications applied. + * Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. 
+ */ +case class Subscribe[K: ClassTag, V: ClassTag]( +topics: ju.Collection[java.lang.String], +kafkaParams: ju.Map[String, Object] + ) extends ConsumerStrategy[K, V] { + + def executorKafkaParams: ju.Map[String, Object] = kafkaParams + + def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = { +val consumer = new KafkaConsumer[K, V](kafkaParams) +consumer.subscribe(topics) +consumer + } +} + +object Subscribe { + def create[K, V]( + keyClass: Class[K], + valueClass: Class[V], + topics: ju.Collection[java.lang.String], + kafkaParams: ju.Map[String, Object] + ): Subscribe[K, V] = { +implicit val keyCmt: ClassTag[K] = ClassTag(keyClass) +implicit val valueCmt: ClassTag[V] = ClassTag(valueClass) +Subscribe[K, V](topics, kafkaParams) + } +} + +/** + * Assign a fixed collection of TopicPartitions + * @param topicPartitions collection of TopicPartitions to subscribe + * @param kafkaParams Kafka + * http://kafka.apache.org/documentation.htmll#newconsumerconfigs;> + * configuration parameters to be used on driver. The same parameters will be used on executors, + * with minor automatic modifications applied. + * Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. + */ +case class Assign[K: ClassTag, V: ClassTag]( +topicPartitions: ju.Collection[TopicPartition], +kafkaParams: ju.Map[String, Object] + ) extends ConsumerStrategy[K, V] { + + def
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68549572 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/ConsumerStrategy.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } + +import scala.collection.JavaConverters._ +import scala.reflect.ClassTag + +import org.apache.kafka.clients.consumer._ +import org.apache.kafka.common.TopicPartition + +/** + * Choice of how to create and configure underlying Kafka Consumers on driver and executors. + * Kafka 0.10 consumers can require additional, sometimes complex, setup after object + * instantiation. This interface encapsulates that process, and allows it to be checkpointed. + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ +trait ConsumerStrategy[K, V] { + /** + * Kafka http://kafka.apache.org/documentation.htmll#newconsumerconfigs;> + * configuration parameters to be used on executors. Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. 
+ */ + def executorKafkaParams: ju.Map[String, Object] + + /** + * Must return a fully configured Kafka Consumer, including subscribed or assigned topics. + * This consumer will be used on the driver to query for offsets only, not messages. + * @param currentOffsets A map from TopicPartition to offset, indicating how far the driver + * has successfully read. Will be empty on initial start, possibly non-empty on restart from + * checkpoint. + * TODO: is strategy or dstream responsible for seeking on checkpoint restart + */ + def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] +} + +/** + * Subscribe to a collection of topics. + * @param topics collection of topics to subscribe + * @param kafkaParams Kafka + * http://kafka.apache.org/documentation.htmll#newconsumerconfigs;> + * configuration parameters to be used on driver. The same parameters will be used on executors, + * with minor automatic modifications applied. + * Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. + */ +case class Subscribe[K: ClassTag, V: ClassTag]( +topics: ju.Collection[java.lang.String], +kafkaParams: ju.Map[String, Object] + ) extends ConsumerStrategy[K, V] { + + def executorKafkaParams: ju.Map[String, Object] = kafkaParams + + def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = { +val consumer = new KafkaConsumer[K, V](kafkaParams) +consumer.subscribe(topics) +consumer + } +} + +object Subscribe { --- End diff -- Please add some basic docs on the object (e.g. "Helper object for creating [[Subscribe]] strategy") and on `create`, as that will be an entry point as well, especially in Scala docs. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well.
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
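The sketch below shows the kind of documentation being requested. It uses a simplified, Kafka-free stand-in: `SubscribeDoc` and its fields are hypothetical and only show where the Scaladoc goes. The companion object gets its own doc comment because `create` is the entry point that Java users are likely to find first.

```scala
import java.{util => ju}

/**
 * Subscribe to a collection of topics.
 * Hypothetical, Kafka-free stand-in used only to show doc placement.
 */
final class SubscribeDoc[K, V] private (
    val topics: ju.Collection[String],
    val kafkaParams: ju.Map[String, Object])

/**
 * Helper object for creating [[SubscribeDoc]] strategies.
 */
object SubscribeDoc {
  /**
   * Java-friendly entry point.
   * @param topics collection of topics to subscribe
   * @param kafkaParams consumer configuration; "bootstrap.servers" must be set
   */
  def create[K, V](
      topics: ju.Collection[String],
      kafkaParams: ju.Map[String, Object]): SubscribeDoc[K, V] = {
    // Illustrative check only, mirroring the documented "bootstrap.servers" requirement.
    require(kafkaParams.containsKey("bootstrap.servers"), "bootstrap.servers must be set")
    new SubscribeDoc[K, V](topics, kafkaParams)
  }
}
```

With this layout, the object's doc and the `create` doc both show up in the generated Scaladoc, next to the class doc.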
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68549333 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/ConsumerStrategy.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } + +import scala.collection.JavaConverters._ +import scala.reflect.ClassTag + +import org.apache.kafka.clients.consumer._ +import org.apache.kafka.common.TopicPartition + +/** + * Choice of how to create and configure underlying Kafka Consumers on driver and executors. + * Kafka 0.10 consumers can require additional, sometimes complex, setup after object + * instantiation. This interface encapsulates that process, and allows it to be checkpointed. + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ +trait ConsumerStrategy[K, V] { + /** + * Kafka http://kafka.apache.org/documentation.htmll#newconsumerconfigs;> + * configuration parameters to be used on executors. Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. 
+ */ + def executorKafkaParams: ju.Map[String, Object] + + /** + * Must return a fully configured Kafka Consumer, including subscribed or assigned topics. + * This consumer will be used on the driver to query for offsets only, not messages. + * @param currentOffsets A map from TopicPartition to offset, indicating how far the driver + * has successfully read. Will be empty on initial start, possibly non-empty on restart from + * checkpoint. + * TODO: is strategy or dstream responsible for seeking on checkpoint restart --- End diff -- Please do not add TODOs in the public docs.
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68531305 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/ConsumerStrategy.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } + +import scala.collection.JavaConverters._ +import scala.reflect.ClassTag + +import org.apache.kafka.clients.consumer._ +import org.apache.kafka.common.TopicPartition + +/** + * Choice of how to create and configure underlying Kafka Consumers on driver and executors. + * Kafka 0.10 consumers can require additional, sometimes complex, setup after object + * instantiation. This interface encapsulates that process, and allows it to be checkpointed. + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ +trait ConsumerStrategy[K, V] { + /** + * Kafka http://kafka.apache.org/documentation.htmll#newconsumerconfigs;> + * configuration parameters to be used on executors. Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. 
+ */ + def executorKafkaParams: ju.Map[String, Object] + + /** + * Must return a fully configured Kafka Consumer, including subscribed or assigned topics. + * This consumer will be used on the driver to query for offsets only, not messages. + * @param currentOffsets A map from TopicPartition to offset, indicating how far the driver + * has successfully read. Will be empty on initial start, possibly non-empty on restart from + * checkpoint. + * TODO: is strategy or dstream responsible for seeking on checkpoint restart + */ + def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] +} + +/** + * Subscribe to a collection of topics. + * @param topics collection of topics to subscribe + * @param kafkaParams Kafka + * http://kafka.apache.org/documentation.htmll#newconsumerconfigs;> + * configuration parameters to be used on driver. The same parameters will be used on executors, + * with minor automatic modifications applied. + * Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. + */ +case class Subscribe[K: ClassTag, V: ClassTag]( +topics: ju.Collection[java.lang.String], +kafkaParams: ju.Map[String, Object] + ) extends ConsumerStrategy[K, V] { + + def executorKafkaParams: ju.Map[String, Object] = kafkaParams + + def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = { +val consumer = new KafkaConsumer[K, V](kafkaParams) +consumer.subscribe(topics) +consumer + } +} + +object Subscribe { + def create[K, V]( + keyClass: Class[K], + valueClass: Class[V], + topics: ju.Collection[java.lang.String], + kafkaParams: ju.Map[String, Object] + ): Subscribe[K, V] = { --- End diff -- nit: wrong indent. Would be nice to have an `apply` for Scala maps.
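The sketch below shows one way to provide a Scala-friendly `apply` alongside the Java-facing `create`. The name `SubscribeParams` is hypothetical, and everything Kafka-specific is left out. The Scala overload copies its `Iterable` and `Map` arguments into `java.util` collections and then delegates to `create`, so construction still happens in one place. The copy loops are written by hand rather than with `JavaConverters`, which keeps the sketch independent of the Scala version.

```scala
import java.{util => ju}

// Hypothetical stand-in for a Subscribe strategy; only the factories matter here.
final class SubscribeParams[K, V] private (
    val topics: ju.Collection[String],
    val kafkaParams: ju.Map[String, Object])

object SubscribeParams {
  /** Java-facing factory taking java.util collections. */
  def create[K, V](
      topics: ju.Collection[String],
      kafkaParams: ju.Map[String, Object]): SubscribeParams[K, V] =
    new SubscribeParams[K, V](topics, kafkaParams)

  /** Scala-facing factory: copies Scala collections, then delegates to `create`. */
  def apply[K, V](
      topics: Iterable[String],
      kafkaParams: collection.Map[String, Object]): SubscribeParams[K, V] = {
    val jTopics = new ju.ArrayList[String]()
    topics.foreach(t => jTopics.add(t))
    val jParams = new ju.HashMap[String, Object]()
    kafkaParams.foreach { case (k, v) => jParams.put(k, v) }
    create[K, V](jTopics, jParams)
  }
}
```

A Scala caller can then write `SubscribeParams[String, String](List("t1"), Map("bootstrap.servers" -> "localhost:9092"))` without touching `java.util` types.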
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68531114 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/ConsumerStrategy.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } + +import scala.collection.JavaConverters._ +import scala.reflect.ClassTag + +import org.apache.kafka.clients.consumer._ +import org.apache.kafka.common.TopicPartition + +/** + * Choice of how to create and configure underlying Kafka Consumers on driver and executors. + * Kafka 0.10 consumers can require additional, sometimes complex, setup after object + * instantiation. This interface encapsulates that process, and allows it to be checkpointed. + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ +trait ConsumerStrategy[K, V] { + /** + * Kafka http://kafka.apache.org/documentation.htmll#newconsumerconfigs;> + * configuration parameters to be used on executors. Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. 
+ */ + def executorKafkaParams: ju.Map[String, Object] + + /** + * Must return a fully configured Kafka Consumer, including subscribed or assigned topics. + * This consumer will be used on the driver to query for offsets only, not messages. + * @param currentOffsets A map from TopicPartition to offset, indicating how far the driver + * has successfully read. Will be empty on initial start, possibly non-empty on restart from + * checkpoint. + * TODO: is strategy or dstream responsible for seeking on checkpoint restart + */ + def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] +} + +/** + * Subscribe to a collection of topics. + * @param topics collection of topics to subscribe + * @param kafkaParams Kafka + * http://kafka.apache.org/documentation.htmll#newconsumerconfigs;> + * configuration parameters to be used on driver. The same parameters will be used on executors, + * with minor automatic modifications applied. + * Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. + */ +case class Subscribe[K: ClassTag, V: ClassTag]( --- End diff -- Also, let's make the constructors private. That will make future compatibility easier.
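The sketch below illustrates why private constructors help with compatibility. `AssignSketch` is hypothetical, and plain Scala types stand in for `TopicPartition` and the Kafka params. A public case class exposes its constructor, `copy` and `unapply`, so adding a field changes all of their signatures. With a private constructor, callers only go through factory methods, and a new parameter can be added later as an extra overload.

```scala
// Hypothetical Assign-like strategy. Partitions are (topic, partition) pairs and the
// params are a plain Map, standing in for TopicPartition and ju.Map[String, Object].
final class AssignSketch private (
    val partitions: List[(String, Int)],
    val kafkaParams: Map[String, String],
    // Imagine this parameter was added in a later release.
    val startOffsets: Map[(String, Int), Long])

object AssignSketch {
  /** Original factory: existing callers keep compiling after `startOffsets` was added. */
  def apply(partitions: List[(String, Int)], kafkaParams: Map[String, String]): AssignSketch =
    new AssignSketch(partitions, kafkaParams, Map.empty)

  /** Later overload exposing the new parameter. */
  def apply(
      partitions: List[(String, Int)],
      kafkaParams: Map[String, String],
      startOffsets: Map[(String, Int), Long]): AssignSketch =
    new AssignSketch(partitions, kafkaParams, startOffsets)
}
```

Code written against the two-argument `apply` keeps compiling unchanged, because no caller ever wrote `new AssignSketch(...)`.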
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68530676 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/ConsumerStrategy.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } + +import scala.collection.JavaConverters._ +import scala.reflect.ClassTag + +import org.apache.kafka.clients.consumer._ +import org.apache.kafka.common.TopicPartition + +/** + * Choice of how to create and configure underlying Kafka Consumers on driver and executors. + * Kafka 0.10 consumers can require additional, sometimes complex, setup after object + * instantiation. This interface encapsulates that process, and allows it to be checkpointed. + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ +trait ConsumerStrategy[K, V] { + /** + * Kafka http://kafka.apache.org/documentation.htmll#newconsumerconfigs;> + * configuration parameters to be used on executors. Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. 
+ */ + def executorKafkaParams: ju.Map[String, Object] + + /** + * Must return a fully configured Kafka Consumer, including subscribed or assigned topics. + * This consumer will be used on the driver to query for offsets only, not messages. + * @param currentOffsets A map from TopicPartition to offset, indicating how far the driver + * has successfully read. Will be empty on initial start, possibly non-empty on restart from + * checkpoint. + * TODO: is strategy or dstream responsible for seeking on checkpoint restart + */ + def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] +} + +/** + * Subscribe to a collection of topics. + * @param topics collection of topics to subscribe + * @param kafkaParams Kafka + * http://kafka.apache.org/documentation.htmll#newconsumerconfigs;> + * configuration parameters to be used on driver. The same parameters will be used on executors, + * with minor automatic modifications applied. + * Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. + */ +case class Subscribe[K: ClassTag, V: ClassTag]( --- End diff -- Why does this need to have class tags? They force the `keyClass` and `valueClass` params in the `create()` method.
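The sketch below shows the trade-off behind the class-tag question, using two hypothetical wrappers. A `ClassTag` context bound must be satisfied at every construction site. Scala callers get the tag implicitly, but Java callers cannot, so a Java-facing factory has to accept a `Class[K]` and build the tag itself with `ClassTag(keyClass)`, as the quoted `create` does. A class that never needs the runtime class can drop the bound, and its factory then needs no `Class` parameters.

```scala
import scala.reflect.ClassTag

// With a context bound, every construction site must supply a ClassTag[K].
final class Tagged[K: ClassTag] private () {
  def keyClass: Class[_] = implicitly[ClassTag[K]].runtimeClass
}

object Tagged {
  // Scala callers: the compiler supplies the tag implicitly.
  def apply[K: ClassTag](): Tagged[K] = new Tagged[K]()

  // Java callers cannot supply an implicit, so the factory must take Class[K]
  // and build the tag itself.
  def create[K](keyClass: Class[K]): Tagged[K] = {
    implicit val tag: ClassTag[K] = ClassTag(keyClass)
    new Tagged[K]()
  }
}

// Without the bound, no runtime class is needed, so the factory needs no Class[K].
final class Untagged[K] private (val topics: List[String])

object Untagged {
  def create[K](topics: List[String]): Untagged[K] = new Untagged[K](topics)
}
```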
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68529586 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala --- @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.atomic.AtomicReference + +import scala.annotation.tailrec +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.reflect.ClassTag + +import org.apache.kafka.clients.consumer._ +import org.apache.kafka.common.{ PartitionInfo, TopicPartition } + +import org.apache.spark.annotation.Experimental +import org.apache.spark.SparkException +import org.apache.spark.internal.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{StreamingContext, Time} +import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo} +import org.apache.spark.streaming.scheduler.rate.RateEstimator + +/** + * A stream of [[KafkaRDD]] where + * each given Kafka topic/partition corresponds to an RDD partition. + * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number + * of messages + * per second that each '''partition''' will accept. + * @param locationStrategy In most cases, pass in [[PreferConsistent]], + * see [[LocationStrategy]] for more details. + * @param executorKafkaParams Kafka + * http://kafka.apache.org/documentation.html#newconsumerconfigs;> + * configuration parameters. + * Requires "bootstrap.servers" to be set with Kafka broker(s), + * NOT zookeeper servers, specified in host1:port1,host2:port2 form. + * @param driverConsumer zero-argument function for you to construct a Kafka Consumer, + * and subscribe topics or assign partitions. + * This consumer will be used on the driver to query for offsets only, not messages. 
+ * See http://kafka.apache.org/documentation.html#newconsumerapi;>Consumer doc + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ +@Experimental +private[spark] class DirectKafkaInputDStream[K: ClassTag, V: ClassTag]( +_ssc: StreamingContext, +locationStrategy: LocationStrategy, +consumerStrategy: ConsumerStrategy[K, V] + ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with CanCommitOffsets { + + val executorKafkaParams = { +val ekp = new ju.HashMap[String, Object](consumerStrategy.executorKafkaParams) +KafkaUtils.fixKafkaParams(ekp) +ekp + } + + protected var currentOffsets = Map[TopicPartition, Long]() + + @transient private var kc: Consumer[K, V] = null + def consumer(): Consumer[K, V] = this.synchronized { +if (null == kc) { + kc = consumerStrategy.onStart(currentOffsets) +} +kc + } + + override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, V]] = { +logError("Kafka ConsumerRecord is not serializable. " + + "Use .map to extract fields before calling .persist or .window") +super.persist(newLevel) + } + + protected def getBrokers = { +val c = consumer +val result = new ju.HashMap[TopicPartition, String]() +val hosts = new ju.HashMap[TopicPartition, String]() +val assignments = c.assignment().iterator() +while (assignments.hasNext()) { + val tp: TopicPartition = assignments.next() + if (null == hosts.get(tp)) { +val infos = c.partitionsFor(tp.topic).iterator() +while (infos.hasNext()) { + val i = infos.next() + hosts.put(new TopicPartition(i.topic(), i.partition()), i.leader.host()) +} + } +
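The quoted `getBrokers` maps each assigned partition to the host of its leader. It fills a cache from `partitionsFor(topic)`, so in the common case each topic is looked up only once. The sketch below reproduces that lookup without Kafka. `TP` and `PartitionMeta` are hypothetical stand-ins for `TopicPartition` and `PartitionInfo`, and a plain function stands in for the consumer call.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for TopicPartition and PartitionInfo.
final case class TP(topic: String, partition: Int)
final case class PartitionMeta(topic: String, partition: Int, leaderHost: String)

object BrokerLookup {
  /**
   * Map each assigned partition to its leader's host. One partitionsFor(topic)
   * call records the leader of every partition in that topic, so the remaining
   * partitions of the same topic are served from the cache.
   * Partitions with no known leader are left out (the quoted code would store null).
   */
  def leaderHosts(
      assignment: Seq[TP],
      partitionsFor: String => Seq[PartitionMeta]): Map[TP, String] = {
    val hosts = mutable.HashMap.empty[TP, String]
    assignment.flatMap { tp =>
      if (!hosts.contains(tp)) {
        partitionsFor(tp.topic).foreach { m =>
          hosts.put(TP(m.topic, m.partition), m.leaderHost)
        }
      }
      hosts.get(tp).map(host => tp -> host)
    }.toMap
  }
}
```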
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68161842 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala --- @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.atomic.AtomicReference + +import scala.annotation.tailrec +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.reflect.ClassTag + +import org.apache.kafka.clients.consumer._ +import org.apache.kafka.common.{ PartitionInfo, TopicPartition } + +import org.apache.spark.annotation.Experimental +import org.apache.spark.SparkException +import org.apache.spark.internal.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{StreamingContext, Time} +import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo} +import org.apache.spark.streaming.scheduler.rate.RateEstimator + +/** + * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where + * each given Kafka topic/partition corresponds to an RDD partition. + * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number + * of messages + * per second that each '''partition''' will accept. + * @param preferredHosts map from TopicPartition to preferred host for processing that partition. + * In most cases, use [[DirectKafkaInputDStream.preferConsistent]] + * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on same nodes as brokers. + * @param executorKafkaParams Kafka + * http://kafka.apache.org/documentation.html#newconsumerconfigs;> + * configuration parameters. + * Requires "bootstrap.servers" to be set with Kafka broker(s), + * NOT zookeeper servers, specified in host1:port1,host2:port2 form. + * @param driverConsumer zero-argument function for you to construct a Kafka Consumer, + * and subscribe topics or assign partitions. + * This consumer will be used on the driver to query for offsets only, not messages. 
+ * See http://kafka.apache.org/documentation.html#newconsumerapi;>Consumer doc + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ +@Experimental +class DirectKafkaInputDStream[K: ClassTag, V: ClassTag] private[spark] ( +_ssc: StreamingContext, +preferredHosts: ju.Map[TopicPartition, String], +executorKafkaParams: ju.Map[String, Object], +driverConsumer: () => Consumer[K, V] + ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging { + + @transient private var kc: Consumer[K, V] = null + def consumer(): Consumer[K, V] = this.synchronized { +if (null == kc) { + kc = driverConsumer() +} +kc + } + consumer() + + override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, V]] = { +log.error("Kafka ConsumerRecord is not serializable. " + + "Use .map to extract fields before calling .persist or .window") +super.persist(newLevel) + } + + protected def getBrokers = { +val c = consumer +val result = new ju.HashMap[TopicPartition, String]() +val hosts = new ju.HashMap[TopicPartition, String]() +val assignments = c.assignment().iterator() +while (assignments.hasNext()) { + val tp: TopicPartition = assignments.next() + if (null == hosts.get(tp)) { +val infos = c.partitionsFor(tp.topic).iterator() +while (infos.hasNext()) { + val i = infos.next() + hosts.put(new TopicPartition(i.topic(), i.partition()), i.leader.host()) +} + } + result.put(tp, hosts.get(tp))
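The quoted doc contrasts `preferConsistent` with `preferBrokers`. The sketch below shows one way each could choose an executor, assuming a hypothetical `ExecutorRef(host, id)` type. It illustrates the idea only and is not the code in this PR. Sorting the executors first makes the choice deterministic: the same partition maps to the same executor from batch to batch while the executor set is unchanged, which would let a consumer cached on that executor be reused.

```scala
// Hypothetical executor descriptor.
final case class ExecutorRef(host: String, id: String)

object Placement {
  // Sorting makes the choice independent of the order executors are reported in.
  private def sorted(execs: Seq[ExecutorRef]): Seq[ExecutorRef] =
    execs.sortBy(e => (e.host, e.id))

  /** Consistent: same partition -> same executor while the executor set is stable. */
  def preferConsistent(topic: String, partition: Int, execs: Seq[ExecutorRef]): Option[ExecutorRef] = {
    val s = sorted(execs)
    if (s.isEmpty) None
    else Some(s(Math.floorMod((topic, partition).hashCode, s.length)))
  }

  /** Brokers: prefer executors on the leader's host, else fall back to all executors. */
  def preferBrokers(
      topic: String,
      partition: Int,
      leaderHost: String,
      execs: Seq[ExecutorRef]): Option[ExecutorRef] = {
    val onLeader = execs.filter(_.host == leaderHost)
    preferConsistent(topic, partition, if (onLeader.nonEmpty) onLeader else execs)
  }
}
```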
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68161572 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala --- @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.atomic.AtomicReference + +import scala.annotation.tailrec +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.reflect.ClassTag + +import org.apache.kafka.clients.consumer._ +import org.apache.kafka.common.{ PartitionInfo, TopicPartition } + +import org.apache.spark.annotation.Experimental +import org.apache.spark.SparkException +import org.apache.spark.internal.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{StreamingContext, Time} +import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo} +import org.apache.spark.streaming.scheduler.rate.RateEstimator + +/** + * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where + * each given Kafka topic/partition corresponds to an RDD partition. + * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number + * of messages + * per second that each '''partition''' will accept. + * @param preferredHosts map from TopicPartition to preferred host for processing that partition. + * In most cases, use [[DirectKafkaInputDStream.preferConsistent]] + * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on same nodes as brokers. + * @param executorKafkaParams Kafka + * http://kafka.apache.org/documentation.html#newconsumerconfigs;> + * configuration parameters. + * Requires "bootstrap.servers" to be set with Kafka broker(s), + * NOT zookeeper servers, specified in host1:port1,host2:port2 form. + * @param driverConsumer zero-argument function for you to construct a Kafka Consumer, + * and subscribe topics or assign partitions. + * This consumer will be used on the driver to query for offsets only, not messages. 
+ * See the Consumer doc at http://kafka.apache.org/documentation.html#newconsumerapi + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ +@Experimental +class DirectKafkaInputDStream[K: ClassTag, V: ClassTag] private[spark] ( +_ssc: StreamingContext, +preferredHosts: ju.Map[TopicPartition, String], +executorKafkaParams: ju.Map[String, Object], +driverConsumer: () => Consumer[K, V] + ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging { + + @transient private var kc: Consumer[K, V] = null + def consumer(): Consumer[K, V] = this.synchronized { +if (null == kc) { + kc = driverConsumer() +} +kc + } + consumer() + + override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, V]] = { +log.error("Kafka ConsumerRecord is not serializable. " + + "Use .map to extract fields before calling .persist or .window") +super.persist(newLevel) + } + + protected def getBrokers = { +val c = consumer +val result = new ju.HashMap[TopicPartition, String]() +val hosts = new ju.HashMap[TopicPartition, String]() +val assignments = c.assignment().iterator() +while (assignments.hasNext()) { + val tp: TopicPartition = assignments.next() + if (null == hosts.get(tp)) { +val infos = c.partitionsFor(tp.topic).iterator() +while (infos.hasNext()) { + val i = infos.next() + hosts.put(new TopicPartition(i.topic(), i.partition()), i.leader.host()) +} + } + result.put(tp, hosts.get(tp))
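The quoted `getBrokers` builds a map from each assigned partition to its leader's host, using the `hosts` map so that a topic's partition metadata is normally fetched once rather than once per partition. A minimal Java sketch of that caching pattern follows. `TP` and `PartitionMeta` are hypothetical stand-ins for Kafka's `TopicPartition` and `PartitionInfo`, and the `partitionsFor` function stands in for `Consumer.partitionsFor`, so no Kafka client is needed.

```java
import java.util.*;
import java.util.function.Function;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition (value equality).
final class TP {
  final String topic;
  final int partition;

  TP(String topic, int partition) {
    this.topic = topic;
    this.partition = partition;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof TP)) return false;
    TP other = (TP) o;
    return topic.equals(other.topic) && partition == other.partition;
  }

  @Override
  public int hashCode() {
    return Objects.hash(topic, partition);
  }
}

// Hypothetical stand-in for the fields of PartitionInfo that getBrokers reads.
final class PartitionMeta {
  final String topic;
  final int partition;
  final String leaderHost;

  PartitionMeta(String topic, int partition, String leaderHost) {
    this.topic = topic;
    this.partition = partition;
    this.leaderHost = leaderHost;
  }
}

final class BrokerLookup {
  /** Maps each assigned partition to its leader host, fetching metadata per topic. */
  static Map<TP, String> leaderHosts(Collection<TP> assignment,
                                     Function<String, List<PartitionMeta>> partitionsFor) {
    Map<TP, String> result = new HashMap<>();
    Map<TP, String> hosts = new HashMap<>(); // cache filled one topic at a time
    for (TP tp : assignment) {
      if (!hosts.containsKey(tp)) {
        // One metadata lookup records the leader of every partition in this topic.
        for (PartitionMeta p : partitionsFor.apply(tp.topic)) {
          hosts.put(new TP(p.topic, p.partition), p.leaderHost);
        }
      }
      result.put(tp, hosts.get(tp));
    }
    return result;
  }
}
```

With two assigned partitions of one topic and one partition of another, the lookup function runs twice, not three times. Such a leader-host map is only a useful locality hint when executors run on the broker hosts, which is the case the Scaladoc reserves `preferBrokers` for.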
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68161054 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala --- @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } + +import scala.collection.mutable.ArrayBuffer +import scala.reflect.{classTag, ClassTag} + +import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord } +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext} +import org.apache.spark.annotation.Experimental +import org.apache.spark.internal.Logging +import org.apache.spark.partial.{BoundedDouble, PartialResult} +import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler.ExecutorCacheTaskLocation +import org.apache.spark.storage.StorageLevel + +/** + * A batch-oriented interface for consuming from Kafka. + * Starting and ending offsets are specified in advance, + * so that you can control exactly-once semantics. 
+ * @param kafkaParams Kafka + * http://kafka.apache.org/documentation.html#newconsumerconfigs + * configuration parameters. Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. + * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD + * @param preferredHosts map from TopicPartition to preferred host for processing that partition. + * In most cases, use [[DirectKafkaInputDStream.preferConsistent]]. + * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on the same nodes as brokers. + * @param useConsumerCache whether to use a consumer from a per-JVM cache + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ +@Experimental +class KafkaRDD[ + K: ClassTag, + V: ClassTag] private[spark] ( +sc: SparkContext, +val kafkaParams: ju.Map[String, Object], +val offsetRanges: Array[OffsetRange], +val preferredHosts: ju.Map[TopicPartition, String], +useConsumerCache: Boolean +) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with HasOffsetRanges { + + assert("none" == + kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).asInstanceOf[String], +ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + + " must be set to none for executor kafka params, else messages may not match offsetRange") + + assert(false == + kafkaParams.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).asInstanceOf[Boolean], +ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + + " must be set to false for executor kafka params, else offsets may commit before processing") + + // TODO is it necessary to have separate configs for initial poll time vs ongoing poll time?
+ private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 256) + private val cacheInitialCapacity = +conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16) + private val cacheMaxCapacity = +conf.getInt("spark.streaming.kafka.consumer.cache.maxCapacity", 64) + private val cacheLoadFactor = +conf.getDouble("spark.streaming.kafka.consumer.cache.loadFactor", 0.75).toFloat + + override def persist(newLevel: StorageLevel): this.type = { +log.error("Kafka ConsumerRecord is not serializable. " + + "Use .map to extract fields before calling .persist or .window") +super.persist(newLevel) + } + + override def getPartitions: Array[Partition] = { +offsetRanges.zipWithIndex.map { case (o, i) => +new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset) +}.toArray + } + + override def count(): Long = offsetRanges.map(_.count).sum + + override def
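`KafkaRDD` asserts that the executor-side params set `auto.offset.reset` to `none` and `enable.auto.commit` to `false`. One detail worth checking: in Scala, `asInstanceOf[Boolean]` on a missing (null) map value yields `false`. As a result, the second assertion as quoted also passes when `enable.auto.commit` is absent, even though the Kafka consumer's own default for that key is `true`. The Java sketch below is a hypothetical stricter validator, not the PR's code. It collects problems instead of throwing, treats a missing key as a problem, and accepts either `Boolean.FALSE` or the string `"false"`, because the Kafka consumer accepts both forms in its config map.

```java
import java.util.*;

final class ExecutorParamsCheck {
  // Same key strings as ConsumerConfig.AUTO_OFFSET_RESET_CONFIG / ENABLE_AUTO_COMMIT_CONFIG.
  static final String AUTO_OFFSET_RESET = "auto.offset.reset";
  static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

  /** Returns a list of problems; an empty list means the params look safe for executors. */
  static List<String> problems(Map<String, Object> kafkaParams) {
    List<String> out = new ArrayList<>();

    Object reset = kafkaParams.get(AUTO_OFFSET_RESET);
    if (!"none".equals(reset)) {
      out.add(AUTO_OFFSET_RESET + " must be none, else messages may not match offsetRange");
    }

    Object commit = kafkaParams.get(ENABLE_AUTO_COMMIT);
    // A missing key is rejected here because the consumer would default it to true.
    boolean commitDisabled = Boolean.FALSE.equals(commit)
        || (commit instanceof String && ((String) commit).trim().equalsIgnoreCase("false"));
    if (!commitDisabled) {
      out.add(ENABLE_AUTO_COMMIT + " must be false, else offsets may commit before processing");
    }
    return out;
  }
}
```

Returning a list lets a caller report every misconfiguration at once; a real implementation could equally throw on the first problem, as the quoted `assert` calls do.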
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68161026 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala --- @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } + +import scala.collection.mutable.ArrayBuffer +import scala.reflect.{classTag, ClassTag} + +import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord } +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext} +import org.apache.spark.annotation.Experimental +import org.apache.spark.internal.Logging +import org.apache.spark.partial.{BoundedDouble, PartialResult} +import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler.ExecutorCacheTaskLocation +import org.apache.spark.storage.StorageLevel + +/** + * A batch-oriented interface for consuming from Kafka. + * Starting and ending offsets are specified in advance, + * so that you can control exactly-once semantics. 
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68155816 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala --- @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.atomic.AtomicReference + +import scala.annotation.tailrec +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.reflect.ClassTag + +import org.apache.kafka.clients.consumer._ +import org.apache.kafka.common.{ PartitionInfo, TopicPartition } + +import org.apache.spark.annotation.Experimental +import org.apache.spark.SparkException +import org.apache.spark.internal.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{StreamingContext, Time} +import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo} +import org.apache.spark.streaming.scheduler.rate.RateEstimator + +/** + * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where + * each given Kafka topic/partition corresponds to an RDD partition. + * The Spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number + * of messages + * per second that each '''partition''' will accept. + * @param preferredHosts map from TopicPartition to preferred host for processing that partition. + * In most cases, use [[DirectKafkaInputDStream.preferConsistent]]. + * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on the same nodes as brokers. + * @param executorKafkaParams Kafka + * http://kafka.apache.org/documentation.html#newconsumerconfigs + * configuration parameters. + * Requires "bootstrap.servers" to be set with Kafka broker(s), + * NOT zookeeper servers, specified in host1:port1,host2:port2 form. + * @param driverConsumer zero-argument function for you to construct a Kafka Consumer, + * and subscribe to topics or assign partitions. + * This consumer will be used on the driver to query for offsets only, not messages.
+ * See the Consumer doc at http://kafka.apache.org/documentation.html#newconsumerapi + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ +@Experimental +class DirectKafkaInputDStream[K: ClassTag, V: ClassTag] private[spark] ( +_ssc: StreamingContext, +preferredHosts: ju.Map[TopicPartition, String], +executorKafkaParams: ju.Map[String, Object], +driverConsumer: () => Consumer[K, V] + ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging { + + @transient private var kc: Consumer[K, V] = null + def consumer(): Consumer[K, V] = this.synchronized { +if (null == kc) { + kc = driverConsumer() +} +kc + } + consumer() + + override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, V]] = { +log.error("Kafka ConsumerRecord is not serializable. " + + "Use .map to extract fields before calling .persist or .window") +super.persist(newLevel) + } + + protected def getBrokers = { +val c = consumer +val result = new ju.HashMap[TopicPartition, String]() +val hosts = new ju.HashMap[TopicPartition, String]() +val assignments = c.assignment().iterator() +while (assignments.hasNext()) { + val tp: TopicPartition = assignments.next() + if (null == hosts.get(tp)) { +val infos = c.partitionsFor(tp.topic).iterator() +while (infos.hasNext()) { + val i = infos.next() + hosts.put(new TopicPartition(i.topic(), i.partition()), i.leader.host()) +} + } + result.put(tp, hosts.get(tp)) +
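The Scaladoc states that `spark.streaming.kafka.maxRatePerPartition` caps the number of messages per second that each partition will accept. One plausible way to turn that cap into a per-batch ending offset is sketched below in Java. The quoted diff does not show how `DirectKafkaInputDStream` actually applies the limit, so `RateClamp.untilOffset` is hypothetical, and treating a non-positive rate as "no limit" is an assumption of the sketch.

```java
final class RateClamp {
  /**
   * Hypothetical helper: ending offset (exclusive) for one partition in one batch.
   * maxRatePerPartition is in messages per second; a value <= 0 means "no limit"
   * in this sketch.
   */
  static long untilOffset(long fromOffset, long latestOffset,
                          long maxRatePerPartition, long batchMillis) {
    if (maxRatePerPartition <= 0) {
      return latestOffset; // unlimited: take everything currently available
    }
    // Messages allowed for this partition during one batch interval.
    long maxMessages = maxRatePerPartition * batchMillis / 1000;
    return Math.min(latestOffset, fromOffset + maxMessages);
  }
}
```

For example, at 100 messages per second and a 2-second batch, a partition starting at offset 1000 would read up to offset 1200 (exclusive), that is, 200 messages, unless fewer are available.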
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68151084 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala --- @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68150093 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala --- @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } + +import scala.collection.mutable.ArrayBuffer +import scala.reflect.{classTag, ClassTag} + +import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord } +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext} +import org.apache.spark.annotation.Experimental +import org.apache.spark.internal.Logging +import org.apache.spark.partial.{BoundedDouble, PartialResult} +import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler.ExecutorCacheTaskLocation +import org.apache.spark.storage.StorageLevel + +/** + * A batch-oriented interface for consuming from Kafka. + * Starting and ending offsets are specified in advance, + * so that you can control exactly-once semantics. 
+ * @param kafkaParams Kafka + * http://kafka.apache.org/documentation.html#newconsumerconfigs + * configuration parameters. Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. + * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD + * @param preferredHosts map from TopicPartition to preferred host for processing that partition. + * In most cases, use [[DirectKafkaInputDStream.preferConsistent]]. + * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on the same nodes as brokers. + * @param useConsumerCache whether to use a consumer from a per-JVM cache + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ +@Experimental +class KafkaRDD[ + K: ClassTag, + V: ClassTag] private[spark] ( +sc: SparkContext, +val kafkaParams: ju.Map[String, Object], +val offsetRanges: Array[OffsetRange], +val preferredHosts: ju.Map[TopicPartition, String], +useConsumerCache: Boolean +) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with HasOffsetRanges { + + assert("none" == + kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).asInstanceOf[String], +ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + + " must be set to none for executor kafka params, else messages may not match offsetRange") + + assert(false == + kafkaParams.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).asInstanceOf[Boolean], +ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + + " must be set to false for executor kafka params, else offsets may commit before processing") + + // TODO is it necessary to have separate configs for initial poll time vs ongoing poll time?
+ private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 256) + private val cacheInitialCapacity = +conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16) + private val cacheMaxCapacity = +conf.getInt("spark.streaming.kafka.consumer.cache.maxCapacity", 64) + private val cacheLoadFactor = +conf.getDouble("spark.streaming.kafka.consumer.cache.loadFactor", 0.75).toFloat + + override def persist(newLevel: StorageLevel): this.type = { +log.error("Kafka ConsumerRecord is not serializable. " + + "Use .map to extract fields before calling .persist or .window") +super.persist(newLevel) + } + + override def getPartitions: Array[Partition] = { +offsetRanges.zipWithIndex.map { case (o, i) => +new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset) +}.toArray + } + + override def count(): Long = offsetRanges.map(_.count).sum + + override def countApprox(
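`getPartitions` creates one `KafkaRDDPartition` per `OffsetRange`, numbered with `zipWithIndex`, and `count()` is answered from offsets alone via `offsetRanges.map(_.count).sum`, without reading any messages. The Java sketch below models both with hypothetical `OffsetSpan` and `SpanPartition` classes. It assumes a range's count is `untilOffset - fromOffset`, meaning every offset holds exactly one record. The quoted diff does not show `OffsetRange.count`, and that assumption may not hold for compacted topics.

```java
import java.util.*;

// Hypothetical stand-in for OffsetRange: offsets [fromOffset, untilOffset) of one partition.
final class OffsetSpan {
  final String topic;
  final int partition;
  final long fromOffset;
  final long untilOffset; // exclusive

  OffsetSpan(String topic, int partition, long fromOffset, long untilOffset) {
    this.topic = topic;
    this.partition = partition;
    this.fromOffset = fromOffset;
    this.untilOffset = untilOffset;
  }

  long count() {
    return untilOffset - fromOffset;
  }
}

// Hypothetical stand-in for KafkaRDDPartition: an RDD partition index plus its span.
final class SpanPartition {
  final int index;
  final OffsetSpan span;

  SpanPartition(int index, OffsetSpan span) {
    this.index = index;
    this.span = span;
  }
}

final class SpanPlanning {
  /** One RDD partition per offset span, numbered in order (the role zipWithIndex plays). */
  static List<SpanPartition> partitions(List<OffsetSpan> spans) {
    List<SpanPartition> out = new ArrayList<>();
    for (int i = 0; i < spans.size(); i++) {
      out.add(new SpanPartition(i, spans.get(i)));
    }
    return out;
  }

  /** Record count computed from offsets alone; no messages are fetched. */
  static long count(List<OffsetSpan> spans) {
    long total = 0;
    for (OffsetSpan s : spans) {
      total += s.count();
    }
    return total;
  }
}
```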
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68149741 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala --- @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } + +import scala.collection.mutable.ArrayBuffer +import scala.reflect.{classTag, ClassTag} + +import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord } +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext} +import org.apache.spark.annotation.Experimental +import org.apache.spark.internal.Logging +import org.apache.spark.partial.{BoundedDouble, PartialResult} +import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler.ExecutorCacheTaskLocation +import org.apache.spark.storage.StorageLevel + +/** + * A batch-oriented interface for consuming from Kafka. + * Starting and ending offsets are specified in advance, + * so that you can control exactly-once semantics. 
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68149159 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/CachedKafkaConsumer.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } + +import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, KafkaConsumer } +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +/** + * Consumer of single topicpartition, intended for cached reuse. + * Underlying consumer is not threadsafe, so neither is this, + * but processing the same topicpartition and group id in multiple threads is usually bad anyway. 
+ */ +private[kafka] +class CachedKafkaConsumer[K, V] private( + val groupId: String, + val topic: String, + val partition: Int, + val kafkaParams: ju.Map[String, Object]) extends Logging { + + assert(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG), +"groupId used for cache key must match the groupId in kafkaParams") + + val topicPartition = new TopicPartition(topic, partition) + + protected val consumer = { +val c = new KafkaConsumer[K, V](kafkaParams) +val tps = new ju.ArrayList[TopicPartition]() +tps.add(topicPartition) +c.assign(tps) +c + } + + // TODO if the buffer was kept around as a random-access structure, + // could possibly optimize re-calculating of an RDD in the same batch + protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, V]]().iterator + protected var nextOffset = -2L + + def close(): Unit = consumer.close() + + /** + * Get the record for the given offset, waiting up to timeout ms if IO is necessary. + * Sequential forward access will use buffers, but random access will be horribly inefficient. 
+ */ + def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = { +log.debug(s"Get $groupId $topic $partition nextOffset $nextOffset requested $offset") +if (offset != nextOffset) { + log.info(s"Initial fetch for $groupId $topic $partition $offset") + seek(offset) + poll(timeout) +} + +if (!buffer.hasNext()) { poll(timeout) } +assert(buffer.hasNext(), + s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout") +var record = buffer.next() + +if (record.offset != offset) { + log.info(s"Buffer miss for $groupId $topic $partition $offset") + seek(offset) + poll(timeout) + assert(buffer.hasNext(), +s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout") + record = buffer.next() + assert(record.offset == offset, +s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset") +} + +nextOffset = offset + 1 +record + } + + private def seek(offset: Long): Unit = { +log.debug(s"Seeking to $topicPartition $offset") +consumer.seek(topicPartition, offset) + } + + private def poll(timeout: Long): Unit = { +val p = consumer.poll(timeout) +val r = p.records(topicPartition) +log.debug(s"Polled ${p.partitions()} ${r.size}") +buffer = r.iterator + } + +} + +private[kafka] +object CachedKafkaConsumer extends Logging { + + private case class CacheKey(groupId: String, topic: String, partition: Int) + + // Don't want to depend on guava, don't want a cleanup thread, use a simple LinkedHashMap + private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] = null + + /** Must be called before get, once per JVM, to configure the cache. Further calls are ignored
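The `CachedKafkaConsumer` companion opts for a plain `LinkedHashMap` rather than Guava or a cleanup thread. `KafkaRDD` reads `spark.streaming.kafka.consumer.cache.initialCapacity` (default 16), `.maxCapacity` (default 64) and `.loadFactor` (default 0.75) for that cache. The quoted diff stops before the cache body, so the following is only a sketch of one common way to build such a bounded LRU cache in Java. It assumes cached values are `AutoCloseable` and should be closed on eviction, and the class name `LruConsumerCache` is hypothetical.

```java
import java.util.*;

/**
 * Bounded LRU cache on java.util.LinkedHashMap: access-ordered, and it evicts
 * (and closes) the least recently used entry once maxCapacity is exceeded.
 * Eviction happens inside put(), so no background thread is needed.
 */
final class LruConsumerCache<K, V extends AutoCloseable> {
  private final LinkedHashMap<K, V> map;

  LruConsumerCache(int initialCapacity, final int maxCapacity, float loadFactor) {
    // accessOrder = true: get() moves an entry to the most-recently-used end.
    this.map = new LinkedHashMap<K, V>(initialCapacity, loadFactor, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        if (size() > maxCapacity) {
          try {
            eldest.getValue().close(); // release the evicted consumer's resources
          } catch (Exception e) {
            // A real implementation would log this; the sketch ignores it.
          }
          return true;
        }
        return false;
      }
    };
  }

  synchronized V get(K key) {
    return map.get(key);
  }

  synchronized void put(K key, V value) {
    map.put(key, value);
  }

  synchronized int size() {
    return map.size();
  }
}
```

Because the map is access-ordered, a consumer that is read from stays cached while idle ones age out. The `synchronized` methods matter because `LinkedHashMap` mutates its ordering even on `get`.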
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68149151 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/CachedKafkaConsumer.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } + +import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, KafkaConsumer } +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +/** + * Consumer of single topicpartition, intended for cached reuse. + * Underlying consumer is not threadsafe, so neither is this, + * but processing the same topicpartition and group id in multiple threads is usually bad anyway. 
+ */ +private[kafka] +class CachedKafkaConsumer[K, V] private( + val groupId: String, + val topic: String, + val partition: Int, + val kafkaParams: ju.Map[String, Object]) extends Logging { + + assert(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG), +"groupId used for cache key must match the groupId in kafkaParams") + + val topicPartition = new TopicPartition(topic, partition) + + protected val consumer = { +val c = new KafkaConsumer[K, V](kafkaParams) +val tps = new ju.ArrayList[TopicPartition]() +tps.add(topicPartition) +c.assign(tps) +c + } + + // TODO if the buffer was kept around as a random-access structure, + // could possibly optimize re-calculating of an RDD in the same batch + protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, V]]().iterator + protected var nextOffset = -2L + + def close(): Unit = consumer.close() + + /** + * Get the record for the given offset, waiting up to timeout ms if IO is necessary. + * Sequential forward access will use buffers, but random access will be horribly inefficient. 
+ */ + def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = { +log.debug(s"Get $groupId $topic $partition nextOffset $nextOffset requested $offset") +if (offset != nextOffset) { + log.info(s"Initial fetch for $groupId $topic $partition $offset") + seek(offset) + poll(timeout) +} + +if (!buffer.hasNext()) { poll(timeout) } +assert(buffer.hasNext(), + s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout") +var record = buffer.next() + +if (record.offset != offset) { + log.info(s"Buffer miss for $groupId $topic $partition $offset") + seek(offset) + poll(timeout) + assert(buffer.hasNext(), +s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout") + record = buffer.next() + assert(record.offset == offset, +s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset") +} + +nextOffset = offset + 1 +record + } + + private def seek(offset: Long): Unit = { +log.debug(s"Seeking to $topicPartition $offset") +consumer.seek(topicPartition, offset) + } + + private def poll(timeout: Long): Unit = { +val p = consumer.poll(timeout) +val r = p.records(topicPartition) +log.debug(s"Polled ${p.partitions()} ${r.size}") +buffer = r.iterator + } + +} + +private[kafka] +object CachedKafkaConsumer extends Logging { + + private case class CacheKey(groupId: String, topic: String, partition: Int) + + // Don't want to depend on guava, don't want a cleanup thread, use a simple LinkedHashMap + private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] = null + + /** Must be called before get, once per JVM, to configure the cache. Further calls are ignored
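The companion object's comment says the cache is a plain `LinkedHashMap`, with no Guava dependency and no cleanup thread. However, the cache body is cut off in the quoted diff. One common way to get LRU eviction from a `LinkedHashMap` is to combine access ordering with `removeEldestEntry`. The Java sketch below assumes that approach; it is not necessarily what this PR does.

- `CacheKey` mirrors the `(groupId, topic, partition)` case class.
- The constructor arguments are modeled on the `spark.streaming.kafka.consumer.cache.*` settings read in `KafkaRDD`.
- `LruConsumerCache` and its `evicted` list are hypothetical names for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Cache key mirroring the (groupId, topic, partition) CacheKey case class.
final class CacheKey {
    final String groupId;
    final String topic;
    final int partition;

    CacheKey(String groupId, String topic, int partition) {
        this.groupId = groupId;
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof CacheKey)) return false;
        CacheKey k = (CacheKey) o;
        return partition == k.partition && groupId.equals(k.groupId) && topic.equals(k.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(groupId, topic, partition);
    }
}

// Access-ordered map: iteration order is least recently used first, so the "eldest"
// entry is the LRU one. Not thread-safe; callers must synchronize, as with LinkedHashMap.
final class LruConsumerCache<V extends AutoCloseable> extends LinkedHashMap<CacheKey, V> {
    private final int maxCapacity;
    final List<CacheKey> evicted = new ArrayList<>(); // recorded only to make the sketch observable

    LruConsumerCache(int initialCapacity, int maxCapacity, float loadFactor) {
        super(initialCapacity, loadFactor, true); // true = access order rather than insertion order
        this.maxCapacity = maxCapacity;
    }

    // Called by put(); returning true drops the eldest entry after we close its consumer.
    @Override
    protected boolean removeEldestEntry(Map.Entry<CacheKey, V> eldest) {
        if (size() > maxCapacity) {
            try {
                eldest.getValue().close();
            } catch (Exception e) {
                // A real implementation would log this; the sketch just continues.
            }
            evicted.add(eldest.getKey());
            return true;
        }
        return false;
    }
}
```

Because lookups with `get` reorder the map, a consumer that is reused every batch stays cached. A partition that stops being read eventually ages out and is closed.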
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68149138 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/CachedKafkaConsumer.scala --- @@ -0,0 +1,184 @@
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68148952 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala --- @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } + +import scala.collection.mutable.ArrayBuffer +import scala.reflect.{classTag, ClassTag} + +import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord } +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext} +import org.apache.spark.annotation.Experimental +import org.apache.spark.internal.Logging +import org.apache.spark.partial.{BoundedDouble, PartialResult} +import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler.ExecutorCacheTaskLocation +import org.apache.spark.storage.StorageLevel + +/** + * A batch-oriented interface for consuming from Kafka. + * Starting and ending offsets are specified in advance, + * so that you can control exactly-once semantics. 
+ * @param kafkaParams Kafka + * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs"> + * configuration parameters</a>. Requires "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. + * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD + * @param preferredHosts map from TopicPartition to preferred host for processing that partition. + * In most cases, use [[DirectKafkaInputDStream.preferConsistent]] + * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on same nodes as brokers. + * @param useConsumerCache whether to use a consumer from a per-jvm cache + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ +@Experimental +class KafkaRDD[ + K: ClassTag, + V: ClassTag] private[spark] ( +sc: SparkContext, +val kafkaParams: ju.Map[String, Object], +val offsetRanges: Array[OffsetRange], +val preferredHosts: ju.Map[TopicPartition, String], +useConsumerCache: Boolean +) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with HasOffsetRanges { + + assert("none" == + kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).asInstanceOf[String], +ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + + " must be set to none for executor kafka params, else messages may not match offsetRange") + + assert(false == + kafkaParams.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).asInstanceOf[Boolean], +ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + + " must be set to false for executor kafka params, else offsets may commit before processing") + + // TODO is it necessary to have separate configs for initial poll time vs ongoing poll time?
+ private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 256) + private val cacheInitialCapacity = +conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16) + private val cacheMaxCapacity = +conf.getInt("spark.streaming.kafka.consumer.cache.maxCapacity", 64) + private val cacheLoadFactor = +conf.getDouble("spark.streaming.kafka.consumer.cache.loadFactor", 0.75).toFloat + + override def persist(newLevel: StorageLevel): this.type = { +log.error("Kafka ConsumerRecord is not serializable. " + + "Use .map to extract fields before calling .persist or .window") +super.persist(newLevel) + } + + override def getPartitions: Array[Partition] = { +offsetRanges.zipWithIndex.map { case (o, i) => +new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset) +}.toArray + } + + override def count(): Long = offsetRanges.map(_.count).sum + + override def countApprox(
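The two `assert`s in the `KafkaRDD` constructor require `auto.offset.reset` to be `"none"` and `enable.auto.commit` to be `false`. One subtlety is worth checking. In Scala, `null.asInstanceOf[Boolean]` evaluates to `false`, so a map with no `enable.auto.commit` entry would pass the second assertion. Yet the Kafka consumer's own default for that setting is `true`.

The Java sketch below is a hypothetical, stricter variant of that check. It rejects a missing entry and accepts either `Boolean.FALSE` or the string `"false"`. It also shows the metadata-only `count()`, assuming an offset range's count is `untilOffset - fromOffset`. `SimpleOffsetRange` and `ExecutorParamsCheck` are illustrative names, not Spark classes.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal stand-in for an offset range: offsets [fromOffset, untilOffset) of one topic partition.
final class SimpleOffsetRange {
    final String topic;
    final int partition;
    final long fromOffset;
    final long untilOffset;

    SimpleOffsetRange(String topic, int partition, long fromOffset, long untilOffset) {
        this.topic = topic;
        this.partition = partition;
        this.fromOffset = fromOffset;
        this.untilOffset = untilOffset;
    }

    long count() {
        return untilOffset - fromOffset;
    }
}

final class ExecutorParamsCheck {
    static final String AUTO_OFFSET_RESET = "auto.offset.reset";
    static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

    // Executors must read exactly the planned offsets and must never commit on their own.
    static void validate(Map<String, Object> kafkaParams) {
        if (!"none".equals(kafkaParams.get(AUTO_OFFSET_RESET))) {
            throw new IllegalArgumentException(AUTO_OFFSET_RESET
                + " must be set to none for executor kafka params, else messages may not match offsetRange");
        }
        // A missing entry is rejected here, because Kafka would fall back to its default (true).
        Object commit = kafkaParams.get(ENABLE_AUTO_COMMIT);
        boolean disabled = Boolean.FALSE.equals(commit) || "false".equals(commit);
        if (!disabled) {
            throw new IllegalArgumentException(ENABLE_AUTO_COMMIT
                + " must be set to false for executor kafka params, else offsets may commit before processing");
        }
    }

    // Like count() above: sums the planned range sizes without reading any records.
    static long count(SimpleOffsetRange[] ranges) {
        long total = 0;
        for (SimpleOffsetRange r : ranges) {
            total += r.count();
        }
        return total;
    }
}
```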
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68147571 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/CachedKafkaConsumer.scala --- @@ -0,0 +1,184 @@
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68147585 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/CachedKafkaConsumer.scala --- @@ -0,0 +1,184 @@
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68147554 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/CachedKafkaConsumer.scala --- @@ -0,0 +1,184 @@
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68147109 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala --- @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.atomic.AtomicReference + +import scala.annotation.tailrec +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.reflect.ClassTag + +import org.apache.kafka.clients.consumer._ +import org.apache.kafka.common.{ PartitionInfo, TopicPartition } + +import org.apache.spark.annotation.Experimental +import org.apache.spark.SparkException +import org.apache.spark.internal.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{StreamingContext, Time} +import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo} +import org.apache.spark.streaming.scheduler.rate.RateEstimator + +/** + * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where + * each given Kafka topic/partition corresponds to an RDD partition. + * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number + * of messages + * per second that each '''partition''' will accept. + * @param preferredHosts map from TopicPartition to preferred host for processing that partition. + * In most cases, use [[DirectKafkaInputDStream.preferConsistent]] + * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on same nodes as brokers. + * @param executorKafkaParams Kafka + * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs"> + * configuration parameters</a>. + * Requires "bootstrap.servers" to be set with Kafka broker(s), + * NOT zookeeper servers, specified in host1:port1,host2:port2 form. + * @param driverConsumer zero-argument function for you to construct a Kafka Consumer, + * and subscribe topics or assign partitions. + * This consumer will be used on the driver to query for offsets only, not messages.
+ * See <a href="http://kafka.apache.org/documentation.html#newconsumerapi">Consumer doc</a> + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ +@Experimental +class DirectKafkaInputDStream[K: ClassTag, V: ClassTag] private[spark] ( +_ssc: StreamingContext, +preferredHosts: ju.Map[TopicPartition, String], +executorKafkaParams: ju.Map[String, Object], +driverConsumer: () => Consumer[K, V] + ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging { + + @transient private var kc: Consumer[K, V] = null + def consumer(): Consumer[K, V] = this.synchronized { +if (null == kc) { + kc = driverConsumer() +} +kc + } + consumer() + + override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, V]] = { +log.error("Kafka ConsumerRecord is not serializable. " + + "Use .map to extract fields before calling .persist or .window") +super.persist(newLevel) + } + + protected def getBrokers = { +val c = consumer +val result = new ju.HashMap[TopicPartition, String]() +val hosts = new ju.HashMap[TopicPartition, String]() +val assignments = c.assignment().iterator() +while (assignments.hasNext()) { + val tp: TopicPartition = assignments.next() + if (null == hosts.get(tp)) { +val infos = c.partitionsFor(tp.topic).iterator() +while (infos.hasNext()) { + val i = infos.next() + hosts.put(new TopicPartition(i.topic(), i.partition()), i.leader.host()) +} + } + result.put(tp, hosts.get(tp)) +
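`getBrokers` maps each assigned `TopicPartition` to its leader's host, and the quoted diff cuts it off mid-loop. It calls `partitionsFor` only when a partition is not already known, so in the common case partition metadata is fetched once per topic rather than once per partition. Below is a Java sketch of that loop under these assumptions:

- A hypothetical `partitionsFor` function replaces `Consumer.partitionsFor`.
- Plain `TP` and `LeaderInfo` value classes replace Kafka's `TopicPartition` and `PartitionInfo`.

Kafka's `PartitionInfo.leader()` can return null while a partition has no leader, and the quoted `i.leader.host()` does not guard against that. The sketch skips such entries instead. This is an assumption about the desired behavior, not something the PR states.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

// Stand-in for Kafka's TopicPartition.
final class TP {
    final String topic;
    final int partition;

    TP(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof TP)) return false;
        TP t = (TP) o;
        return partition == t.partition && topic.equals(t.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

// Stand-in for one PartitionInfo entry; host is null when the partition has no leader.
final class LeaderInfo {
    final String topic;
    final int partition;
    final String host;

    LeaderInfo(String topic, int partition, String host) {
        this.topic = topic;
        this.partition = partition;
        this.host = host;
    }
}

final class BrokerLookup {
    private final Function<String, List<LeaderInfo>> partitionsFor;
    int metadataCalls = 0; // how many times topic metadata was requested

    BrokerLookup(Function<String, List<LeaderInfo>> partitionsFor) {
        this.partitionsFor = partitionsFor;
    }

    // For each assigned partition, find its leader host, reusing metadata already fetched.
    Map<TP, String> leaders(List<TP> assignment) {
        Map<TP, String> hosts = new HashMap<>();
        Map<TP, String> result = new HashMap<>();
        for (TP tp : assignment) {
            if (hosts.get(tp) == null) {
                metadataCalls++;
                List<LeaderInfo> infos = partitionsFor.apply(tp.topic);
                if (infos != null) {
                    for (LeaderInfo i : infos) {
                        if (i.host != null) { // skip leaderless partitions instead of failing
                            hosts.put(new TP(i.topic, i.partition), i.host);
                        }
                    }
                }
            }
            result.put(tp, hosts.get(tp));
        }
        return result;
    }
}
```

With two partitions of one topic and one partition of another, the lookup issues two metadata requests, not three.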
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68139170 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala --- @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.atomic.AtomicReference + +import scala.annotation.tailrec +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.reflect.ClassTag + +import org.apache.kafka.clients.consumer._ +import org.apache.kafka.common.{ PartitionInfo, TopicPartition } + +import org.apache.spark.annotation.Experimental +import org.apache.spark.SparkException +import org.apache.spark.internal.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{StreamingContext, Time} +import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo} +import org.apache.spark.streaming.scheduler.rate.RateEstimator + +/** + * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where + * each given Kafka topic/partition corresponds to an RDD partition. + * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number + * of messages + * per second that each '''partition''' will accept. + * @param preferredHosts map from TopicPartition to preferred host for processing that partition. + * In most cases, use [[DirectKafkaInputDStream.preferConsistent]] + * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on same nodes as brokers. + * @param executorKafkaParams Kafka + * http://kafka.apache.org/documentation.html#newconsumerconfigs + * configuration parameters. + * Requires "bootstrap.servers" to be set with Kafka broker(s), + * NOT zookeeper servers, specified in host1:port1,host2:port2 form. + * @param driverConsumer zero-argument function for you to construct a Kafka Consumer, + * and subscribe topics or assign partitions. + * This consumer will be used on the driver to query for offsets only, not messages.
+ * See http://kafka.apache.org/documentation.html#newconsumerapi Consumer doc + * @tparam K type of Kafka message key + * @tparam V type of Kafka message value + */ +@Experimental +class DirectKafkaInputDStream[K: ClassTag, V: ClassTag] private[spark] ( +_ssc: StreamingContext, +preferredHosts: ju.Map[TopicPartition, String], +executorKafkaParams: ju.Map[String, Object], +driverConsumer: () => Consumer[K, V] + ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging { + + @transient private var kc: Consumer[K, V] = null + def consumer(): Consumer[K, V] = this.synchronized { +if (null == kc) { + kc = driverConsumer() +} +kc + } + consumer() + + override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, V]] = { +log.error("Kafka ConsumerRecord is not serializable. " + + "Use .map to extract fields before calling .persist or .window") +super.persist(newLevel) + } + + protected def getBrokers = { +val c = consumer +val result = new ju.HashMap[TopicPartition, String]() +val hosts = new ju.HashMap[TopicPartition, String]() +val assignments = c.assignment().iterator() +while (assignments.hasNext()) { + val tp: TopicPartition = assignments.next() + if (null == hosts.get(tp)) { +val infos = c.partitionsFor(tp.topic).iterator() +while (infos.hasNext()) { + val i = infos.next() + hosts.put(new TopicPartition(i.topic(), i.partition()), i.leader.host()) +} + } + result.put(tp, hosts.get(tp))
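The quoted `getBrokers` loop maps every assigned partition to the host of its leader. Because `partitionsFor(topic)` returns metadata for all partitions of a topic, it usually issues one metadata request per topic rather than one per partition. The sketch below reproduces that logic with hypothetical stand-in types (`TP`, `PartInfo`) and a caller-supplied `partitionsFor` function, so it runs without the Kafka client; unlike the quoted code, it skips partitions with no known host instead of storing null.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for Kafka's TopicPartition and PartitionInfo.
final case class TP(topic: String, partition: Int)
final case class PartInfo(topic: String, partition: Int, leaderHost: String)

// Map each assigned partition to its leader's host. `partitionsFor(topic)`
// returns metadata for every partition of the topic, so the first lookup for a
// topic fills in all of its partitions and later partitions are cache hits.
def leaderHosts(assignment: Set[TP], partitionsFor: String => Seq[PartInfo]): Map[TP, String] = {
  val hosts = mutable.HashMap.empty[TP, String]
  assignment.foreach { tp =>
    if (!hosts.contains(tp)) {
      partitionsFor(tp.topic).foreach { info =>
        hosts(TP(info.topic, info.partition)) = info.leaderHost
      }
    }
  }
  // Skip partitions with no known host rather than storing null.
  assignment.iterator.flatMap(tp => hosts.get(tp).map(host => tp -> host)).toMap
}
```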
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68138575 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
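The `persist` override in the quoted diff only logs an error, because Kafka's `ConsumerRecord` is not serializable, and suggests extracting fields with `.map` first. In a Spark job that would look something like `stream.map(r => (r.key(), r.value()))` before `.persist` or `.window`. The effect can be shown with plain Java serialization; `OpaqueRecord` is a hypothetical non-serializable stand-in for `ConsumerRecord`, and `serializes` is a hypothetical helper.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical record type that, like ConsumerRecord, does not implement Serializable.
class OpaqueRecord(val key: String, val value: String)

// True if `obj` can be written with Java serialization.
def serializes(obj: AnyRef): Boolean = {
  val out = new ObjectOutputStream(new ByteArrayOutputStream())
  try {
    out.writeObject(obj)
    true
  } catch {
    case _: NotSerializableException => false
  } finally {
    out.close()
  }
}
```

A tuple of plain fields serializes where the record itself does not, which is why mapping before persisting avoids the problem.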
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68136614 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
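The class doc in the quoted diff says each Kafka topic/partition corresponds to one RDD partition. A simplified model of what that means for a single batch is one half-open offset range per partition, running from the next offset to read up to the latest available offset. `PartitionRange` and `batchRanges` are hypothetical names for this sketch, not the integration's API.

```scala
// Hypothetical model: one half-open offset range [fromOffset, untilOffset) per
// Kafka topic/partition, i.e. one RDD partition per Kafka partition per batch.
final case class PartitionRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
  def count: Long = untilOffset - fromOffset
}

// `current` holds the next offset to read; `latest` the end of each partition.
def batchRanges(current: Map[(String, Int), Long], latest: Map[(String, Int), Long]): Seq[PartitionRange] =
  current.toSeq.sortBy(_._1).map { case ((topic, partition), from) =>
    // Never move backwards if `latest` is missing or behind `current`.
    val until = math.max(from, latest.getOrElse((topic, partition), from))
    PartitionRange(topic, partition, from, until)
  }
```

A partition with no new data still yields an (empty) range, so the RDD keeps a stable one-to-one partition layout from batch to batch.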
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68136246 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
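The class doc also says `spark.streaming.kafka.maxRatePerPartition` caps the messages per second that each partition accepts. Per batch, that means a partition can advance by at most rate * batch seconds offsets; for example, 1000 messages/s with a 2 s batch allows 2000. The sketch below shows that arithmetic. `clampUntil` is a hypothetical helper, and treating a non-positive rate as "no limit" is an assumption about how an unset value behaves.

```scala
// Hypothetical helper: the end offset for one partition's batch when the
// partition may accept at most `maxRatePerPartition` messages per second.
def clampUntil(from: Long, latest: Long, maxRatePerPartition: Long, batchMs: Long): Long =
  if (maxRatePerPartition <= 0) latest // assumption: non-positive rate means "no limit"
  else {
    // messages/second * seconds per batch = messages allowed this batch
    val maxMessages = maxRatePerPartition * batchMs / 1000
    math.min(latest, from + maxMessages)
  }
```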
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68136255 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
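For `preferredHosts`, the class doc recommends `preferConsistent` in most cases and `preferBrokers` when executors share nodes with the brokers. One way to get a consistent placement, sketched here as an assumption rather than as the integration's actual code, is to sort the executors and pick one with `Math.floorMod(partition.hashCode, n)`. When a preferred host is given (for example, the leader host from `getBrokers`) and an executor runs there, the candidates are restricted to that host. `Exec`, `TopicPart`, and `chooseExecutor` are hypothetical names.

```scala
// Hypothetical stand-ins: an executor is identified by host and id.
final case class Exec(host: String, id: String)
final case class TopicPart(topic: String, partition: Int)

// Pick one executor for a partition. If a preferred host is given and some
// executors run there, choose among those; otherwise choose among all.
// floorMod over a sorted list keeps the choice stable across batches as long
// as the set of executors does not change.
def chooseExecutor(tp: TopicPart, executors: Seq[Exec], preferredHost: Option[String]): Option[Exec] = {
  val all = executors.sortBy(e => (e.host, e.id))
  val onHost = preferredHost.map(h => all.filter(_.host == h)).getOrElse(Seq.empty)
  val candidates = if (onHost.nonEmpty) onHost else all
  if (candidates.isEmpty) None
  else Some(candidates(Math.floorMod(tp.hashCode, candidates.length)))
}
```

Keeping a partition on the same executor from batch to batch matters if consumers are cached per executor, since a cached consumer is only reused when its partition comes back to it.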
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68134405 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
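The `executorKafkaParams` doc requires `bootstrap.servers` to list Kafka brokers, not ZooKeeper servers, in `host1:port1,host2:port2` form. A hypothetical validator for that form is sketched below. It can only check the shape of the string; it cannot tell a broker address from a ZooKeeper address.

```scala
// Hypothetical helper: parse a "host1:port1,host2:port2" list.
// Returns Left(error) for an empty list or the first malformed entry.
def parseServers(spec: String): Either[String, Seq[(String, Int)]] = {
  val entries = spec.split(",").map(_.trim).filter(_.nonEmpty).toSeq
  if (entries.isEmpty) Left("no servers given")
  else {
    val parsed = entries.map { e =>
      val idx = e.lastIndexOf(':')
      // Port must be numeric and in the valid TCP range.
      val port = if (idx > 0) scala.util.Try(e.substring(idx + 1).toInt).toOption else None
      port.filter(p => p > 0 && p <= 65535).map(p => (e.substring(0, idx), p)).toRight(s"bad entry: $e")
    }
    parsed.collectFirst { case Left(err) => err } match {
      case Some(err) => Left(err)
      case None      => Right(parsed.collect { case Right(hostPort) => hostPort })
    }
  }
}
```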
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68128869 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/CachedKafkaConsumer.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } + +import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, KafkaConsumer } +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +/** + * Consumer of single topicpartition, intended for cached reuse. + * Underlying consumer is not threadsafe, so neither is this, + * but processing the same topicpartition and group id in multiple threads is usually bad anyway. --- End diff -- concurrentJobs is still undocumented, right? I wouldn't expect the existing direct stream to behave predictably under those conditions either. The only time I see processing different chunks of the same topicpartition at the same time as unavoidable is during checkpoint recovery, which is why the cache isn't used then.
If someone has a better idea, I'm all ears, but given the prefetching / handshaking implications of the new consumer, I don't see an alternative to caching them. We could write considerably more complicated caching logic with a checkout / return system where a certain number of consumers for the same topicpartition were allowed at a time... but IMHO multiple consumers of the same topicpartition in the same job in the same JVM is not something we want to encourage - it's at best a waste of resources and at worst a correctness problem.
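The caching koeninger describes, with one cached consumer per group id and topicpartition that is bypassed during checkpoint recovery, can be sketched as a get-or-create map. `CacheKey`, `ConsumerCache`, and the `useCache` flag are hypothetical. The flag models the recovery path, where a fresh consumer is built instead of a cached one.

```scala
import scala.collection.mutable

// Hypothetical cache key: one consumer per group id and topicpartition.
final case class CacheKey(groupId: String, topic: String, partition: Int)

// Minimal get-or-create cache. `useCache = false` models checkpoint recovery,
// where a fresh consumer is built and the cache is left untouched.
class ConsumerCache[C](create: CacheKey => C) {
  private val cache = mutable.HashMap.empty[CacheKey, C]

  def get(key: CacheKey, useCache: Boolean): C = synchronized {
    if (useCache) cache.getOrElseUpdate(key, create(key)) else create(key)
  }

  def size: Int = synchronized { cache.size }
}
```

Any two callers with the same key get the same instance back. That is what makes reuse cheap, and it is also why such a cache alone does not stop two threads from sharing a non-thread-safe consumer.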
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68123238 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/CachedKafkaConsumer.scala --- +/** + * Consumer of single topicpartition, intended for cached reuse. + * Underlying consumer is not threadsafe, so neither is this, + * but processing the same topicpartition and group id in multiple threads is usually bad anyway. --- End diff -- This could happen when `spark.streaming.concurrentJobs` is more than 1 and some batches are too slow.
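zsxwing's scenario can be made concrete with a small scheduling model. It assumes batch i is submitted at i * interval, batches start in order on the earliest-free of `concurrentJobs` slots, and every batch reads every partition. Under those assumptions, any overlap in running time means two tasks may want the same topicpartition at once. `schedule` and `anyOverlap` are hypothetical helpers.

```scala
// Hypothetical model of batch scheduling: batch i is submitted at i * intervalMs,
// takes durationsMs(i), and starts on the earliest-free of `concurrentJobs` slots.
// Returns (start, end) times for each batch.
def schedule(intervalMs: Long, durationsMs: Seq[Long], concurrentJobs: Int): Seq[(Long, Long)] = {
  val freeAt = Array.fill(concurrentJobs)(0L) // when each slot next becomes free
  durationsMs.zipWithIndex.map { case (d, i) =>
    val slot = freeAt.indices.minBy(freeAt(_))
    val start = math.max(i * intervalMs, freeAt(slot))
    freeAt(slot) = start + d
    (start, start + d)
  }
}

// True if any two batches run at the same time (half-open intervals).
def anyOverlap(runs: Seq[(Long, Long)]): Boolean =
  runs.indices.exists { i =>
    runs.indices.exists { j =>
      i < j && runs(i)._1 < runs(j)._2 && runs(j)._1 < runs(i)._2
    }
  }
```

With a slow first batch, one slot only delays later batches; with two slots, the next batch starts while the slow one is still running.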
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68121385 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/CachedKafkaConsumer.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.{ util => ju } + +import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, KafkaConsumer } +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +/** + * Consumer of single topicpartition, intended for cached reuse. + * Underlying consumer is not threadsafe, so neither is this, + * but processing the same topicpartition and group id in multiple threads is usually bad anyway. --- End diff -- Using the same group id for any two jobs you expect to keep separate is already a disaster with either the old or new Kafka high level consumers. The underlying consumer does have a lightweight locking mechanism, and will just throw a ConcurrentModificationException if someone does do this. 
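The fail-fast behavior described in the reply above can be pictured with a small ownership guard. This is a hypothetical, simplified sketch in Java (`SingleThreadGuard` and `GuardDemo` are invented names), not the Kafka client's code: the first thread to enter owns the object, re-entry by that same thread is allowed, and any other thread gets a `ConcurrentModificationException` instead of blocking.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical guard: the first thread to call acquire() owns it until it has
// called release() as many times as it acquired; other threads fail fast.
final class SingleThreadGuard {
    private final AtomicReference<Thread> owner = new AtomicReference<>();
    private int depth = 0; // only modified by the owning thread

    void acquire() {
        Thread me = Thread.currentThread();
        if (owner.get() != me && !owner.compareAndSet(null, me)) {
            throw new ConcurrentModificationException(
                "object is not safe for multi-threaded access");
        }
        depth++;
    }

    void release() {
        if (owner.get() != Thread.currentThread()) {
            throw new IllegalStateException("release() called by a non-owning thread");
        }
        if (--depth == 0) {
            owner.set(null); // hand the guard back for any thread to claim
        }
    }
}

// Demo helper: runs acquire()/release() on a fresh thread and reports whether
// that thread got in (true) or was rejected with ConcurrentModificationException (false).
final class GuardDemo {
    static boolean acquireFromOtherThread(SingleThreadGuard guard) {
        final boolean[] acquired = {false};
        Thread t = new Thread(() -> {
            try {
                guard.acquire();
                guard.release();
                acquired[0] = true;
            } catch (ConcurrentModificationException e) {
                acquired[0] = false;
            }
        });
        t.start();
        try {
            t.join(); // join() makes the worker's write to acquired[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return acquired[0];
    }
}
```

Failing fast surfaces the misconfiguration (two jobs sharing one group id) as an exception rather than silently serializing access to a shared consumer.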
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r68120504

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/CachedKafkaConsumer.scala ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, KafkaConsumer }
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+/**
+ * Consumer of single topicpartition, intended for cached reuse.
+ * Underlying consumer is not threadsafe, so neither is this,
+ * but processing the same topicpartition and group id in multiple threads is usually bad anyway.
--- End diff --

What if two jobs use the same `topic` and `groupId` and they run at the same time? Then a CachedKafkaConsumer may be used by two threads in the same JVM.
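To see how two concurrent jobs could end up on the same object: if cached consumers are keyed on group id plus topic and partition, both jobs' lookups resolve to one entry. The sketch below is hypothetical (`CacheKey`, `FakeConsumer` and `ConsumerCache` are invented names) and assumes that key shape; the diff excerpt above does not show the real cache key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical cache key: one cached consumer per (group id, topic, partition).
final class CacheKey {
    final String groupId;
    final String topic;
    final int partition;

    CacheKey(String groupId, String topic, int partition) {
        this.groupId = groupId;
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof CacheKey)) return false;
        CacheKey k = (CacheKey) o;
        return partition == k.partition && groupId.equals(k.groupId) && topic.equals(k.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(groupId, topic, partition);
    }
}

// Placeholder for a cached per-partition consumer; holds no Kafka state.
final class FakeConsumer {
    final CacheKey key;

    FakeConsumer(CacheKey key) {
        this.key = key;
    }
}

// Per-JVM cache: every caller asking for the same key gets the same instance.
final class ConsumerCache {
    private static final Map<CacheKey, FakeConsumer> CACHE = new HashMap<>();

    static synchronized FakeConsumer get(String groupId, String topic, int partition) {
        return CACHE.computeIfAbsent(new CacheKey(groupId, topic, partition), FakeConsumer::new);
    }
}
```

Synchronizing `get` only protects the map; the returned object is still shared between both callers, so whether concurrent use is caught depends on the consumer's own locking, as koeninger's reply above points out.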
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r67878608

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
+ * of messages
+ * per second that each '''partition''' will accept.
+ * @param preferredHosts map from TopicPartition to preferred host for processing that partition.
--- End diff --

Right, I think the comments on the private class constructor are still useful for anyone looking at the code, even if they don't show up in scaladoc. Given that the parameters aren't always the same, it's at least possibly worth the duplication.
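As a worked example of `spark.streaming.kafka.maxRatePerPartition`: because the limit is per partition per second, a batch interval of B seconds caps each partition at rate * B messages, and a batch over P partitions at rate * B * P. The helper below is a hypothetical sketch (`RateCap` is an invented name) that assumes a fixed limit; it ignores the backpressure path (`RateController`/`RateEstimator` in the imports above) and any rounding the real code applies.

```java
// Hypothetical helper: upper bounds on records per batch under a fixed
// per-partition rate limit, with no backpressure adjustment.
final class RateCap {
    // Messages one partition may contribute to a batch of batchDurationMs.
    static long maxPerPartition(long maxRatePerPartition, long batchDurationMs) {
        return maxRatePerPartition * batchDurationMs / 1000;
    }

    // Messages the whole batch may contain across all partitions.
    static long maxPerBatch(long maxRatePerPartition, long batchDurationMs, int partitions) {
        return maxPerPartition(maxRatePerPartition, batchDurationMs) * partitions;
    }
}
```

For example, a limit of 1000 messages/second with a 5-second batch allows 5000 messages per partition, or 20000 for a 4-partition topic.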
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r67877379

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.{classTag, ClassTag}
+
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord }
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext}
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.internal.Logging
+import org.apache.spark.partial.{BoundedDouble, PartialResult}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * A batch-oriented interface for consuming from Kafka.
+ * Starting and ending offsets are specified in advance,
+ * so that you can control exactly-once semantics.
+ * @param kafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a>. Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
+ * @param preferredHosts map from TopicPartition to preferred host for processing that partition.
+ * In most cases, use [[DirectKafkaInputDStream.preferConsistent]]
+ * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on same nodes as brokers.
+ * @param useConsumerCache whether to use a consumer from a per-jvm cache
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+@Experimental
+class KafkaRDD[
+  K: ClassTag,
+  V: ClassTag] private[spark] (
+    sc: SparkContext,
+    val kafkaParams: ju.Map[String, Object],
+    val offsetRanges: Array[OffsetRange],
+    val preferredHosts: ju.Map[TopicPartition, String],
+    useConsumerCache: Boolean
+) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with HasOffsetRanges {
+
+  assert("none" ==
+    kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).asInstanceOf[String],
+    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG +
+      " must be set to none for executor kafka params, else messages may not match offsetRange")
+
+  assert(false ==
+    kafkaParams.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).asInstanceOf[Boolean],
+    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG +
+      " must be set to false for executor kafka params, else offsets may commit before processing")
+
+  // TODO is it necessary to have separate configs for initial poll time vs ongoing poll time?
+  private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 256)
+  private val cacheInitialCapacity =
+    conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
+  private val cacheMaxCapacity =
+    conf.getInt("spark.streaming.kafka.consumer.cache.maxCapacity", 64)
+  private val cacheLoadFactor =
+    conf.getDouble("spark.streaming.kafka.consumer.cache.loadFactor", 0.75).toFloat
+
+  override def persist(newLevel: StorageLevel): this.type = {
+    log.error("Kafka ConsumerRecord is not serializable. " +
+      "Use .map to extract fields before calling .persist or .window")
+    super.persist(newLevel)
+  }
+
+  override def getPartitions: Array[Partition] = {
+    offsetRanges.zipWithIndex.map { case (o, i) =>
+        new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset)
+    }.toArray
+  }
+
+  override def count(): Long = offsetRanges.map(_.count).sum
+
+  override def
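The two assertions in the `KafkaRDD` constructor reject executor params unless `auto.offset.reset` is `none` and `enable.auto.commit` is `false`. A hypothetical Java sketch of the same check, plus a helper that derives executor params from driver params (`ExecutorParamsCheck` and `forExecutor` are invented names; the string keys are the values of Kafka's `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG` and `ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG`):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical Java restatement of the two KafkaRDD constructor assertions.
final class ExecutorParamsCheck {
    static final String AUTO_OFFSET_RESET = "auto.offset.reset";
    static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

    // Throws with the same messages as the Scala assertions in the diff.
    static void validate(Map<String, Object> kafkaParams) {
        if (!"none".equals(kafkaParams.get(AUTO_OFFSET_RESET))) {
            throw new IllegalArgumentException(AUTO_OFFSET_RESET
                + " must be set to none for executor kafka params, else messages may not match offsetRange");
        }
        // Stricter than a cast-and-compare: requires an explicit Boolean false.
        if (!Boolean.FALSE.equals(kafkaParams.get(ENABLE_AUTO_COMMIT))) {
            throw new IllegalArgumentException(ENABLE_AUTO_COMMIT
                + " must be set to false for executor kafka params, else offsets may commit before processing");
        }
    }

    // Copies driver-side params (leaving the original untouched) and
    // overrides the two settings that validate() checks.
    static Map<String, Object> forExecutor(Map<String, Object> driverParams) {
        Map<String, Object> params = new HashMap<>(driverParams);
        params.put(AUTO_OFFSET_RESET, "none");
        params.put(ENABLE_AUTO_COMMIT, false);
        return params;
    }
}
```

Copying rather than mutating keeps the driver's own settings (for example `auto.offset.reset=latest` for the initial position) intact while executors read strictly inside their offset ranges.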
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r67876883

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
+ * of messages
+ * per second that each '''partition''' will accept.
+ * @param preferredHosts map from TopicPartition to preferred host for processing that partition.
+ * In most cases, use [[DirectKafkaInputDStream.preferConsistent]]
+ * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on same nodes as brokers.
+ * @param executorKafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a>.
+ * Requires "bootstrap.servers" to be set with Kafka broker(s),
+ * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param driverConsumer zero-argument function for you to construct a Kafka Consumer,
+ * and subscribe topics or assign partitions.
+ * This consumer will be used on the driver to query for offsets only, not messages.
+ * See <a href="http://kafka.apache.org/documentation.html#newconsumerapi">Consumer doc</a>
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+@Experimental
+class DirectKafkaInputDStream[K: ClassTag, V: ClassTag] private[spark] (
+    _ssc: StreamingContext,
+    preferredHosts: ju.Map[TopicPartition, String],
+    executorKafkaParams: ju.Map[String, Object],
+    driverConsumer: () => Consumer[K, V]
+  ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging {
+
+  @transient private var kc: Consumer[K, V] = null
+  def consumer(): Consumer[K, V] = this.synchronized {
+    if (null == kc) {
+      kc = driverConsumer()
+    }
+    kc
+  }
+  consumer()
+
+  override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, V]] = {
+    log.error("Kafka ConsumerRecord is not serializable. " +
+      "Use .map to extract fields before calling .persist or .window")
+    super.persist(newLevel)
+  }
+
+  protected def getBrokers = {
+    val c = consumer
+    val result = new ju.HashMap[TopicPartition, String]()
+    val hosts = new ju.HashMap[TopicPartition, String]()
+    val assignments = c.assignment().iterator()
+    while (assignments.hasNext()) {
+      val tp: TopicPartition = assignments.next()
+      if (null == hosts.get(tp)) {
+        val infos = c.partitionsFor(tp.topic).iterator()
+        while (infos.hasNext()) {
+          val i = infos.next()
+          hosts.put(new TopicPartition(i.topic(), i.partition()), i.leader.host())
+        }
+      }
+      result.put(tp, hosts.get(tp))
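The `getBrokers` loop maps each assigned partition to its leader's host, but asks for a topic's metadata only when that partition's host is not known yet, so one `partitionsFor` call covers every partition of the topic. A hypothetical Java sketch of the same shape, with invented stand-ins (`Tp`, `PartitionMeta`, `BrokerLookup`) for Kafka's `TopicPartition` and `PartitionInfo`; the excerpt is cut off before the method returns, so returning `result` is an assumption.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical stand-in for Kafka's TopicPartition.
final class Tp {
    final String topic;
    final int partition;

    Tp(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof Tp && ((Tp) o).partition == partition && ((Tp) o).topic.equals(topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

// Hypothetical stand-in for the PartitionInfo fields getBrokers reads.
final class PartitionMeta {
    final String topic;
    final int partition;
    final String leaderHost;

    PartitionMeta(String topic, int partition, String leaderHost) {
        this.topic = topic;
        this.partition = partition;
        this.leaderHost = leaderHost;
    }
}

final class BrokerLookup {
    // partitionsFor plays the role of consumer.partitionsFor(topic).
    static Map<Tp, String> leaders(Collection<Tp> assignment,
                                   Function<String, List<PartitionMeta>> partitionsFor) {
        Map<Tp, String> result = new HashMap<>();
        Map<Tp, String> hosts = new HashMap<>(); // leader host for every partition seen so far
        for (Tp tp : assignment) {
            if (hosts.get(tp) == null) {
                // One metadata request fills in all partitions of this topic.
                for (PartitionMeta info : partitionsFor.apply(tp.topic)) {
                    hosts.put(new Tp(info.topic, info.partition), info.leaderHost);
                }
            }
            result.put(tp, hosts.get(tp));
        }
        return result;
    }
}
```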
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user lfrancke commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r67875791

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
+ * of messages
+ * per second that each '''partition''' will accept.
+ * @param preferredHosts map from TopicPartition to preferred host for processing that partition.
--- End diff --

But that's because the comment is duplicated there.
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user lfrancke commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r67875720

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
--- End diff --

Makes sense. As I said: Only nitpicks.
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r67875507

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
+ * of messages
+ * per second that each '''partition''' will accept.
+ * @param preferredHosts map from TopicPartition to preferred host for processing that partition.
--- End diff --

The class constructor is private. The parameters show up in the companion object methods, and the links to the companion object work fine.
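The arrangement described above is the usual "private constructor, public factory" shape. In Scala the factories live on the companion object; the closest Java analog is a private constructor plus a static factory method, where the user-facing `@param` docs end up. A hypothetical sketch (`ExampleStream` and `create` are invented names, not Spark API):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical type illustrating the pattern; not part of Spark.
final class ExampleStream {
    private final String topic;
    private final Map<Integer, String> preferredHosts;

    // Private: callers never see this signature, so comments here mainly
    // help people reading the source.
    private ExampleStream(String topic, Map<Integer, String> preferredHosts) {
        this.topic = topic;
        this.preferredHosts = preferredHosts;
    }

    /**
     * Public entry point, and therefore where user-facing docs belong.
     *
     * @param topic topic to read from
     * @param preferredHosts map from partition number to preferred host
     */
    static ExampleStream create(String topic, Map<Integer, String> preferredHosts) {
        // Read-only copy, so later changes by the caller have no effect.
        return new ExampleStream(topic,
            Collections.unmodifiableMap(new HashMap<>(preferredHosts)));
    }

    String topic() {
        return topic;
    }

    Map<Integer, String> preferredHosts() {
        return preferredHosts;
    }
}
```

Because the factory's parameters need not match the constructor's one to one, documenting both is sometimes justified, which is the trade-off discussed in this thread.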
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r67875307

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
--- End diff --

This link style works, and this area of the code hasn't changed from the 0.8 version of the consumer. Like the other cosmetic comments, I'm reluctant to change things in the 0.8 version, just to keep this PR easy to compare.
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user lfrancke commented on a diff in the pull request: https://github.com/apache/spark/pull/11863#discussion_r67827026

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.{classTag, ClassTag}
+
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord }
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext}
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.internal.Logging
+import org.apache.spark.partial.{BoundedDouble, PartialResult}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * A batch-oriented interface for consuming from Kafka.
+ * Starting and ending offsets are specified in advance,
+ * so that you can control exactly-once semantics.
+ * @param kafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a>. Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
+ * @param preferredHosts map from TopicPartition to preferred host for processing that partition.
+ * In most cases, use [[DirectKafkaInputDStream.preferConsistent]]
+ * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on same nodes as brokers.
+ * @param useConsumerCache whether to use a consumer from a per-jvm cache
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+@Experimental
+class KafkaRDD[
+  K: ClassTag,
+  V: ClassTag] private[spark] (
+    sc: SparkContext,
+    val kafkaParams: ju.Map[String, Object],
+    val offsetRanges: Array[OffsetRange],
+    val preferredHosts: ju.Map[TopicPartition, String],
+    useConsumerCache: Boolean
+) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with HasOffsetRanges {
+
+  assert("none" ==
+    kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).asInstanceOf[String],
+    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG +
+      " must be set to none for executor kafka params, else messages may not match offsetRange")
+
+  assert(false ==
+    kafkaParams.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).asInstanceOf[Boolean],
+    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG +
+      " must be set to false for executor kafka params, else offsets may commit before processing")
+
+  // TODO is it necessary to have separate configs for initial poll time vs ongoing poll time?
+  private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 256)
+  private val cacheInitialCapacity =
+    conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
+  private val cacheMaxCapacity =
+    conf.getInt("spark.streaming.kafka.consumer.cache.maxCapacity", 64)
+  private val cacheLoadFactor =
+    conf.getDouble("spark.streaming.kafka.consumer.cache.loadFactor", 0.75).toFloat
+
+  override def persist(newLevel: StorageLevel): this.type = {
+    log.error("Kafka ConsumerRecord is not serializable. " +
+      "Use .map to extract fields before calling .persist or .window")
+    super.persist(newLevel)
+  }
+
+  override def getPartitions: Array[Partition] = {
+    offsetRanges.zipWithIndex.map { case (o, i) =>
+        new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset)
+    }.toArray
+  }
+
+  override def count(): Long = offsetRanges.map(_.count).sum
+
+  override def countApprox(
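The three `spark.streaming.kafka.consumer.cache.*` settings read above (defaults 16, 64 and 0.75) fit a bounded, access-ordered `java.util.LinkedHashMap` that evicts its least recently used entry once it grows past the maximum. That design is an assumption here: the excerpt does not show how the cache itself is built, and `BoundedLruCache` is an invented name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical bounded LRU cache parameterized like the settings in the diff.
final class BoundedLruCache<K, V> extends LinkedHashMap<K, V> {
    private static final long serialVersionUID = 1L;
    private final int maxCapacity;

    BoundedLruCache(int initialCapacity, int maxCapacity, float loadFactor) {
        super(initialCapacity, loadFactor, true); // accessOrder = true: iteration order is LRU first
        this.maxCapacity = maxCapacity;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Called after each put; a real consumer cache would also close the
        // evicted consumer before dropping it.
        return size() > maxCapacity;
    }
}
```

With the defaults, `new BoundedLruCache<>(16, 64, 0.75f)` keeps at most 64 entries; `get` counts as a use, so frequently read partitions stay cached.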
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user lfrancke commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r67822694

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
+ * of messages
+ * per second that each '''partition''' will accept.
+ * @param preferredHosts map from TopicPartition to preferred host for processing that partition.
+ * In most cases, use [[DirectKafkaInputDStream.preferConsistent]]
+ * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on same nodes as brokers.
+ * @param executorKafkaParams Kafka
+ * http://kafka.apache.org/documentation.html#newconsumerconfigs
+ * configuration parameters.
+ *   Requires "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param driverConsumer zero-argument function for you to construct a Kafka Consumer,
+ *  and subscribe topics or assign partitions.
+ *  This consumer will be used on the driver to query for offsets only, not messages.
+ * See http://kafka.apache.org/documentation.html#newconsumerapi (Consumer doc)
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+@Experimental
+class DirectKafkaInputDStream[K: ClassTag, V: ClassTag] private[spark] (
+    _ssc: StreamingContext,
+    preferredHosts: ju.Map[TopicPartition, String],
+    executorKafkaParams: ju.Map[String, Object],
+    driverConsumer: () => Consumer[K, V]
+  ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging {
+
+  @transient private var kc: Consumer[K, V] = null
+  def consumer(): Consumer[K, V] = this.synchronized {
+    if (null == kc) {
+      kc = driverConsumer()
+    }
+    kc
+  }
+  consumer()
+
+  override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, V]] = {
+    log.error("Kafka ConsumerRecord is not serializable. " +
+      "Use .map to extract fields before calling .persist or .window")
+    super.persist(newLevel)
+  }
+
+  protected def getBrokers = {
+    val c = consumer
+    val result = new ju.HashMap[TopicPartition, String]()
+    val hosts = new ju.HashMap[TopicPartition, String]()
+    val assignments = c.assignment().iterator()
+    while (assignments.hasNext()) {
+      val tp: TopicPartition = assignments.next()
+      if (null == hosts.get(tp)) {
+        val infos = c.partitionsFor(tp.topic).iterator()
+        while (infos.hasNext()) {
+          val i = infos.next()
+          hosts.put(new TopicPartition(i.topic(), i.partition()), i.leader.host())
+        }
+      }
+      result.put(tp, hosts.get(tp))
+
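The `getBrokers` loop above resolves the leader host of each assigned partition, and it caches the metadata of a whole topic on the first lookup, so `partitionsFor` is called once per topic as long as that metadata covers every assigned partition. A minimal, dependency-free sketch of that caching pattern follows. `TP`, `PartitionMeta`, `BrokerLookupSketch` and the `partitionsFor` function parameter are hypothetical stand-ins for Kafka's `TopicPartition`, `PartitionInfo` and `Consumer.partitionsFor`, so the logic can run without a broker. Unlike the diff, which would store `null` for a partition missing from the metadata, the sketch simply omits it.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for TopicPartition and PartitionInfo; only the
// fields that the getBrokers loop reads are modelled.
final case class TP(topic: String, partition: Int)
final case class PartitionMeta(topic: String, partition: Int, leaderHost: String)

object BrokerLookupSketch {
  /**
   * Maps each assigned topic/partition to the host of its leader.
   * The first lookup for a topic caches the leader of every partition of
   * that topic, so later partitions of the same topic hit the cache.
   */
  def leaderHosts(
      assignment: Seq[TP],
      partitionsFor: String => Seq[PartitionMeta]): Map[TP, String] = {
    val hosts = mutable.HashMap.empty[TP, String]   // metadata cache
    val result = mutable.LinkedHashMap.empty[TP, String]
    assignment.foreach { tp =>
      if (!hosts.contains(tp)) {
        // Cache miss: fetch the whole topic's partition metadata once.
        partitionsFor(tp.topic).foreach { i =>
          hosts.put(TP(i.topic, i.partition), i.leaderHost)
        }
      }
      // Partitions absent from the metadata are left out of the result.
      hosts.get(tp).foreach(h => result.put(tp, h))
    }
    result.toMap
  }
}
```

Passing the metadata lookup in as a function keeps the caching logic testable without a live consumer; in the diff, the same role is played by `c.partitionsFor(tp.topic)` on the driver-side consumer.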
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user lfrancke commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r67822597

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
@@ -0,0 +1,401 @@
+/**
+ * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
+ * of messages
+ * per second that each '''partition''' will accept.
+ * @param preferredHosts map from TopicPartition to preferred host for processing that partition.
--- End diff --

I don't know why, but when I build the Scaladoc (using Maven scala:doc), none of these constructor parameters are shown. And even if they were, I don't think the links to the companion object (preferConsistent and preferBrokers) would work, as Scaladoc currently doesn't support links to the companion object.
[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...
Github user lfrancke commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r67821600

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---
@@ -0,0 +1,401 @@
+/**
+ * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
--- End diff --

In Scaladoc this would be `[[org.apache.spark.streaming.kafka.KafkaRDD]]` instead of `{@link ...` (I think)
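As the reviewer notes, Scaladoc writes links with double brackets rather than Javadoc's `{@link ...}` tag. A small compilable illustration follows; `RecordBatchSketch` and `RecordStreamSketch` are hypothetical placeholder classes, not types from the PR, and the doc comments only matter when Scaladoc is run over the file.

```scala
/** Placeholder for an RDD-like batch of Kafka records (hypothetical). */
class RecordBatchSketch(val partitions: Int)

// Javadoc style, as written in the diff:
//   A stream of {@link RecordBatchSketch} ...
// Scaladoc style, as used in the doc comment below:
//   A stream of [[RecordBatchSketch]] ...

/**
 * A stream of [[RecordBatchSketch]] where each Kafka topic/partition
 * corresponds to one partition of the batch.
 *
 * @param partitionsPerBatch number of topic/partitions read per batch
 */
class RecordStreamSketch(val partitionsPerBatch: Int) {
  /** Returns the next [[RecordBatchSketch]] for this stream. */
  def nextBatch(): RecordBatchSketch = new RecordBatchSketch(partitionsPerBatch)
}
```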