[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r69116416
  
--- Diff: 
external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010;
+
+import java.io.Serializable;
+import java.util.*;
+
+import scala.collection.JavaConverters;
+
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JavaConsumerStrategySuite implements Serializable {
+
+  @Test
+  public void testConsumerStrategyConstructors() {
+final String topic1 = "topic1";
+final Collection topics = Arrays.asList(topic1);
+final scala.collection.Iterable sTopics =
+  JavaConverters.collectionAsScalaIterableConverter(topics).asScala();
+final TopicPartition tp1 = new TopicPartition(topic1, 0);
+final TopicPartition tp2 = new TopicPartition(topic1, 1);
+final Collection parts = Arrays.asList(tp1, tp2);
+final scala.collection.Iterable sParts =
+  JavaConverters.collectionAsScalaIterableConverter(parts).asScala();
+final Map kafkaParams = new HashMap();
+kafkaParams.put("bootstrap.servers", "not used");
+final scala.collection.Map sKafkaParams =
+  JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala();
+final Map offsets = new HashMap<>();
+offsets.put(tp1, 23L);
+final scala.collection.Map sOffsets =
+  JavaConverters.mapAsScalaMapConverter(offsets).asScala();
+
+// make sure constructors can be called from java
+final ConsumerStrategy sub0 =
+  Subscribe.apply(topics, kafkaParams, offsets);
--- End diff --

I refactored the API to avoid case classes and minimize publicly visible 
classes - #13996 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r69087197
  
--- Diff: 
external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010;
+
+import java.io.Serializable;
+import java.util.*;
+
+import scala.collection.JavaConverters;
+
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JavaConsumerStrategySuite implements Serializable {
+
+  @Test
+  public void testConsumerStrategyConstructors() {
+final String topic1 = "topic1";
+final Collection topics = Arrays.asList(topic1);
+final scala.collection.Iterable sTopics =
+  JavaConverters.collectionAsScalaIterableConverter(topics).asScala();
+final TopicPartition tp1 = new TopicPartition(topic1, 0);
+final TopicPartition tp2 = new TopicPartition(topic1, 1);
+final Collection parts = Arrays.asList(tp1, tp2);
+final scala.collection.Iterable sParts =
+  JavaConverters.collectionAsScalaIterableConverter(parts).asScala();
+final Map kafkaParams = new HashMap();
+kafkaParams.put("bootstrap.servers", "not used");
+final scala.collection.Map sKafkaParams =
+  JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala();
+final Map offsets = new HashMap<>();
+offsets.put(tp1, 23L);
+final scala.collection.Map sOffsets =
+  JavaConverters.mapAsScalaMapConverter(offsets).asScala();
+
+// make sure constructors can be called from java
+final ConsumerStrategy sub0 =
+  Subscribe.apply(topics, kafkaParams, offsets);
--- End diff --

Okay found the issue. In scala 2.10, if companion object of a case class 
has explicitly defined apply(), then the implicit apply method is not 
generated. In scala 2.11 it is generated. 

I remember now, this type of stuff is why we avoid using case classes in 
the public API. Do you mind if I convert these to simple classes??


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-30 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r69083303
  
--- Diff: 
external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010;
+
+import java.io.Serializable;
+import java.util.*;
+
+import scala.collection.JavaConverters;
+
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JavaConsumerStrategySuite implements Serializable {
+
+  @Test
+  public void testConsumerStrategyConstructors() {
+final String topic1 = "topic1";
+final Collection topics = Arrays.asList(topic1);
+final scala.collection.Iterable sTopics =
+  JavaConverters.collectionAsScalaIterableConverter(topics).asScala();
+final TopicPartition tp1 = new TopicPartition(topic1, 0);
+final TopicPartition tp2 = new TopicPartition(topic1, 1);
+final Collection parts = Arrays.asList(tp1, tp2);
+final scala.collection.Iterable sParts =
+  JavaConverters.collectionAsScalaIterableConverter(parts).asScala();
+final Map kafkaParams = new HashMap();
+kafkaParams.put("bootstrap.servers", "not used");
+final scala.collection.Map sKafkaParams =
+  JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala();
+final Map offsets = new HashMap<>();
+offsets.put(tp1, 23L);
+final scala.collection.Map sOffsets =
+  JavaConverters.mapAsScalaMapConverter(offsets).asScala();
+
+// make sure constructors can be called from java
+final ConsumerStrategy sub0 =
+  Subscribe.apply(topics, kafkaParams, offsets);
--- End diff --

We should figure out a way to fix scala 2.10. I don't think we need to 
revert this though since 2.10 is no longer the default build and it does not 
fail PRs.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r69082952
  
--- Diff: 
external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010;
+
+import java.io.Serializable;
+import java.util.*;
+
+import scala.collection.JavaConverters;
+
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JavaConsumerStrategySuite implements Serializable {
+
+  @Test
+  public void testConsumerStrategyConstructors() {
+final String topic1 = "topic1";
+final Collection topics = Arrays.asList(topic1);
+final scala.collection.Iterable sTopics =
+  JavaConverters.collectionAsScalaIterableConverter(topics).asScala();
+final TopicPartition tp1 = new TopicPartition(topic1, 0);
+final TopicPartition tp2 = new TopicPartition(topic1, 1);
+final Collection parts = Arrays.asList(tp1, tp2);
+final scala.collection.Iterable sParts =
+  JavaConverters.collectionAsScalaIterableConverter(parts).asScala();
+final Map kafkaParams = new HashMap();
+kafkaParams.put("bootstrap.servers", "not used");
+final scala.collection.Map sKafkaParams =
+  JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala();
+final Map offsets = new HashMap<>();
+offsets.put(tp1, 23L);
+final scala.collection.Map sOffsets =
+  JavaConverters.mapAsScalaMapConverter(offsets).asScala();
+
+// make sure constructors can be called from java
+final ConsumerStrategy sub0 =
+  Subscribe.apply(topics, kafkaParams, offsets);
--- End diff --

This is seems to break in scala 2.10 and not scala 2.11. This is very weird.

Merging this PR broke 2.10 builds - 
https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Compile/job/spark-master-compile-sbt-scala-2.10/1947/console

```
[error] 
/home/jenkins/workspace/spark-master-compile-sbt-scala-2.10/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java:54:
  error: incompatible types: Collection cannot be converted to 
Iterable
[error]   Subscribe.apply(topics, kafkaParams, offsets);
[error]   ^
[error] 
/home/jenkins/workspace/spark-master-compile-sbt-scala-2.10/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java:69:
  error: incompatible types: Collection cannot be converted to 
Iterable
[error]   Assign.apply(parts, kafkaParams, offsets);
[error]^
```



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/11863


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-29 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r69049921
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import org.apache.kafka.clients.consumer.OffsetCommitCallback
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * Represents any object that has a collection of [[OffsetRange]]s. This 
can be used to access the
+ * offset ranges in RDDs generated by the direct Kafka DStream (see
+ * [[KafkaUtils.createDirectStream]]).
+ * {{{
+ *   KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
+ *  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ *  ...
+ *   }
+ * }}}
+ */
+trait HasOffsetRanges {
+  def offsetRanges: Array[OffsetRange]
+}
+
+/**
+ *  :: Experimental ::
+ * Represents any object that can commit a collection of [[OffsetRange]]s.
+ * The direct Kafka DStream implements this interface (see
+ * [[KafkaUtils.createDirectStream]]).
+ * {{{
+ *   val stream = KafkaUtils.createDirectStream(...)
+ * ...
+ *   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets, new 
OffsetCommitCallback() {
+ * def onComplete(m: java.util.Map[TopicPartition, OffsetAndMetadata], 
e: Exception) {
+ *if (null != e) {
+ *   // error
+ *} else {
+ * // success
+ *   }
+ * }
+ *   })
+ * }}}
+ */
+@Experimental
+trait CanCommitOffsets {
--- End diff --

@koeninger I don think this is needed any more since we return InputDStream.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-29 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r69047117
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 ---
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, 
StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ *  A stream of [[KafkaRDD]] where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives 
the maximum number
+ *  of messages
+ * per second that each '''partition''' will accept.
+ * @param locationStrategy In most cases, pass in [[PreferConsistent]],
+ *   see [[LocationStrategy]] for more details.
+ * @param executorKafkaParams Kafka
+ * http://kafka.apache.org/documentation.html#newconsumerconfigs;>
+ * configuration parameters.
+ *   Requires  "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param driverConsumer zero-argument function for you to construct a 
Kafka Consumer,
--- End diff --

incorrect 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-29 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r69035725
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 ---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, 
StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ *  A stream of [[KafkaRDD]] where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives 
the maximum number
+ *  of messages
+ * per second that each '''partition''' will accept.
+ * @param locationStrategy In most cases, pass in [[PreferConsistent]],
+ *   see [[LocationStrategy]] for more details.
+ * @param executorKafkaParams Kafka
+ * http://kafka.apache.org/documentation.html#newconsumerconfigs;>
+ * configuration parameters.
+ *   Requires  "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param driverConsumer zero-argument function for you to construct a 
Kafka Consumer,
+ *  and subscribe topics or assign partitions.
+ *  This consumer will be used on the driver to query for offsets only, 
not messages.
+ *  See http://kafka.apache.org/documentation.html#newconsumerapi;>Consumer 
doc
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+private[spark] class DirectKafkaInputDStream[K, V](
+_ssc: StreamingContext,
+locationStrategy: LocationStrategy,
+consumerStrategy: ConsumerStrategy[K, V]
+  ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with 
CanCommitOffsets {
+
+  val executorKafkaParams = {
+val ekp = new ju.HashMap[String, 
Object](consumerStrategy.executorKafkaParams)
+KafkaUtils.fixKafkaParams(ekp)
+ekp
+  }
+
+  protected var currentOffsets = Map[TopicPartition, Long]()
+
+  @transient private var kc: Consumer[K, V] = null
+  def consumer(): Consumer[K, V] = this.synchronized {
+if (null == kc) {
+  kc = consumerStrategy.onStart(currentOffsets)
+}
+kc
+  }
+
+  override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, 
V]] = {
+logError("Kafka ConsumerRecord is not serializable. " +
+  "Use .map to extract fields before calling .persist or .window")
+super.persist(newLevel)
+  }
+
+  protected def getBrokers = {
+val c = consumer
+val result = new ju.HashMap[TopicPartition, String]()
+val hosts = new ju.HashMap[TopicPartition, String]()
+val assignments = c.assignment().iterator()
+while (assignments.hasNext()) {
+  val tp: TopicPartition = assignments.next()
+  if (null == hosts.get(tp)) {
+val infos = c.partitionsFor(tp.topic).iterator()
+while (infos.hasNext()) {
+  val i = infos.next()
+  hosts.put(new TopicPartition(i.topic(), i.partition()), 
i.leader.host())
+}
+  }
+  result.put(tp, hosts.get(tp))
+}
+result
+  }
  

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-29 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r69035388
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 ---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, 
StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ *  A stream of [[KafkaRDD]] where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives 
the maximum number
+ *  of messages
+ * per second that each '''partition''' will accept.
+ * @param locationStrategy In most cases, pass in [[PreferConsistent]],
+ *   see [[LocationStrategy]] for more details.
+ * @param executorKafkaParams Kafka
+ * http://kafka.apache.org/documentation.html#newconsumerconfigs;>
+ * configuration parameters.
+ *   Requires  "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param driverConsumer zero-argument function for you to construct a 
Kafka Consumer,
+ *  and subscribe topics or assign partitions.
+ *  This consumer will be used on the driver to query for offsets only, 
not messages.
+ *  See http://kafka.apache.org/documentation.html#newconsumerapi;>Consumer 
doc
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+private[spark] class DirectKafkaInputDStream[K, V](
+_ssc: StreamingContext,
+locationStrategy: LocationStrategy,
+consumerStrategy: ConsumerStrategy[K, V]
+  ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with 
CanCommitOffsets {
+
+  val executorKafkaParams = {
+val ekp = new ju.HashMap[String, 
Object](consumerStrategy.executorKafkaParams)
+KafkaUtils.fixKafkaParams(ekp)
+ekp
+  }
+
+  protected var currentOffsets = Map[TopicPartition, Long]()
+
+  @transient private var kc: Consumer[K, V] = null
+  def consumer(): Consumer[K, V] = this.synchronized {
+if (null == kc) {
+  kc = consumerStrategy.onStart(currentOffsets)
+}
+kc
+  }
+
+  override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, 
V]] = {
+logError("Kafka ConsumerRecord is not serializable. " +
+  "Use .map to extract fields before calling .persist or .window")
+super.persist(newLevel)
+  }
+
+  protected def getBrokers = {
+val c = consumer
+val result = new ju.HashMap[TopicPartition, String]()
+val hosts = new ju.HashMap[TopicPartition, String]()
+val assignments = c.assignment().iterator()
+while (assignments.hasNext()) {
+  val tp: TopicPartition = assignments.next()
+  if (null == hosts.get(tp)) {
+val infos = c.partitionsFor(tp.topic).iterator()
+while (infos.hasNext()) {
+  val i = infos.next()
+  hosts.put(new TopicPartition(i.topic(), i.partition()), 
i.leader.host())
+}
+  }
+  result.put(tp, hosts.get(tp))
+}
+result
+  }

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-29 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r69034310
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/LocationStrategy.scala
 ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.common.TopicPartition
+
+
+/**
+ * Choice of how to schedule consumers for a given TopicPartition on an 
executor.
+ * Kafka 0.10 consumers prefetch messages, so it's important for 
performance
+ * to keep cached consumers on appropriate executors, not recreate them 
for every partition.
+ * Choice of location is only a preference, not an absolute; partitions 
may be scheduled elsewhere.
+ */
+sealed trait LocationStrategy
--- End diff --

nit: add `@Experimental` for classes and methods in this file.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-29 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r69034246
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/ConsumerStrategy.scala
 ---
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.TopicPartition
+
+/**
+ * Choice of how to create and configure underlying Kafka Consumers on 
driver and executors.
+ * Kafka 0.10 consumers can require additional, sometimes complex, setup 
after object
+ *  instantiation. This interface encapsulates that process, and allows it 
to be checkpointed.
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+trait ConsumerStrategy[K, V] {
--- End diff --

nit: add `@Experimental` for classes and methods in this file.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-29 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r69034026
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
 ---
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkContext
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.{ JavaRDD, JavaSparkContext }
+import org.apache.spark.api.java.function.{ Function0 => JFunction0 }
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.{ JavaInputDStream, 
JavaStreamingContext }
+import org.apache.spark.streaming.dstream._
+
+@Experimental
+object KafkaUtils extends Logging {
+  /**
+   * Scala constructor for a batch-oriented interface for consuming from 
Kafka.
--- End diff --

Please add `:: Experimental ::` at the beginning of comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-29 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r69033637
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 ---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, 
StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ *  A stream of [[KafkaRDD]] where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives 
the maximum number
+ *  of messages
+ * per second that each '''partition''' will accept.
+ * @param locationStrategy In most cases, pass in [[PreferConsistent]],
+ *   see [[LocationStrategy]] for more details.
+ * @param executorKafkaParams Kafka
+ * http://kafka.apache.org/documentation.html#newconsumerconfigs;>
+ * configuration parameters.
+ *   Requires  "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param driverConsumer zero-argument function for you to construct a 
Kafka Consumer,
+ *  and subscribe topics or assign partitions.
+ *  This consumer will be used on the driver to query for offsets only, 
not messages.
+ *  See http://kafka.apache.org/documentation.html#newconsumerapi;>Consumer 
doc
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+private[spark] class DirectKafkaInputDStream[K, V](
+_ssc: StreamingContext,
+locationStrategy: LocationStrategy,
+consumerStrategy: ConsumerStrategy[K, V]
+  ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with 
CanCommitOffsets {
+
+  val executorKafkaParams = {
+val ekp = new ju.HashMap[String, 
Object](consumerStrategy.executorKafkaParams)
+KafkaUtils.fixKafkaParams(ekp)
+ekp
+  }
+
+  protected var currentOffsets = Map[TopicPartition, Long]()
+
+  @transient private var kc: Consumer[K, V] = null
+  def consumer(): Consumer[K, V] = this.synchronized {
+if (null == kc) {
+  kc = consumerStrategy.onStart(currentOffsets)
+}
+kc
+  }
+
+  override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, 
V]] = {
+logError("Kafka ConsumerRecord is not serializable. " +
+  "Use .map to extract fields before calling .persist or .window")
+super.persist(newLevel)
+  }
+
+  protected def getBrokers = {
+val c = consumer
+val result = new ju.HashMap[TopicPartition, String]()
+val hosts = new ju.HashMap[TopicPartition, String]()
+val assignments = c.assignment().iterator()
+while (assignments.hasNext()) {
+  val tp: TopicPartition = assignments.next()
+  if (null == hosts.get(tp)) {
+val infos = c.partitionsFor(tp.topic).iterator()
+while (infos.hasNext()) {
+  val i = infos.next()
+  hosts.put(new TopicPartition(i.topic(), i.partition()), 
i.leader.host())
+}
+  }
+  result.put(tp, hosts.get(tp))
+}
+result
+  }
  

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-29 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r69007407
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord }
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.{Partition, SparkContext, SparkException, 
TaskContext}
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.internal.Logging
+import org.apache.spark.partial.{BoundedDouble, PartialResult}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * A batch-oriented interface for consuming from Kafka.
+ * Starting and ending offsets are specified in advance,
+ * so that you can control exactly-once semantics.
+ * @param kafkaParams Kafka
+ * http://kafka.apache.org/documentation.htmll#newconsumerconfigs;>
+ * configuration parameters. Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsetRanges offset ranges that define the Kafka data belonging 
to this RDD
+ * @param preferredHosts map from TopicPartition to preferred host for 
processing that partition.
+ * In most cases, use [[DirectKafkaInputDStream.preferConsistent]]
+ * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on 
same nodes as brokers.
+ * @param useConsumerCache whether to use a consumer from a per-jvm cache
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+private[spark] class KafkaRDD[K, V](
+sc: SparkContext,
+val kafkaParams: ju.Map[String, Object],
+val offsetRanges: Array[OffsetRange],
+val preferredHosts: ju.Map[TopicPartition, String],
+useConsumerCache: Boolean
+) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with 
HasOffsetRanges {
+
+  assert("none" ==
+
kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).asInstanceOf[String],
+ConsumerConfig.AUTO_OFFSET_RESET_CONFIG +
+  " must be set to none for executor kafka params, else messages may 
not match offsetRange")
+
+  assert(false ==
+
kafkaParams.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).asInstanceOf[Boolean],
+ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG +
+  " must be set to false for executor kafka params, else offsets may 
commit before processing")
+
+  // TODO is it necessary to have separate configs for initial poll time 
vs ongoing poll time?
+  private val pollTimeout = 
conf.getLong("spark.streaming.kafka.consumer.poll.ms", 256)
+  private val cacheInitialCapacity =
+conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
+  private val cacheMaxCapacity =
+conf.getInt("spark.streaming.kafka.consumer.cache.maxCapacity", 64)
+  private val cacheLoadFactor =
+conf.getDouble("spark.streaming.kafka.consumer.cache.loadFactor", 
0.75).toFloat
+
+  override def persist(newLevel: StorageLevel): this.type = {
+logError("Kafka ConsumerRecord is not serializable. " +
+  "Use .map to extract fields before calling .persist or .window")
+super.persist(newLevel)
+  }
+
+  override def getPartitions: Array[Partition] = {
+offsetRanges.zipWithIndex.map { case (o, i) =>
+new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, 
o.untilOffset)
+}.toArray
+  }
+
+  override def count(): Long = offsetRanges.map(_.count).sum
+
+  override def countApprox(
+  timeout: Long,
+  confidence: Double = 0.95
+  ): PartialResult[BoundedDouble] = 

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68856277
  
--- Diff: 
external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
 ---
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka;
+
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.VoidFunction;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+public class JavaDirectKafkaStreamSuite implements Serializable {
+  private transient JavaStreamingContext ssc = null;
+  private transient KafkaTestUtils kafkaTestUtils = null;
+
+  @Before
+  public void setUp() {
+kafkaTestUtils = new KafkaTestUtils();
+kafkaTestUtils.setup();
+SparkConf sparkConf = new SparkConf()
+  .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
+ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(200));
+  }
+
+  @After
+  public void tearDown() {
+if (ssc != null) {
+  ssc.stop();
+  ssc = null;
+}
+
+if (kafkaTestUtils != null) {
+  kafkaTestUtils.teardown();
+  kafkaTestUtils = null;
+}
+  }
+
+  @Test
+  public void testKafkaStream() throws InterruptedException {
+final String topic1 = "topic1";
+final String topic2 = "topic2";
+// hold a reference to the current offset ranges, so it can be used 
downstream
+final AtomicReference offsetRanges = new 
AtomicReference<>();
+
+String[] topic1data = createTopicAndSendData(topic1);
+String[] topic2data = createTopicAndSendData(topic2);
+
+Set sent = new HashSet<>();
+sent.addAll(Arrays.asList(topic1data));
+sent.addAll(Arrays.asList(topic2data));
+
+Random random = new Random();
+
+final Map kafkaParams = new HashMap<>();
+kafkaParams.put("bootstrap.servers", kafkaTestUtils.brokerAddress());
+kafkaParams.put("key.deserializer", StringDeserializer.class);
+kafkaParams.put("value.deserializer", StringDeserializer.class);
+kafkaParams.put("auto.offset.reset", "earliest");
+kafkaParams.put("group.id", "java-test-consumer-" + random.nextInt());
+
+JavaInputDStream> istream1 = 
KafkaUtils.createDirectStream(
+ssc,
+PreferConsistent.create(),
+Subscribe.create(Arrays.asList(topic1), 
kafkaParams)
+);
+
+JavaDStream stream1 = istream1.transform(
+  // Make sure you can get offset ranges from the rdd
+  new Function>,
+JavaRDD>>() {
+  @Override
+  public JavaRDD> call(
+JavaRDD> rdd
+  ) {
+OffsetRange[] offsets = ((HasOffsetRanges) 
rdd.rdd()).offsetRanges();
+offsetRanges.set(offsets);
+Assert.assertEquals(topic1, offsets[0].topic());
+return rdd;
+  }
+}
+).map(
+new Function, String>() {
+   

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68855971
  
--- Diff: 
external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
 ---
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka;
+
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.VoidFunction;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+public class JavaDirectKafkaStreamSuite implements Serializable {
+  private transient JavaStreamingContext ssc = null;
+  private transient KafkaTestUtils kafkaTestUtils = null;
+
+  @Before
+  public void setUp() {
+kafkaTestUtils = new KafkaTestUtils();
+kafkaTestUtils.setup();
+SparkConf sparkConf = new SparkConf()
+  .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
+ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(200));
+  }
+
+  @After
+  public void tearDown() {
+if (ssc != null) {
+  ssc.stop();
+  ssc = null;
+}
+
+if (kafkaTestUtils != null) {
+  kafkaTestUtils.teardown();
+  kafkaTestUtils = null;
+}
+  }
+
+  @Test
+  public void testKafkaStream() throws InterruptedException {
+final String topic1 = "topic1";
+final String topic2 = "topic2";
+// hold a reference to the current offset ranges, so it can be used 
downstream
+final AtomicReference offsetRanges = new 
AtomicReference<>();
+
+String[] topic1data = createTopicAndSendData(topic1);
+String[] topic2data = createTopicAndSendData(topic2);
+
+Set sent = new HashSet<>();
+sent.addAll(Arrays.asList(topic1data));
+sent.addAll(Arrays.asList(topic2data));
+
+Random random = new Random();
+
+final Map kafkaParams = new HashMap<>();
+kafkaParams.put("bootstrap.servers", kafkaTestUtils.brokerAddress());
+kafkaParams.put("key.deserializer", StringDeserializer.class);
+kafkaParams.put("value.deserializer", StringDeserializer.class);
+kafkaParams.put("auto.offset.reset", "earliest");
+kafkaParams.put("group.id", "java-test-consumer-" + random.nextInt());
+
+JavaInputDStream> istream1 = 
KafkaUtils.createDirectStream(
+ssc,
+PreferConsistent.create(),
+Subscribe.create(Arrays.asList(topic1), 
kafkaParams)
+);
+
+JavaDStream stream1 = istream1.transform(
+  // Make sure you can get offset ranges from the rdd
+  new Function>,
+JavaRDD>>() {
+  @Override
+  public JavaRDD> call(
+JavaRDD> rdd
+  ) {
+OffsetRange[] offsets = ((HasOffsetRanges) 
rdd.rdd()).offsetRanges();
+offsetRanges.set(offsets);
+Assert.assertEquals(topic1, offsets[0].topic());
+return rdd;
+  }
+}
+).map(
+new Function, String>() {
+   

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68851801
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
 ---
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord }
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.{Partition, SparkContext, SparkException, 
TaskContext}
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.internal.Logging
+import org.apache.spark.partial.{BoundedDouble, PartialResult}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * A batch-oriented interface for consuming from Kafka.
+ * Starting and ending offsets are specified in advance,
+ * so that you can control exactly-once semantics.
+ * @param kafkaParams Kafka
+ * http://kafka.apache.org/documentation.htmll#newconsumerconfigs;>
+ * configuration parameters. Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsetRanges offset ranges that define the Kafka data belonging 
to this RDD
+ * @param preferredHosts map from TopicPartition to preferred host for 
processing that partition.
+ * In most cases, use [[DirectKafkaInputDStream.preferConsistent]]
+ * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on 
same nodes as brokers.
+ * @param useConsumerCache whether to use a consumer from a per-jvm cache
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+@Experimental
--- End diff --

nit: remove `@Experimental` since it's not public


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68850101
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/CachedKafkaConsumer.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, 
KafkaConsumer }
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+/**
+ * Consumer of single topicpartition, intended for cached reuse.
+ * Underlying consumer is not threadsafe, so neither is this,
+ * but processing the same topicpartition and group id in multiple threads 
is usually bad anyway.
+ */
+private[kafka]
+class CachedKafkaConsumer[K, V] private(
+  val groupId: String,
+  val topic: String,
+  val partition: Int,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  assert(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  val topicPartition = new TopicPartition(topic, partition)
+
+  protected val consumer = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = new ju.ArrayList[TopicPartition]()
+tps.add(topicPartition)
+c.assign(tps)
+c
+  }
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, 
V]]().iterator
+  protected var nextOffset = -2L
+
+  def close(): Unit = consumer.close()
+
+  /**
+   * Get the record for the given offset, waiting up to timeout ms if IO 
is necessary.
+   * Sequential forward access will use buffers, but random access will be 
horribly inefficient.
+   */
+  def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
+log.debug(s"Get $groupId $topic $partition nextOffset $nextOffset 
requested $offset")
+if (offset != nextOffset) {
+  log.info(s"Initial fetch for $groupId $topic $partition $offset")
+  seek(offset)
+  poll(timeout)
+}
+
+if (!buffer.hasNext()) { poll(timeout) }
+assert(buffer.hasNext(),
+  s"Failed to get records for $groupId $topic $partition $offset after 
polling for $timeout")
+var record = buffer.next()
+
+if (record.offset != offset) {
+  log.info(s"Buffer miss for $groupId $topic $partition $offset")
+  seek(offset)
+  poll(timeout)
+  assert(buffer.hasNext(),
+s"Failed to get records for $groupId $topic $partition $offset 
after polling for $timeout")
+  record = buffer.next()
+  assert(record.offset == offset,
+s"Got wrong record for $groupId $topic $partition even after 
seeking to offset $offset")
+}
+
+nextOffset = offset + 1
+record
+  }
+
+  private def seek(offset: Long): Unit = {
+log.debug(s"Seeking to $topicPartition $offset")
+consumer.seek(topicPartition, offset)
+  }
+
+  private def poll(timeout: Long): Unit = {
+val p = consumer.poll(timeout)
+val r = p.records(topicPartition)
+log.debug(s"Polled ${p.partitions()}  ${r.size}")
+buffer = r.iterator
+  }
+
+}
+
+private[kafka]
+object CachedKafkaConsumer extends Logging {
+
+  private case class CacheKey(groupId: String, topic: String, partition: 
Int)
+
+  // Don't want to depend on guava, don't want a cleanup thread, use a 
simple LinkedHashMap
+  private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] 
= null
+
+  /** Must be called before get, once per JVM, to configure the cache. 
Further calls are 

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68849552
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/CachedKafkaConsumer.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, 
KafkaConsumer }
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+/**
+ * Consumer of single topicpartition, intended for cached reuse.
+ * Underlying consumer is not threadsafe, so neither is this,
+ * but processing the same topicpartition and group id in multiple threads 
is usually bad anyway.
+ */
+private[kafka]
+class CachedKafkaConsumer[K, V] private(
+  val groupId: String,
+  val topic: String,
+  val partition: Int,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  assert(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  val topicPartition = new TopicPartition(topic, partition)
+
+  protected val consumer = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = new ju.ArrayList[TopicPartition]()
+tps.add(topicPartition)
+c.assign(tps)
+c
+  }
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, 
V]]().iterator
+  protected var nextOffset = -2L
+
+  def close(): Unit = consumer.close()
+
+  /**
+   * Get the record for the given offset, waiting up to timeout ms if IO 
is necessary.
+   * Sequential forward access will use buffers, but random access will be 
horribly inefficient.
+   */
+  def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
+log.debug(s"Get $groupId $topic $partition nextOffset $nextOffset 
requested $offset")
+if (offset != nextOffset) {
+  log.info(s"Initial fetch for $groupId $topic $partition $offset")
+  seek(offset)
+  poll(timeout)
+}
+
+if (!buffer.hasNext()) { poll(timeout) }
+assert(buffer.hasNext(),
+  s"Failed to get records for $groupId $topic $partition $offset after 
polling for $timeout")
+var record = buffer.next()
+
+if (record.offset != offset) {
+  log.info(s"Buffer miss for $groupId $topic $partition $offset")
+  seek(offset)
+  poll(timeout)
+  assert(buffer.hasNext(),
+s"Failed to get records for $groupId $topic $partition $offset 
after polling for $timeout")
+  record = buffer.next()
+  assert(record.offset == offset,
+s"Got wrong record for $groupId $topic $partition even after 
seeking to offset $offset")
+}
+
+nextOffset = offset + 1
+record
+  }
+
+  private def seek(offset: Long): Unit = {
+log.debug(s"Seeking to $topicPartition $offset")
+consumer.seek(topicPartition, offset)
+  }
+
+  private def poll(timeout: Long): Unit = {
+val p = consumer.poll(timeout)
+val r = p.records(topicPartition)
+log.debug(s"Polled ${p.partitions()}  ${r.size}")
+buffer = r.iterator
+  }
+
+}
+
+private[kafka]
+object CachedKafkaConsumer extends Logging {
+
+  private case class CacheKey(groupId: String, topic: String, partition: 
Int)
+
+  // Don't want to depend on guava, don't want a cleanup thread, use a 
simple LinkedHashMap
+  private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] 
= null
+
+  /** Must be called before get, once per JVM, to configure the cache. 
Further calls are 

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68848064
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/CachedKafkaConsumer.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, 
KafkaConsumer }
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+/**
+ * Consumer of single topicpartition, intended for cached reuse.
+ * Underlying consumer is not threadsafe, so neither is this,
+ * but processing the same topicpartition and group id in multiple threads 
is usually bad anyway.
+ */
+private[kafka]
+class CachedKafkaConsumer[K, V] private(
+  val groupId: String,
+  val topic: String,
+  val partition: Int,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  assert(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  val topicPartition = new TopicPartition(topic, partition)
+
+  protected val consumer = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = new ju.ArrayList[TopicPartition]()
+tps.add(topicPartition)
+c.assign(tps)
+c
+  }
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, 
V]]().iterator
+  protected var nextOffset = -2L
+
+  def close(): Unit = consumer.close()
+
+  /**
+   * Get the record for the given offset, waiting up to timeout ms if IO 
is necessary.
+   * Sequential forward access will use buffers, but random access will be 
horribly inefficient.
+   */
+  def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
+log.debug(s"Get $groupId $topic $partition nextOffset $nextOffset 
requested $offset")
+if (offset != nextOffset) {
+  log.info(s"Initial fetch for $groupId $topic $partition $offset")
+  seek(offset)
+  poll(timeout)
+}
+
+if (!buffer.hasNext()) { poll(timeout) }
+assert(buffer.hasNext(),
+  s"Failed to get records for $groupId $topic $partition $offset after 
polling for $timeout")
+var record = buffer.next()
+
+if (record.offset != offset) {
+  log.info(s"Buffer miss for $groupId $topic $partition $offset")
+  seek(offset)
+  poll(timeout)
+  assert(buffer.hasNext(),
+s"Failed to get records for $groupId $topic $partition $offset 
after polling for $timeout")
+  record = buffer.next()
+  assert(record.offset == offset,
+s"Got wrong record for $groupId $topic $partition even after 
seeking to offset $offset")
+}
+
+nextOffset = offset + 1
+record
+  }
+
+  private def seek(offset: Long): Unit = {
+log.debug(s"Seeking to $topicPartition $offset")
+consumer.seek(topicPartition, offset)
+  }
+
+  private def poll(timeout: Long): Unit = {
+val p = consumer.poll(timeout)
+val r = p.records(topicPartition)
+log.debug(s"Polled ${p.partitions()}  ${r.size}")
+buffer = r.iterator
+  }
+
+}
+
+private[kafka]
+object CachedKafkaConsumer extends Logging {
+
+  private case class CacheKey(groupId: String, topic: String, partition: 
Int)
+
+  // Don't want to depend on guava, don't want a cleanup thread, use a 
simple LinkedHashMap
+  private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] 
= null
+
+  /** Must be called before get, once per JVM, to configure the cache. 
Further calls are 

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68847741
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/CachedKafkaConsumer.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, 
KafkaConsumer }
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+/**
+ * Consumer of single topicpartition, intended for cached reuse.
+ * Underlying consumer is not threadsafe, so neither is this,
+ * but processing the same topicpartition and group id in multiple threads 
is usually bad anyway.
+ */
+private[kafka]
+class CachedKafkaConsumer[K, V] private(
+  val groupId: String,
+  val topic: String,
+  val partition: Int,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  assert(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  val topicPartition = new TopicPartition(topic, partition)
+
+  protected val consumer = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = new ju.ArrayList[TopicPartition]()
+tps.add(topicPartition)
+c.assign(tps)
+c
+  }
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, 
V]]().iterator
+  protected var nextOffset = -2L
+
+  def close(): Unit = consumer.close()
+
+  /**
+   * Get the record for the given offset, waiting up to timeout ms if IO 
is necessary.
+   * Sequential forward access will use buffers, but random access will be 
horribly inefficient.
+   */
+  def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
+log.debug(s"Get $groupId $topic $partition nextOffset $nextOffset 
requested $offset")
+if (offset != nextOffset) {
+  log.info(s"Initial fetch for $groupId $topic $partition $offset")
+  seek(offset)
+  poll(timeout)
+}
+
+if (!buffer.hasNext()) { poll(timeout) }
+assert(buffer.hasNext(),
+  s"Failed to get records for $groupId $topic $partition $offset after 
polling for $timeout")
+var record = buffer.next()
+
+if (record.offset != offset) {
+  log.info(s"Buffer miss for $groupId $topic $partition $offset")
+  seek(offset)
+  poll(timeout)
+  assert(buffer.hasNext(),
+s"Failed to get records for $groupId $topic $partition $offset 
after polling for $timeout")
+  record = buffer.next()
+  assert(record.offset == offset,
+s"Got wrong record for $groupId $topic $partition even after 
seeking to offset $offset")
+}
+
+nextOffset = offset + 1
+record
+  }
+
+  private def seek(offset: Long): Unit = {
+log.debug(s"Seeking to $topicPartition $offset")
+consumer.seek(topicPartition, offset)
+  }
+
+  private def poll(timeout: Long): Unit = {
+val p = consumer.poll(timeout)
+val r = p.records(topicPartition)
+log.debug(s"Polled ${p.partitions()}  ${r.size}")
+buffer = r.iterator
+  }
+
+}
+
+private[kafka]
+object CachedKafkaConsumer extends Logging {
+
+  private case class CacheKey(groupId: String, topic: String, partition: 
Int)
+
+  // Don't want to depend on guava, don't want a cleanup thread, use a 
simple LinkedHashMap
+  private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] 
= null
+
+  /** Must be called before get, once per JVM, to configure the cache. 
Further calls are 

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68844319
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/package-info.java
 ---
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Kafka receiver for spark streaming.
--- End diff --

nit: Kafka receiver -> Kafka InputDStream


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68844271
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/package.scala
 ---
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+/**
+ * Kafka receiver for spark streaming,
--- End diff --

nit: Kafka receiver -> Kafka InputDStream


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-27 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68642373
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/ConsumerStrategy.scala
 ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.TopicPartition
+
+/**
+ * Choice of how to create and configure underlying Kafka Consumers on 
driver and executors.
+ * Kafka 0.10 consumers can require additional, sometimes complex, setup 
after object
+ *  instantiation. This interface encapsulates that process, and allows it 
to be checkpointed.
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+trait ConsumerStrategy[K, V] {
+  /**
+   * Kafka http://kafka.apache.org/documentation.htmll#newconsumerconfigs;>
+   * configuration parameters to be used on executors. Requires 
"bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   */
+  def executorKafkaParams: ju.Map[String, Object]
+
+  /**
+   * Must return a fully configured Kafka Consumer, including subscribed 
or assigned topics.
+   * This consumer will be used on the driver to query for offsets only, 
not messages.
+   * @param currentOffsets A map from TopicPartition to offset, indicating 
how far the driver
+   * has successfully read.  Will be empty on initial start, possibly 
non-empty on restart from
+   * checkpoint.
+   * TODO: is strategy or dstream responsible for seeking on checkpoint 
restart
+   */
+  def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V]
+}
+
+/**
+ * Subscribe to a collection of topics.
+ * @param topics collection of topics to subscribe
+ * @param kafkaParams Kafka
+ * http://kafka.apache.org/documentation.htmll#newconsumerconfigs;>
+ * configuration parameters to be used on driver. The same parameters 
will be used on executors,
+ * with minor automatic modifications applied.
+ *  Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ */
+case class Subscribe[K: ClassTag, V: ClassTag](
+topics: ju.Collection[java.lang.String],
+kafkaParams: ju.Map[String, Object]
+  ) extends ConsumerStrategy[K, V] {
+
+  def executorKafkaParams: ju.Map[String, Object] = kafkaParams
+
+  def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = 
{
+val consumer = new KafkaConsumer[K, V](kafkaParams)
+consumer.subscribe(topics)
+consumer
+  }
+}
+
+object Subscribe {
+  def create[K, V](
+  keyClass: Class[K],
+  valueClass: Class[V],
+  topics: ju.Collection[java.lang.String],
+  kafkaParams: ju.Map[String, Object]
+  ): Subscribe[K, V] = {
+implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
+implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
+Subscribe[K, V](topics, kafkaParams)
+  }
+}
+
+/**
+ * Assign a fixed collection of TopicPartitions
+ * @param topicPartitions collection of TopicPartitions to subscribe
+ * @param kafkaParams Kafka
+ * http://kafka.apache.org/documentation.htmll#newconsumerconfigs;>
+ * configuration parameters to be used on driver. The same parameters 
will be used on executors,
+ * with minor automatic modifications applied.
+ *  Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ */
+case class Assign[K: ClassTag, V: ClassTag](
+topicPartitions: ju.Collection[TopicPartition],
+kafkaParams: ju.Map[String, Object]
+  ) extends ConsumerStrategy[K, V] {
+
+  def 

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-27 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68629750
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/ConsumerStrategy.scala
 ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.TopicPartition
+
+/**
+ * Choice of how to create and configure underlying Kafka Consumers on 
driver and executors.
+ * Kafka 0.10 consumers can require additional, sometimes complex, setup 
after object
+ *  instantiation. This interface encapsulates that process, and allows it 
to be checkpointed.
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+trait ConsumerStrategy[K, V] {
+  /**
+   * Kafka http://kafka.apache.org/documentation.htmll#newconsumerconfigs;>
+   * configuration parameters to be used on executors. Requires 
"bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   */
+  def executorKafkaParams: ju.Map[String, Object]
+
+  /**
+   * Must return a fully configured Kafka Consumer, including subscribed 
or assigned topics.
+   * This consumer will be used on the driver to query for offsets only, 
not messages.
+   * @param currentOffsets A map from TopicPartition to offset, indicating 
how far the driver
+   * has successfully read.  Will be empty on initial start, possibly 
non-empty on restart from
+   * checkpoint.
+   * TODO: is strategy or dstream responsible for seeking on checkpoint 
restart
+   */
+  def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V]
+}
+
+/**
+ * Subscribe to a collection of topics.
+ * @param topics collection of topics to subscribe
+ * @param kafkaParams Kafka
+ * http://kafka.apache.org/documentation.htmll#newconsumerconfigs;>
+ * configuration parameters to be used on driver. The same parameters 
will be used on executors,
+ * with minor automatic modifications applied.
+ *  Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ */
+case class Subscribe[K: ClassTag, V: ClassTag](
--- End diff --

The KafkaConsumer class and Consumer interface seems to be written in Java. 
So I dont think there is a real need for ClassTags. Please check it out, I may 
very well be missing something.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-27 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68579465
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/ConsumerStrategy.scala
 ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.TopicPartition
+
+/**
+ * Choice of how to create and configure underlying Kafka Consumers on 
driver and executors.
+ * Kafka 0.10 consumers can require additional, sometimes complex, setup 
after object
+ *  instantiation. This interface encapsulates that process, and allows it 
to be checkpointed.
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+trait ConsumerStrategy[K, V] {
+  /**
+   * Kafka http://kafka.apache.org/documentation.htmll#newconsumerconfigs;>
+   * configuration parameters to be used on executors. Requires 
"bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   */
+  def executorKafkaParams: ju.Map[String, Object]
+
+  /**
+   * Must return a fully configured Kafka Consumer, including subscribed 
or assigned topics.
+   * This consumer will be used on the driver to query for offsets only, 
not messages.
+   * @param currentOffsets A map from TopicPartition to offset, indicating 
how far the driver
+   * has successfully read.  Will be empty on initial start, possibly 
non-empty on restart from
+   * checkpoint.
+   * TODO: is strategy or dstream responsible for seeking on checkpoint 
restart
+   */
+  def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V]
+}
+
+/**
+ * Subscribe to a collection of topics.
+ * @param topics collection of topics to subscribe
+ * @param kafkaParams Kafka
+ * http://kafka.apache.org/documentation.htmll#newconsumerconfigs;>
+ * configuration parameters to be used on driver. The same parameters 
will be used on executors,
+ * with minor automatic modifications applied.
+ *  Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ */
+case class Subscribe[K: ClassTag, V: ClassTag](
--- End diff --

Pretty sure this needs class tags because the underlying consumer that its 
creating is parameterized on those classes, but I can test it out


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-27 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68578250
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/ConsumerStrategy.scala
 ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.TopicPartition
+
+/**
+ * Choice of how to create and configure underlying Kafka Consumers on 
driver and executors.
+ * Kafka 0.10 consumers can require additional, sometimes complex, setup 
after object
+ *  instantiation. This interface encapsulates that process, and allows it 
to be checkpointed.
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+trait ConsumerStrategy[K, V] {
+  /**
+   * Kafka http://kafka.apache.org/documentation.htmll#newconsumerconfigs;>
+   * configuration parameters to be used on executors. Requires 
"bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   */
+  def executorKafkaParams: ju.Map[String, Object]
+
+  /**
+   * Must return a fully configured Kafka Consumer, including subscribed 
or assigned topics.
+   * This consumer will be used on the driver to query for offsets only, 
not messages.
+   * @param currentOffsets A map from TopicPartition to offset, indicating 
how far the driver
+   * has successfully read.  Will be empty on initial start, possibly 
non-empty on restart from
+   * checkpoint.
+   * TODO: is strategy or dstream responsible for seeking on checkpoint 
restart
--- End diff --

If they're expected to do additional work it needs to be documented, but in 
retrospect I don't think its necessary, will remove


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-27 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68577783
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/ConsumerStrategy.scala
 ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.TopicPartition
+
+/**
+ * Choice of how to create and configure underlying Kafka Consumers on 
driver and executors.
+ * Kafka 0.10 consumers can require additional, sometimes complex, setup 
after object
+ *  instantiation. This interface encapsulates that process, and allows it 
to be checkpointed.
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+trait ConsumerStrategy[K, V] {
+  /**
+   * Kafka http://kafka.apache.org/documentation.htmll#newconsumerconfigs;>
+   * configuration parameters to be used on executors. Requires 
"bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   */
+  def executorKafkaParams: ju.Map[String, Object]
+
+  /**
+   * Must return a fully configured Kafka Consumer, including subscribed 
or assigned topics.
+   * This consumer will be used on the driver to query for offsets only, 
not messages.
+   * @param currentOffsets A map from TopicPartition to offset, indicating 
how far the driver
+   * has successfully read.  Will be empty on initial start, possibly 
non-empty on restart from
+   * checkpoint.
+   * TODO: is strategy or dstream responsible for seeking on checkpoint 
restart
+   */
+  def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V]
+}
+
+/**
+ * Subscribe to a collection of topics.
+ * @param topics collection of topics to subscribe
+ * @param kafkaParams Kafka
+ * http://kafka.apache.org/documentation.htmll#newconsumerconfigs;>
+ * configuration parameters to be used on driver. The same parameters 
will be used on executors,
+ * with minor automatic modifications applied.
+ *  Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ */
+case class Subscribe[K: ClassTag, V: ClassTag](
+topics: ju.Collection[java.lang.String],
+kafkaParams: ju.Map[String, Object]
+  ) extends ConsumerStrategy[K, V] {
+
+  def executorKafkaParams: ju.Map[String, Object] = kafkaParams
+
+  def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = 
{
+val consumer = new KafkaConsumer[K, V](kafkaParams)
+consumer.subscribe(topics)
+consumer
+  }
+}
+
+object Subscribe {
+  def create[K, V](
+  keyClass: Class[K],
+  valueClass: Class[V],
+  topics: ju.Collection[java.lang.String],
+  kafkaParams: ju.Map[String, Object]
+  ): Subscribe[K, V] = {
+implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
+implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
+Subscribe[K, V](topics, kafkaParams)
+  }
+}
+
+/**
+ * Assign a fixed collection of TopicPartitions
+ * @param topicPartitions collection of TopicPartitions to subscribe
+ * @param kafkaParams Kafka
+ * http://kafka.apache.org/documentation.htmll#newconsumerconfigs;>
+ * configuration parameters to be used on driver. The same parameters 
will be used on executors,
+ * with minor automatic modifications applied.
+ *  Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ */
+case class Assign[K: ClassTag, V: ClassTag](
+topicPartitions: ju.Collection[TopicPartition],
+kafkaParams: ju.Map[String, Object]
+  ) extends ConsumerStrategy[K, V] {
+
+  def 

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-27 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68549882
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/ConsumerStrategy.scala
 ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.TopicPartition
+
+/**
+ * Choice of how to create and configure underlying Kafka Consumers on 
driver and executors.
+ * Kafka 0.10 consumers can require additional, sometimes complex, setup 
after object
+ *  instantiation. This interface encapsulates that process, and allows it 
to be checkpointed.
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+trait ConsumerStrategy[K, V] {
+  /**
+   * Kafka http://kafka.apache.org/documentation.htmll#newconsumerconfigs;>
+   * configuration parameters to be used on executors. Requires 
"bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   */
+  def executorKafkaParams: ju.Map[String, Object]
+
+  /**
+   * Must return a fully configured Kafka Consumer, including subscribed 
or assigned topics.
+   * This consumer will be used on the driver to query for offsets only, 
not messages.
+   * @param currentOffsets A map from TopicPartition to offset, indicating 
how far the driver
+   * has successfully read.  Will be empty on initial start, possibly 
non-empty on restart from
+   * checkpoint.
+   * TODO: is strategy or dstream responsible for seeking on checkpoint 
restart
+   */
+  def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V]
+}
+
+/**
+ * Subscribe to a collection of topics.
+ * @param topics collection of topics to subscribe
+ * @param kafkaParams Kafka
+ * http://kafka.apache.org/documentation.htmll#newconsumerconfigs;>
+ * configuration parameters to be used on driver. The same parameters 
will be used on executors,
+ * with minor automatic modifications applied.
+ *  Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ */
+case class Subscribe[K: ClassTag, V: ClassTag](
+topics: ju.Collection[java.lang.String],
+kafkaParams: ju.Map[String, Object]
+  ) extends ConsumerStrategy[K, V] {
+
+  def executorKafkaParams: ju.Map[String, Object] = kafkaParams
+
+  def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = 
{
+val consumer = new KafkaConsumer[K, V](kafkaParams)
+consumer.subscribe(topics)
+consumer
+  }
+}
+
+object Subscribe {
+  def create[K, V](
+  keyClass: Class[K],
+  valueClass: Class[V],
+  topics: ju.Collection[java.lang.String],
+  kafkaParams: ju.Map[String, Object]
+  ): Subscribe[K, V] = {
+implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
+implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
+Subscribe[K, V](topics, kafkaParams)
+  }
+}
+
+/**
+ * Assign a fixed collection of TopicPartitions
+ * @param topicPartitions collection of TopicPartitions to subscribe
+ * @param kafkaParams Kafka
+ * http://kafka.apache.org/documentation.htmll#newconsumerconfigs;>
+ * configuration parameters to be used on driver. The same parameters 
will be used on executors,
+ * with minor automatic modifications applied.
+ *  Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ */
+case class Assign[K: ClassTag, V: ClassTag](
+topicPartitions: ju.Collection[TopicPartition],
+kafkaParams: ju.Map[String, Object]
+  ) extends ConsumerStrategy[K, V] {
+
+  def 

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-27 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68549572
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/ConsumerStrategy.scala
 ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.TopicPartition
+
+/**
+ * Choice of how to create and configure underlying Kafka Consumers on 
driver and executors.
+ * Kafka 0.10 consumers can require additional, sometimes complex, setup 
after object
+ *  instantiation. This interface encapsulates that process, and allows it 
to be checkpointed.
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+trait ConsumerStrategy[K, V] {
+  /**
+   * Kafka http://kafka.apache.org/documentation.htmll#newconsumerconfigs;>
+   * configuration parameters to be used on executors. Requires 
"bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   */
+  def executorKafkaParams: ju.Map[String, Object]
+
+  /**
+   * Must return a fully configured Kafka Consumer, including subscribed 
or assigned topics.
+   * This consumer will be used on the driver to query for offsets only, 
not messages.
+   * @param currentOffsets A map from TopicPartition to offset, indicating 
how far the driver
+   * has successfully read.  Will be empty on initial start, possibly 
non-empty on restart from
+   * checkpoint.
+   * TODO: is strategy or dstream responsible for seeking on checkpoint 
restart
+   */
+  def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V]
+}
+
+/**
+ * Subscribe to a collection of topics.
+ * @param topics collection of topics to subscribe
+ * @param kafkaParams Kafka
+ * http://kafka.apache.org/documentation.htmll#newconsumerconfigs;>
+ * configuration parameters to be used on driver. The same parameters 
will be used on executors,
+ * with minor automatic modifications applied.
+ *  Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ */
+case class Subscribe[K: ClassTag, V: ClassTag](
+topics: ju.Collection[java.lang.String],
+kafkaParams: ju.Map[String, Object]
+  ) extends ConsumerStrategy[K, V] {
+
+  def executorKafkaParams: ju.Map[String, Object] = kafkaParams
+
+  def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = 
{
+val consumer = new KafkaConsumer[K, V](kafkaParams)
+consumer.subscribe(topics)
+consumer
+  }
+}
+
+object Subscribe {
--- End diff --

Please add some basic docs on the object ("e.g. Helper object for creating 
[[Subscribe]] strategy") and create as that will be an entry point as well, 
especially in Scala docs.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-27 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68549333
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/ConsumerStrategy.scala
 ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.TopicPartition
+
+/**
+ * Choice of how to create and configure underlying Kafka Consumers on 
driver and executors.
+ * Kafka 0.10 consumers can require additional, sometimes complex, setup 
after object
+ *  instantiation. This interface encapsulates that process, and allows it 
to be checkpointed.
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+trait ConsumerStrategy[K, V] {
+  /**
+   * Kafka http://kafka.apache.org/documentation.htmll#newconsumerconfigs;>
+   * configuration parameters to be used on executors. Requires 
"bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   */
+  def executorKafkaParams: ju.Map[String, Object]
+
+  /**
+   * Must return a fully configured Kafka Consumer, including subscribed 
or assigned topics.
+   * This consumer will be used on the driver to query for offsets only, 
not messages.
+   * @param currentOffsets A map from TopicPartition to offset, indicating 
how far the driver
+   * has successfully read.  Will be empty on initial start, possibly 
non-empty on restart from
+   * checkpoint.
+   * TODO: is strategy or dstream responsible for seeking on checkpoint 
restart
--- End diff --

Please do not add Todo in the public doc.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-27 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68531305
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/ConsumerStrategy.scala
 ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.TopicPartition
+
+/**
+ * Choice of how to create and configure underlying Kafka Consumers on 
driver and executors.
+ * Kafka 0.10 consumers can require additional, sometimes complex, setup 
after object
+ *  instantiation. This interface encapsulates that process, and allows it 
to be checkpointed.
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+trait ConsumerStrategy[K, V] {
+  /**
+   * Kafka http://kafka.apache.org/documentation.htmll#newconsumerconfigs;>
+   * configuration parameters to be used on executors. Requires 
"bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   */
+  def executorKafkaParams: ju.Map[String, Object]
+
+  /**
+   * Must return a fully configured Kafka Consumer, including subscribed 
or assigned topics.
+   * This consumer will be used on the driver to query for offsets only, 
not messages.
+   * @param currentOffsets A map from TopicPartition to offset, indicating 
how far the driver
+   * has successfully read.  Will be empty on initial start, possibly 
non-empty on restart from
+   * checkpoint.
+   * TODO: is strategy or dstream responsible for seeking on checkpoint 
restart
+   */
+  def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V]
+}
+
+/**
+ * Subscribe to a collection of topics.
+ * @param topics collection of topics to subscribe
+ * @param kafkaParams Kafka
+ * http://kafka.apache.org/documentation.htmll#newconsumerconfigs;>
+ * configuration parameters to be used on driver. The same parameters 
will be used on executors,
+ * with minor automatic modifications applied.
+ *  Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ */
+case class Subscribe[K: ClassTag, V: ClassTag](
+topics: ju.Collection[java.lang.String],
+kafkaParams: ju.Map[String, Object]
+  ) extends ConsumerStrategy[K, V] {
+
+  def executorKafkaParams: ju.Map[String, Object] = kafkaParams
+
+  def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = 
{
+val consumer = new KafkaConsumer[K, V](kafkaParams)
+consumer.subscribe(topics)
+consumer
+  }
+}
+
+object Subscribe {
+  def create[K, V](
+  keyClass: Class[K],
+  valueClass: Class[V],
+  topics: ju.Collection[java.lang.String],
+  kafkaParams: ju.Map[String, Object]
+  ): Subscribe[K, V] = {
--- End diff --

nit: wrong indent

would be nice to have an apply for scala maps.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-27 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68531114
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/ConsumerStrategy.scala
 ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.TopicPartition
+
+/**
+ * Choice of how to create and configure underlying Kafka Consumers on 
driver and executors.
+ * Kafka 0.10 consumers can require additional, sometimes complex, setup 
after object
+ *  instantiation. This interface encapsulates that process, and allows it 
to be checkpointed.
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+trait ConsumerStrategy[K, V] {
+  /**
+   * Kafka http://kafka.apache.org/documentation.htmll#newconsumerconfigs;>
+   * configuration parameters to be used on executors. Requires 
"bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   */
+  def executorKafkaParams: ju.Map[String, Object]
+
+  /**
+   * Must return a fully configured Kafka Consumer, including subscribed 
or assigned topics.
+   * This consumer will be used on the driver to query for offsets only, 
not messages.
+   * @param currentOffsets A map from TopicPartition to offset, indicating 
how far the driver
+   * has successfully read.  Will be empty on initial start, possibly 
non-empty on restart from
+   * checkpoint.
+   * TODO: is strategy or dstream responsible for seeking on checkpoint 
restart
+   */
+  def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V]
+}
+
+/**
+ * Subscribe to a collection of topics.
+ * @param topics collection of topics to subscribe
+ * @param kafkaParams Kafka
+ * http://kafka.apache.org/documentation.htmll#newconsumerconfigs;>
+ * configuration parameters to be used on driver. The same parameters 
will be used on executors,
+ * with minor automatic modifications applied.
+ *  Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ */
+case class Subscribe[K: ClassTag, V: ClassTag](
--- End diff --

Also lets make the constructors private. Will be easier for future 
compatibility.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-27 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68530676
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/ConsumerStrategy.scala
 ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.TopicPartition
+
+/**
+ * Choice of how to create and configure underlying Kafka Consumers on 
driver and executors.
+ * Kafka 0.10 consumers can require additional, sometimes complex, setup 
after object
+ *  instantiation. This interface encapsulates that process, and allows it 
to be checkpointed.
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+trait ConsumerStrategy[K, V] {
+  /**
+   * Kafka http://kafka.apache.org/documentation.htmll#newconsumerconfigs;>
+   * configuration parameters to be used on executors. Requires 
"bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   */
+  def executorKafkaParams: ju.Map[String, Object]
+
+  /**
+   * Must return a fully configured Kafka Consumer, including subscribed 
or assigned topics.
+   * This consumer will be used on the driver to query for offsets only, 
not messages.
+   * @param currentOffsets A map from TopicPartition to offset, indicating 
how far the driver
+   * has successfully read.  Will be empty on initial start, possibly 
non-empty on restart from
+   * checkpoint.
+   * TODO: is strategy or dstream responsible for seeking on checkpoint 
restart
+   */
+  def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V]
+}
+
+/**
+ * Subscribe to a collection of topics.
+ * @param topics collection of topics to subscribe
+ * @param kafkaParams Kafka
+ * http://kafka.apache.org/documentation.htmll#newconsumerconfigs;>
+ * configuration parameters to be used on driver. The same parameters 
will be used on executors,
+ * with minor automatic modifications applied.
+ *  Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ */
+case class Subscribe[K: ClassTag, V: ClassTag](
--- End diff --

Why does this need to have class tags? That is forcing the keyClass and 
valueClass params in the `create()` method.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-27 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68529586
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, 
StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ *  A stream of [[KafkaRDD]] where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives 
the maximum number
+ *  of messages
+ * per second that each '''partition''' will accept.
+ * @param locationStrategy In most cases, pass in [[PreferConsistent]],
+ *   see [[LocationStrategy]] for more details.
+ * @param executorKafkaParams Kafka
+ * http://kafka.apache.org/documentation.html#newconsumerconfigs;>
+ * configuration parameters.
+ *   Requires  "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param driverConsumer zero-argument function for you to construct a 
Kafka Consumer,
+ *  and subscribe topics or assign partitions.
+ *  This consumer will be used on the driver to query for offsets only, 
not messages.
+ *  See http://kafka.apache.org/documentation.html#newconsumerapi;>Consumer 
doc
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+@Experimental
+private[spark] class DirectKafkaInputDStream[K: ClassTag, V: ClassTag](
+_ssc: StreamingContext,
+locationStrategy: LocationStrategy,
+consumerStrategy: ConsumerStrategy[K, V]
+  ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with 
CanCommitOffsets {
+
+  val executorKafkaParams = {
+val ekp = new ju.HashMap[String, 
Object](consumerStrategy.executorKafkaParams)
+KafkaUtils.fixKafkaParams(ekp)
+ekp
+  }
+
+  protected var currentOffsets = Map[TopicPartition, Long]()
+
+  @transient private var kc: Consumer[K, V] = null
+  def consumer(): Consumer[K, V] = this.synchronized {
+if (null == kc) {
+  kc = consumerStrategy.onStart(currentOffsets)
+}
+kc
+  }
+
+  override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, 
V]] = {
+logError("Kafka ConsumerRecord is not serializable. " +
+  "Use .map to extract fields before calling .persist or .window")
+super.persist(newLevel)
+  }
+
+  protected def getBrokers = {
+val c = consumer
+val result = new ju.HashMap[TopicPartition, String]()
+val hosts = new ju.HashMap[TopicPartition, String]()
+val assignments = c.assignment().iterator()
+while (assignments.hasNext()) {
+  val tp: TopicPartition = assignments.next()
+  if (null == hosts.get(tp)) {
+val infos = c.partitionsFor(tp.topic).iterator()
+while (infos.hasNext()) {
+  val i = infos.next()
+  hosts.put(new TopicPartition(i.topic(), i.partition()), 
i.leader.host())
+}
+  }
+   

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-22 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68161842
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 ---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, 
StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ *  A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives 
the maximum number
+ *  of messages
+ * per second that each '''partition''' will accept.
+ * @param preferredHosts map from TopicPartition to preferred host for 
processing that partition.
+ * In most cases, use [[DirectKafkaInputDStream.preferConsistent]]
+ * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on 
same nodes as brokers.
+ * @param executorKafkaParams Kafka
+ * http://kafka.apache.org/documentation.html#newconsumerconfigs;>
+ * configuration parameters.
+ *   Requires  "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param driverConsumer zero-argument function for you to construct a 
Kafka Consumer,
+ *  and subscribe topics or assign partitions.
+ *  This consumer will be used on the driver to query for offsets only, 
not messages.
+ *  See http://kafka.apache.org/documentation.html#newconsumerapi;>Consumer 
doc
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+@Experimental
+class DirectKafkaInputDStream[K: ClassTag, V: ClassTag] private[spark] (
+_ssc: StreamingContext,
+preferredHosts: ju.Map[TopicPartition, String],
+executorKafkaParams: ju.Map[String, Object],
+driverConsumer: () => Consumer[K, V]
+  ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging {
+
+  @transient private var kc: Consumer[K, V] = null
+  def consumer(): Consumer[K, V] = this.synchronized {
+if (null == kc) {
+  kc = driverConsumer()
+}
+kc
+  }
+  consumer()
+
+  override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, 
V]] = {
+log.error("Kafka ConsumerRecord is not serializable. " +
+  "Use .map to extract fields before calling .persist or .window")
+super.persist(newLevel)
+  }
+
+  protected def getBrokers = {
+val c = consumer
+val result = new ju.HashMap[TopicPartition, String]()
+val hosts = new ju.HashMap[TopicPartition, String]()
+val assignments = c.assignment().iterator()
+while (assignments.hasNext()) {
+  val tp: TopicPartition = assignments.next()
+  if (null == hosts.get(tp)) {
+val infos = c.partitionsFor(tp.topic).iterator()
+while (infos.hasNext()) {
+  val i = infos.next()
+  hosts.put(new TopicPartition(i.topic(), i.partition()), 
i.leader.host())
+}
+  }
+  result.put(tp, hosts.get(tp))

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-22 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68161572
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 ---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, 
StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ *  A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives 
the maximum number
+ *  of messages
+ * per second that each '''partition''' will accept.
+ * @param preferredHosts map from TopicPartition to preferred host for 
processing that partition.
+ * In most cases, use [[DirectKafkaInputDStream.preferConsistent]]
+ * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on 
same nodes as brokers.
+ * @param executorKafkaParams Kafka
+ * http://kafka.apache.org/documentation.html#newconsumerconfigs;>
+ * configuration parameters.
+ *   Requires  "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param driverConsumer zero-argument function for you to construct a 
Kafka Consumer,
+ *  and subscribe topics or assign partitions.
+ *  This consumer will be used on the driver to query for offsets only, 
not messages.
+ *  See http://kafka.apache.org/documentation.html#newconsumerapi;>Consumer 
doc
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+@Experimental
+class DirectKafkaInputDStream[K: ClassTag, V: ClassTag] private[spark] (
+_ssc: StreamingContext,
+preferredHosts: ju.Map[TopicPartition, String],
+executorKafkaParams: ju.Map[String, Object],
+driverConsumer: () => Consumer[K, V]
+  ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging {
+
+  @transient private var kc: Consumer[K, V] = null
+  def consumer(): Consumer[K, V] = this.synchronized {
+if (null == kc) {
+  kc = driverConsumer()
+}
+kc
+  }
+  consumer()
+
+  override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, 
V]] = {
+log.error("Kafka ConsumerRecord is not serializable. " +
+  "Use .map to extract fields before calling .persist or .window")
+super.persist(newLevel)
+  }
+
+  protected def getBrokers = {
+val c = consumer
+val result = new ju.HashMap[TopicPartition, String]()
+val hosts = new ju.HashMap[TopicPartition, String]()
+val assignments = c.assignment().iterator()
+while (assignments.hasNext()) {
+  val tp: TopicPartition = assignments.next()
+  if (null == hosts.get(tp)) {
+val infos = c.partitionsFor(tp.topic).iterator()
+while (infos.hasNext()) {
+  val i = infos.next()
+  hosts.put(new TopicPartition(i.topic(), i.partition()), 
i.leader.host())
+}
+  }
+  result.put(tp, hosts.get(tp))

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-22 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68161054
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
 ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.{classTag, ClassTag}
+
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord }
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.{Partition, SparkContext, SparkException, 
TaskContext}
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.internal.Logging
+import org.apache.spark.partial.{BoundedDouble, PartialResult}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * A batch-oriented interface for consuming from Kafka.
+ * Starting and ending offsets are specified in advance,
+ * so that you can control exactly-once semantics.
+ * @param kafkaParams Kafka
+ * http://kafka.apache.org/documentation.htmll#newconsumerconfigs;>
+ * configuration parameters. Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsetRanges offset ranges that define the Kafka data belonging 
to this RDD
+ * @param preferredHosts map from TopicPartition to preferred host for 
processing that partition.
+ * In most cases, use [[DirectKafkaInputDStream.preferConsistent]]
+ * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on 
same nodes as brokers.
+ * @param useConsumerCache whether to use a consumer from a per-jvm cache
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+@Experimental
+class KafkaRDD[
+  K: ClassTag,
+  V: ClassTag] private[spark] (
+sc: SparkContext,
+val kafkaParams: ju.Map[String, Object],
+val offsetRanges: Array[OffsetRange],
+val preferredHosts: ju.Map[TopicPartition, String],
+useConsumerCache: Boolean
+) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with 
HasOffsetRanges {
+
+  assert("none" ==
+
kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).asInstanceOf[String],
+ConsumerConfig.AUTO_OFFSET_RESET_CONFIG +
+  " must be set to none for executor kafka params, else messages may 
not match offsetRange")
+
+  assert(false ==
+
kafkaParams.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).asInstanceOf[Boolean],
+ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG +
+  " must be set to false for executor kafka params, else offsets may 
commit before processing")
+
+  // TODO is it necessary to have separate configs for initial poll time 
vs ongoing poll time?
+  private val pollTimeout = 
conf.getLong("spark.streaming.kafka.consumer.poll.ms", 256)
+  private val cacheInitialCapacity =
+conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
+  private val cacheMaxCapacity =
+conf.getInt("spark.streaming.kafka.consumer.cache.maxCapacity", 64)
+  private val cacheLoadFactor =
+conf.getDouble("spark.streaming.kafka.consumer.cache.loadFactor", 
0.75).toFloat
+
+  override def persist(newLevel: StorageLevel): this.type = {
+log.error("Kafka ConsumerRecord is not serializable. " +
+  "Use .map to extract fields before calling .persist or .window")
+super.persist(newLevel)
+  }
+
+  override def getPartitions: Array[Partition] = {
+offsetRanges.zipWithIndex.map { case (o, i) =>
+new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, 
o.untilOffset)
+}.toArray
+  }
+
+  override def count(): Long = offsetRanges.map(_.count).sum
+
+  override def 

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-22 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68161026
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
 ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.{classTag, ClassTag}
+
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord }
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.{Partition, SparkContext, SparkException, 
TaskContext}
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.internal.Logging
+import org.apache.spark.partial.{BoundedDouble, PartialResult}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * A batch-oriented interface for consuming from Kafka.
+ * Starting and ending offsets are specified in advance,
+ * so that you can control exactly-once semantics.
+ * @param kafkaParams Kafka
+ * http://kafka.apache.org/documentation.htmll#newconsumerconfigs;>
+ * configuration parameters. Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsetRanges offset ranges that define the Kafka data belonging 
to this RDD
+ * @param preferredHosts map from TopicPartition to preferred host for 
processing that partition.
+ * In most cases, use [[DirectKafkaInputDStream.preferConsistent]]
+ * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on 
same nodes as brokers.
+ * @param useConsumerCache whether to use a consumer from a per-jvm cache
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+@Experimental
+class KafkaRDD[
+  K: ClassTag,
+  V: ClassTag] private[spark] (
+sc: SparkContext,
+val kafkaParams: ju.Map[String, Object],
+val offsetRanges: Array[OffsetRange],
+val preferredHosts: ju.Map[TopicPartition, String],
+useConsumerCache: Boolean
+) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with 
HasOffsetRanges {
+
+  assert("none" ==
+
kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).asInstanceOf[String],
+ConsumerConfig.AUTO_OFFSET_RESET_CONFIG +
+  " must be set to none for executor kafka params, else messages may 
not match offsetRange")
+
+  assert(false ==
+
kafkaParams.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).asInstanceOf[Boolean],
+ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG +
+  " must be set to false for executor kafka params, else offsets may 
commit before processing")
+
+  // TODO is it necessary to have separate configs for initial poll time 
vs ongoing poll time?
+  private val pollTimeout = 
conf.getLong("spark.streaming.kafka.consumer.poll.ms", 256)
+  private val cacheInitialCapacity =
+conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
+  private val cacheMaxCapacity =
+conf.getInt("spark.streaming.kafka.consumer.cache.maxCapacity", 64)
+  private val cacheLoadFactor =
+conf.getDouble("spark.streaming.kafka.consumer.cache.loadFactor", 
0.75).toFloat
+
+  override def persist(newLevel: StorageLevel): this.type = {
+log.error("Kafka ConsumerRecord is not serializable. " +
+  "Use .map to extract fields before calling .persist or .window")
+super.persist(newLevel)
+  }
+
+  override def getPartitions: Array[Partition] = {
+offsetRanges.zipWithIndex.map { case (o, i) =>
+new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, 
o.untilOffset)
+}.toArray
+  }
+
+  override def count(): Long = offsetRanges.map(_.count).sum
+
+  override def 

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68155816
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 ---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, 
StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ *  A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives 
the maximum number
+ *  of messages
+ * per second that each '''partition''' will accept.
+ * @param preferredHosts map from TopicPartition to preferred host for 
processing that partition.
+ * In most cases, use [[DirectKafkaInputDStream.preferConsistent]]
+ * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on 
same nodes as brokers.
+ * @param executorKafkaParams Kafka
+ * http://kafka.apache.org/documentation.html#newconsumerconfigs;>
+ * configuration parameters.
+ *   Requires  "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param driverConsumer zero-argument function for you to construct a 
Kafka Consumer,
+ *  and subscribe topics or assign partitions.
+ *  This consumer will be used on the driver to query for offsets only, 
not messages.
+ *  See http://kafka.apache.org/documentation.html#newconsumerapi;>Consumer 
doc
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+@Experimental
+class DirectKafkaInputDStream[K: ClassTag, V: ClassTag] private[spark] (
+_ssc: StreamingContext,
+preferredHosts: ju.Map[TopicPartition, String],
+executorKafkaParams: ju.Map[String, Object],
+driverConsumer: () => Consumer[K, V]
+  ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging {
+
+  @transient private var kc: Consumer[K, V] = null
+  def consumer(): Consumer[K, V] = this.synchronized {
+if (null == kc) {
+  kc = driverConsumer()
+}
+kc
+  }
+  consumer()
+
+  override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, 
V]] = {
+log.error("Kafka ConsumerRecord is not serializable. " +
+  "Use .map to extract fields before calling .persist or .window")
+super.persist(newLevel)
+  }
+
+  protected def getBrokers = {
+val c = consumer
+val result = new ju.HashMap[TopicPartition, String]()
+val hosts = new ju.HashMap[TopicPartition, String]()
+val assignments = c.assignment().iterator()
+while (assignments.hasNext()) {
+  val tp: TopicPartition = assignments.next()
+  if (null == hosts.get(tp)) {
+val infos = c.partitionsFor(tp.topic).iterator()
+while (infos.hasNext()) {
+  val i = infos.next()
+  hosts.put(new TopicPartition(i.topic(), i.partition()), 
i.leader.host())
+}
+  }
+  result.put(tp, hosts.get(tp))
+

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68151084
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 ---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, 
StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ *  A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives 
the maximum number
+ *  of messages
+ * per second that each '''partition''' will accept.
+ * @param preferredHosts map from TopicPartition to preferred host for 
processing that partition.
+ * In most cases, use [[DirectKafkaInputDStream.preferConsistent]]
+ * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on 
same nodes as brokers.
+ * @param executorKafkaParams Kafka
+ * http://kafka.apache.org/documentation.html#newconsumerconfigs;>
+ * configuration parameters.
+ *   Requires  "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param driverConsumer zero-argument function for you to construct a 
Kafka Consumer,
+ *  and subscribe topics or assign partitions.
+ *  This consumer will be used on the driver to query for offsets only, 
not messages.
+ *  See http://kafka.apache.org/documentation.html#newconsumerapi;>Consumer 
doc
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+@Experimental
+class DirectKafkaInputDStream[K: ClassTag, V: ClassTag] private[spark] (
+_ssc: StreamingContext,
+preferredHosts: ju.Map[TopicPartition, String],
+executorKafkaParams: ju.Map[String, Object],
+driverConsumer: () => Consumer[K, V]
+  ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging {
+
+  @transient private var kc: Consumer[K, V] = null
+  def consumer(): Consumer[K, V] = this.synchronized {
+if (null == kc) {
+  kc = driverConsumer()
+}
+kc
+  }
+  consumer()
+
+  override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, 
V]] = {
+log.error("Kafka ConsumerRecord is not serializable. " +
+  "Use .map to extract fields before calling .persist or .window")
+super.persist(newLevel)
+  }
+
+  protected def getBrokers = {
+val c = consumer
+val result = new ju.HashMap[TopicPartition, String]()
+val hosts = new ju.HashMap[TopicPartition, String]()
+val assignments = c.assignment().iterator()
+while (assignments.hasNext()) {
+  val tp: TopicPartition = assignments.next()
+  if (null == hosts.get(tp)) {
+val infos = c.partitionsFor(tp.topic).iterator()
+while (infos.hasNext()) {
+  val i = infos.next()
+  hosts.put(new TopicPartition(i.topic(), i.partition()), 
i.leader.host())
+}
+  }
+  result.put(tp, hosts.get(tp))
+

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68150093
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
 ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.{classTag, ClassTag}
+
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord }
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.{Partition, SparkContext, SparkException, 
TaskContext}
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.internal.Logging
+import org.apache.spark.partial.{BoundedDouble, PartialResult}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * A batch-oriented interface for consuming from Kafka.
+ * Starting and ending offsets are specified in advance,
+ * so that you can control exactly-once semantics.
+ * @param kafkaParams Kafka
+ * http://kafka.apache.org/documentation.htmll#newconsumerconfigs;>
+ * configuration parameters. Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsetRanges offset ranges that define the Kafka data belonging 
to this RDD
+ * @param preferredHosts map from TopicPartition to preferred host for 
processing that partition.
+ * In most cases, use [[DirectKafkaInputDStream.preferConsistent]]
+ * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on 
same nodes as brokers.
+ * @param useConsumerCache whether to use a consumer from a per-jvm cache
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+@Experimental
+class KafkaRDD[
+  K: ClassTag,
+  V: ClassTag] private[spark] (
+sc: SparkContext,
+val kafkaParams: ju.Map[String, Object],
+val offsetRanges: Array[OffsetRange],
+val preferredHosts: ju.Map[TopicPartition, String],
+useConsumerCache: Boolean
+) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with 
HasOffsetRanges {
+
+  assert("none" ==
+
kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).asInstanceOf[String],
+ConsumerConfig.AUTO_OFFSET_RESET_CONFIG +
+  " must be set to none for executor kafka params, else messages may 
not match offsetRange")
+
+  assert(false ==
+
kafkaParams.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).asInstanceOf[Boolean],
+ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG +
+  " must be set to false for executor kafka params, else offsets may 
commit before processing")
+
+  // TODO is it necessary to have separate configs for initial poll time 
vs ongoing poll time?
+  private val pollTimeout = 
conf.getLong("spark.streaming.kafka.consumer.poll.ms", 256)
+  private val cacheInitialCapacity =
+conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
+  private val cacheMaxCapacity =
+conf.getInt("spark.streaming.kafka.consumer.cache.maxCapacity", 64)
+  private val cacheLoadFactor =
+conf.getDouble("spark.streaming.kafka.consumer.cache.loadFactor", 
0.75).toFloat
+
+  override def persist(newLevel: StorageLevel): this.type = {
+log.error("Kafka ConsumerRecord is not serializable. " +
+  "Use .map to extract fields before calling .persist or .window")
+super.persist(newLevel)
+  }
+
+  override def getPartitions: Array[Partition] = {
+offsetRanges.zipWithIndex.map { case (o, i) =>
+new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, 
o.untilOffset)
+}.toArray
+  }
+
+  override def count(): Long = offsetRanges.map(_.count).sum
+
+  override def countApprox(

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68149741
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
 ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.{classTag, ClassTag}
+
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord }
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.{Partition, SparkContext, SparkException, 
TaskContext}
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.internal.Logging
+import org.apache.spark.partial.{BoundedDouble, PartialResult}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * A batch-oriented interface for consuming from Kafka.
+ * Starting and ending offsets are specified in advance,
+ * so that you can control exactly-once semantics.
+ * @param kafkaParams Kafka
+ * http://kafka.apache.org/documentation.htmll#newconsumerconfigs;>
+ * configuration parameters. Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsetRanges offset ranges that define the Kafka data belonging 
to this RDD
+ * @param preferredHosts map from TopicPartition to preferred host for 
processing that partition.
+ * In most cases, use [[DirectKafkaInputDStream.preferConsistent]]
+ * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on 
same nodes as brokers.
+ * @param useConsumerCache whether to use a consumer from a per-jvm cache
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+@Experimental
+class KafkaRDD[
+  K: ClassTag,
+  V: ClassTag] private[spark] (
+sc: SparkContext,
+val kafkaParams: ju.Map[String, Object],
+val offsetRanges: Array[OffsetRange],
+val preferredHosts: ju.Map[TopicPartition, String],
+useConsumerCache: Boolean
+) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with 
HasOffsetRanges {
+
+  assert("none" ==
+
kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).asInstanceOf[String],
+ConsumerConfig.AUTO_OFFSET_RESET_CONFIG +
+  " must be set to none for executor kafka params, else messages may 
not match offsetRange")
+
+  assert(false ==
+
kafkaParams.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).asInstanceOf[Boolean],
+ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG +
+  " must be set to false for executor kafka params, else offsets may 
commit before processing")
+
+  // TODO is it necessary to have separate configs for initial poll time 
vs ongoing poll time?
+  private val pollTimeout = 
conf.getLong("spark.streaming.kafka.consumer.poll.ms", 256)
+  private val cacheInitialCapacity =
+conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
+  private val cacheMaxCapacity =
+conf.getInt("spark.streaming.kafka.consumer.cache.maxCapacity", 64)
+  private val cacheLoadFactor =
+conf.getDouble("spark.streaming.kafka.consumer.cache.loadFactor", 
0.75).toFloat
+
+  override def persist(newLevel: StorageLevel): this.type = {
+log.error("Kafka ConsumerRecord is not serializable. " +
+  "Use .map to extract fields before calling .persist or .window")
+super.persist(newLevel)
+  }
+
+  override def getPartitions: Array[Partition] = {
+offsetRanges.zipWithIndex.map { case (o, i) =>
+new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, 
o.untilOffset)
+}.toArray
+  }
+
+  override def count(): Long = offsetRanges.map(_.count).sum
+
+  override def countApprox(

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68149159
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/CachedKafkaConsumer.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, 
KafkaConsumer }
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+/**
+ * Consumer of single topicpartition, intended for cached reuse.
+ * Underlying consumer is not threadsafe, so neither is this,
+ * but processing the same topicpartition and group id in multiple threads 
is usually bad anyway.
+ */
+private[kafka]
+class CachedKafkaConsumer[K, V] private(
+  val groupId: String,
+  val topic: String,
+  val partition: Int,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  assert(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  val topicPartition = new TopicPartition(topic, partition)
+
+  protected val consumer = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = new ju.ArrayList[TopicPartition]()
+tps.add(topicPartition)
+c.assign(tps)
+c
+  }
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, 
V]]().iterator
+  protected var nextOffset = -2L
+
+  def close(): Unit = consumer.close()
+
+  /**
+   * Get the record for the given offset, waiting up to timeout ms if IO 
is necessary.
+   * Sequential forward access will use buffers, but random access will be 
horribly inefficient.
+   */
+  def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
+log.debug(s"Get $groupId $topic $partition nextOffset $nextOffset 
requested $offset")
+if (offset != nextOffset) {
+  log.info(s"Initial fetch for $groupId $topic $partition $offset")
+  seek(offset)
+  poll(timeout)
+}
+
+if (!buffer.hasNext()) { poll(timeout) }
+assert(buffer.hasNext(),
+  s"Failed to get records for $groupId $topic $partition $offset after 
polling for $timeout")
+var record = buffer.next()
+
+if (record.offset != offset) {
+  log.info(s"Buffer miss for $groupId $topic $partition $offset")
+  seek(offset)
+  poll(timeout)
+  assert(buffer.hasNext(),
+s"Failed to get records for $groupId $topic $partition $offset 
after polling for $timeout")
+  record = buffer.next()
+  assert(record.offset == offset,
+s"Got wrong record for $groupId $topic $partition even after 
seeking to offset $offset")
+}
+
+nextOffset = offset + 1
+record
+  }
+
+  private def seek(offset: Long): Unit = {
+log.debug(s"Seeking to $topicPartition $offset")
+consumer.seek(topicPartition, offset)
+  }
+
+  private def poll(timeout: Long): Unit = {
+val p = consumer.poll(timeout)
+val r = p.records(topicPartition)
+log.debug(s"Polled ${p.partitions()}  ${r.size}")
+buffer = r.iterator
+  }
+
+}
+
+private[kafka]
+object CachedKafkaConsumer extends Logging {
+
+  private case class CacheKey(groupId: String, topic: String, partition: 
Int)
+
+  // Don't want to depend on guava, don't want a cleanup thread, use a 
simple LinkedHashMap
+  private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] 
= null
+
+  /** Must be called before get, once per JVM, to configure the cache. 
Further calls are ignored 

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68149151
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/CachedKafkaConsumer.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, 
KafkaConsumer }
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+/**
+ * Consumer of single topicpartition, intended for cached reuse.
+ * Underlying consumer is not threadsafe, so neither is this,
+ * but processing the same topicpartition and group id in multiple threads 
is usually bad anyway.
+ */
+private[kafka]
+class CachedKafkaConsumer[K, V] private(
+  val groupId: String,
+  val topic: String,
+  val partition: Int,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  assert(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  val topicPartition = new TopicPartition(topic, partition)
+
+  protected val consumer = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = new ju.ArrayList[TopicPartition]()
+tps.add(topicPartition)
+c.assign(tps)
+c
+  }
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, 
V]]().iterator
+  protected var nextOffset = -2L
+
+  def close(): Unit = consumer.close()
+
+  /**
+   * Get the record for the given offset, waiting up to timeout ms if IO 
is necessary.
+   * Sequential forward access will use buffers, but random access will be 
horribly inefficient.
+   */
+  def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
+log.debug(s"Get $groupId $topic $partition nextOffset $nextOffset 
requested $offset")
+if (offset != nextOffset) {
+  log.info(s"Initial fetch for $groupId $topic $partition $offset")
+  seek(offset)
+  poll(timeout)
+}
+
+if (!buffer.hasNext()) { poll(timeout) }
+assert(buffer.hasNext(),
+  s"Failed to get records for $groupId $topic $partition $offset after 
polling for $timeout")
+var record = buffer.next()
+
+if (record.offset != offset) {
+  log.info(s"Buffer miss for $groupId $topic $partition $offset")
+  seek(offset)
+  poll(timeout)
+  assert(buffer.hasNext(),
+s"Failed to get records for $groupId $topic $partition $offset 
after polling for $timeout")
+  record = buffer.next()
+  assert(record.offset == offset,
+s"Got wrong record for $groupId $topic $partition even after 
seeking to offset $offset")
+}
+
+nextOffset = offset + 1
+record
+  }
+
+  private def seek(offset: Long): Unit = {
+log.debug(s"Seeking to $topicPartition $offset")
+consumer.seek(topicPartition, offset)
+  }
+
+  private def poll(timeout: Long): Unit = {
+val p = consumer.poll(timeout)
+val r = p.records(topicPartition)
+log.debug(s"Polled ${p.partitions()}  ${r.size}")
+buffer = r.iterator
+  }
+
+}
+
+private[kafka]
+object CachedKafkaConsumer extends Logging {
+
+  private case class CacheKey(groupId: String, topic: String, partition: 
Int)
+
+  // Don't want to depend on guava, don't want a cleanup thread, use a 
simple LinkedHashMap
+  private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] 
= null
+
+  /** Must be called before get, once per JVM, to configure the cache. 
Further calls are ignored 

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68149138
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/CachedKafkaConsumer.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, 
KafkaConsumer }
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+/**
+ * Consumer of single topicpartition, intended for cached reuse.
+ * Underlying consumer is not threadsafe, so neither is this,
+ * but processing the same topicpartition and group id in multiple threads 
is usually bad anyway.
+ */
+private[kafka]
+class CachedKafkaConsumer[K, V] private(
+  val groupId: String,
+  val topic: String,
+  val partition: Int,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  assert(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  val topicPartition = new TopicPartition(topic, partition)
+
+  protected val consumer = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = new ju.ArrayList[TopicPartition]()
+tps.add(topicPartition)
+c.assign(tps)
+c
+  }
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, 
V]]().iterator
+  protected var nextOffset = -2L
+
+  def close(): Unit = consumer.close()
+
+  /**
+   * Get the record for the given offset, waiting up to timeout ms if IO 
is necessary.
+   * Sequential forward access will use buffers, but random access will be 
horribly inefficient.
+   */
+  def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
+log.debug(s"Get $groupId $topic $partition nextOffset $nextOffset 
requested $offset")
+if (offset != nextOffset) {
+  log.info(s"Initial fetch for $groupId $topic $partition $offset")
+  seek(offset)
+  poll(timeout)
+}
+
+if (!buffer.hasNext()) { poll(timeout) }
+assert(buffer.hasNext(),
+  s"Failed to get records for $groupId $topic $partition $offset after 
polling for $timeout")
+var record = buffer.next()
+
+if (record.offset != offset) {
+  log.info(s"Buffer miss for $groupId $topic $partition $offset")
+  seek(offset)
+  poll(timeout)
+  assert(buffer.hasNext(),
+s"Failed to get records for $groupId $topic $partition $offset 
after polling for $timeout")
+  record = buffer.next()
+  assert(record.offset == offset,
+s"Got wrong record for $groupId $topic $partition even after 
seeking to offset $offset")
+}
+
+nextOffset = offset + 1
+record
+  }
+
+  private def seek(offset: Long): Unit = {
+log.debug(s"Seeking to $topicPartition $offset")
+consumer.seek(topicPartition, offset)
+  }
+
+  private def poll(timeout: Long): Unit = {
+val p = consumer.poll(timeout)
+val r = p.records(topicPartition)
+log.debug(s"Polled ${p.partitions()}  ${r.size}")
+buffer = r.iterator
+  }
+
+}
+
+private[kafka]
+object CachedKafkaConsumer extends Logging {
+
+  private case class CacheKey(groupId: String, topic: String, partition: 
Int)
+
+  // Don't want to depend on guava, don't want a cleanup thread, use a 
simple LinkedHashMap
+  private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] 
= null
+
+  /** Must be called before get, once per JVM, to configure the cache. 
Further calls are ignored 

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68148952
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
 ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.{classTag, ClassTag}
+
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord }
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.{Partition, SparkContext, SparkException, 
TaskContext}
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.internal.Logging
+import org.apache.spark.partial.{BoundedDouble, PartialResult}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * A batch-oriented interface for consuming from Kafka.
+ * Starting and ending offsets are specified in advance,
+ * so that you can control exactly-once semantics.
+ * @param kafkaParams Kafka
+ * http://kafka.apache.org/documentation.htmll#newconsumerconfigs;>
+ * configuration parameters. Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsetRanges offset ranges that define the Kafka data belonging 
to this RDD
+ * @param preferredHosts map from TopicPartition to preferred host for 
processing that partition.
+ * In most cases, use [[DirectKafkaInputDStream.preferConsistent]]
+ * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on 
same nodes as brokers.
+ * @param useConsumerCache whether to use a consumer from a per-jvm cache
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+@Experimental
+class KafkaRDD[
+  K: ClassTag,
+  V: ClassTag] private[spark] (
+sc: SparkContext,
+val kafkaParams: ju.Map[String, Object],
+val offsetRanges: Array[OffsetRange],
+val preferredHosts: ju.Map[TopicPartition, String],
+useConsumerCache: Boolean
+) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with 
HasOffsetRanges {
+
+  assert("none" ==
+
kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).asInstanceOf[String],
+ConsumerConfig.AUTO_OFFSET_RESET_CONFIG +
+  " must be set to none for executor kafka params, else messages may 
not match offsetRange")
+
+  assert(false ==
+
kafkaParams.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).asInstanceOf[Boolean],
+ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG +
+  " must be set to false for executor kafka params, else offsets may 
commit before processing")
+
+  // TODO is it necessary to have separate configs for initial poll time 
vs ongoing poll time?
+  private val pollTimeout = 
conf.getLong("spark.streaming.kafka.consumer.poll.ms", 256)
+  private val cacheInitialCapacity =
+conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
+  private val cacheMaxCapacity =
+conf.getInt("spark.streaming.kafka.consumer.cache.maxCapacity", 64)
+  private val cacheLoadFactor =
+conf.getDouble("spark.streaming.kafka.consumer.cache.loadFactor", 
0.75).toFloat
+
+  override def persist(newLevel: StorageLevel): this.type = {
+log.error("Kafka ConsumerRecord is not serializable. " +
+  "Use .map to extract fields before calling .persist or .window")
+super.persist(newLevel)
+  }
+
+  override def getPartitions: Array[Partition] = {
+offsetRanges.zipWithIndex.map { case (o, i) =>
+new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, 
o.untilOffset)
+}.toArray
+  }
+
+  override def count(): Long = offsetRanges.map(_.count).sum
+
+  override def countApprox(

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68147571
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/CachedKafkaConsumer.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, 
KafkaConsumer }
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+/**
+ * Consumer of single topicpartition, intended for cached reuse.
+ * Underlying consumer is not threadsafe, so neither is this,
+ * but processing the same topicpartition and group id in multiple threads 
is usually bad anyway.
+ */
+private[kafka]
+class CachedKafkaConsumer[K, V] private(
+  val groupId: String,
+  val topic: String,
+  val partition: Int,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  assert(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  val topicPartition = new TopicPartition(topic, partition)
+
+  protected val consumer = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = new ju.ArrayList[TopicPartition]()
+tps.add(topicPartition)
+c.assign(tps)
+c
+  }
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, 
V]]().iterator
+  protected var nextOffset = -2L
+
+  def close(): Unit = consumer.close()
+
+  /**
+   * Get the record for the given offset, waiting up to timeout ms if IO 
is necessary.
+   * Sequential forward access will use buffers, but random access will be 
horribly inefficient.
+   */
+  def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
+log.debug(s"Get $groupId $topic $partition nextOffset $nextOffset 
requested $offset")
+if (offset != nextOffset) {
+  log.info(s"Initial fetch for $groupId $topic $partition $offset")
+  seek(offset)
+  poll(timeout)
+}
+
+if (!buffer.hasNext()) { poll(timeout) }
+assert(buffer.hasNext(),
+  s"Failed to get records for $groupId $topic $partition $offset after 
polling for $timeout")
+var record = buffer.next()
+
+if (record.offset != offset) {
+  log.info(s"Buffer miss for $groupId $topic $partition $offset")
+  seek(offset)
+  poll(timeout)
+  assert(buffer.hasNext(),
+s"Failed to get records for $groupId $topic $partition $offset 
after polling for $timeout")
+  record = buffer.next()
+  assert(record.offset == offset,
+s"Got wrong record for $groupId $topic $partition even after 
seeking to offset $offset")
+}
+
+nextOffset = offset + 1
+record
+  }
+
+  private def seek(offset: Long): Unit = {
+log.debug(s"Seeking to $topicPartition $offset")
+consumer.seek(topicPartition, offset)
+  }
+
+  private def poll(timeout: Long): Unit = {
+val p = consumer.poll(timeout)
+val r = p.records(topicPartition)
+log.debug(s"Polled ${p.partitions()}  ${r.size}")
+buffer = r.iterator
+  }
+
+}
+
+private[kafka]
+object CachedKafkaConsumer extends Logging {
+
+  private case class CacheKey(groupId: String, topic: String, partition: 
Int)
+
+  // Don't want to depend on guava, don't want a cleanup thread, use a 
simple LinkedHashMap
+  private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] 
= null
+
+  /** Must be called before get, once per JVM, to configure the cache. 
Further calls are ignored 

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68147585
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/CachedKafkaConsumer.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, 
KafkaConsumer }
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+/**
+ * Consumer of single topicpartition, intended for cached reuse.
+ * Underlying consumer is not threadsafe, so neither is this,
+ * but processing the same topicpartition and group id in multiple threads 
is usually bad anyway.
+ */
+private[kafka]
+class CachedKafkaConsumer[K, V] private(
+  val groupId: String,
+  val topic: String,
+  val partition: Int,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  assert(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  val topicPartition = new TopicPartition(topic, partition)
+
+  protected val consumer = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = new ju.ArrayList[TopicPartition]()
+tps.add(topicPartition)
+c.assign(tps)
+c
+  }
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, 
V]]().iterator
+  protected var nextOffset = -2L
+
+  def close(): Unit = consumer.close()
+
+  /**
+   * Get the record for the given offset, waiting up to timeout ms if IO 
is necessary.
+   * Sequential forward access will use buffers, but random access will be 
horribly inefficient.
+   */
+  def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
+log.debug(s"Get $groupId $topic $partition nextOffset $nextOffset 
requested $offset")
+if (offset != nextOffset) {
+  log.info(s"Initial fetch for $groupId $topic $partition $offset")
+  seek(offset)
+  poll(timeout)
+}
+
+if (!buffer.hasNext()) { poll(timeout) }
+assert(buffer.hasNext(),
+  s"Failed to get records for $groupId $topic $partition $offset after 
polling for $timeout")
+var record = buffer.next()
+
+if (record.offset != offset) {
+  log.info(s"Buffer miss for $groupId $topic $partition $offset")
+  seek(offset)
+  poll(timeout)
+  assert(buffer.hasNext(),
+s"Failed to get records for $groupId $topic $partition $offset 
after polling for $timeout")
+  record = buffer.next()
+  assert(record.offset == offset,
+s"Got wrong record for $groupId $topic $partition even after 
seeking to offset $offset")
+}
+
+nextOffset = offset + 1
+record
+  }
+
+  private def seek(offset: Long): Unit = {
+log.debug(s"Seeking to $topicPartition $offset")
+consumer.seek(topicPartition, offset)
+  }
+
+  private def poll(timeout: Long): Unit = {
+val p = consumer.poll(timeout)
+val r = p.records(topicPartition)
+log.debug(s"Polled ${p.partitions()}  ${r.size}")
+buffer = r.iterator
+  }
+
+}
+
+private[kafka]
+object CachedKafkaConsumer extends Logging {
+
+  private case class CacheKey(groupId: String, topic: String, partition: 
Int)
+
+  // Don't want to depend on guava, don't want a cleanup thread, use a 
simple LinkedHashMap
+  private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] 
= null
+
+  /** Must be called before get, once per JVM, to configure the cache. 
Further calls are ignored 

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68147554
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/CachedKafkaConsumer.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, 
KafkaConsumer }
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+/**
+ * Consumer of single topicpartition, intended for cached reuse.
+ * Underlying consumer is not threadsafe, so neither is this,
+ * but processing the same topicpartition and group id in multiple threads 
is usually bad anyway.
+ */
+private[kafka]
+class CachedKafkaConsumer[K, V] private(
+  val groupId: String,
+  val topic: String,
+  val partition: Int,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  assert(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  val topicPartition = new TopicPartition(topic, partition)
+
+  protected val consumer = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = new ju.ArrayList[TopicPartition]()
+tps.add(topicPartition)
+c.assign(tps)
+c
+  }
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, 
V]]().iterator
+  protected var nextOffset = -2L
+
+  def close(): Unit = consumer.close()
+
+  /**
+   * Get the record for the given offset, waiting up to timeout ms if IO 
is necessary.
+   * Sequential forward access will use buffers, but random access will be 
horribly inefficient.
+   */
+  def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
+log.debug(s"Get $groupId $topic $partition nextOffset $nextOffset 
requested $offset")
+if (offset != nextOffset) {
+  log.info(s"Initial fetch for $groupId $topic $partition $offset")
+  seek(offset)
+  poll(timeout)
+}
+
+if (!buffer.hasNext()) { poll(timeout) }
+assert(buffer.hasNext(),
+  s"Failed to get records for $groupId $topic $partition $offset after 
polling for $timeout")
+var record = buffer.next()
+
+if (record.offset != offset) {
+  log.info(s"Buffer miss for $groupId $topic $partition $offset")
+  seek(offset)
+  poll(timeout)
+  assert(buffer.hasNext(),
+s"Failed to get records for $groupId $topic $partition $offset 
after polling for $timeout")
+  record = buffer.next()
+  assert(record.offset == offset,
+s"Got wrong record for $groupId $topic $partition even after 
seeking to offset $offset")
+}
+
+nextOffset = offset + 1
+record
+  }
+
+  private def seek(offset: Long): Unit = {
+log.debug(s"Seeking to $topicPartition $offset")
+consumer.seek(topicPartition, offset)
+  }
+
+  private def poll(timeout: Long): Unit = {
+val p = consumer.poll(timeout)
+val r = p.records(topicPartition)
+log.debug(s"Polled ${p.partitions()}  ${r.size}")
+buffer = r.iterator
+  }
+
+}
+
+private[kafka]
+object CachedKafkaConsumer extends Logging {
+
+  private case class CacheKey(groupId: String, topic: String, partition: 
Int)
+
+  // Don't want to depend on guava, don't want a cleanup thread, use a 
simple LinkedHashMap
+  private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] 
= null
+
+  /** Must be called before get, once per JVM, to configure the cache. 
Further calls are ignored 

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68147109
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 ---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, 
StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ *  A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives 
the maximum number
+ *  of messages
+ * per second that each '''partition''' will accept.
+ * @param preferredHosts map from TopicPartition to preferred host for 
processing that partition.
+ * In most cases, use [[DirectKafkaInputDStream.preferConsistent]]
+ * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on 
same nodes as brokers.
+ * @param executorKafkaParams Kafka
+ * http://kafka.apache.org/documentation.html#newconsumerconfigs;>
+ * configuration parameters.
+ *   Requires  "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param driverConsumer zero-argument function for you to construct a 
Kafka Consumer,
+ *  and subscribe topics or assign partitions.
+ *  This consumer will be used on the driver to query for offsets only, 
not messages.
+ *  See http://kafka.apache.org/documentation.html#newconsumerapi;>Consumer 
doc
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+@Experimental
+class DirectKafkaInputDStream[K: ClassTag, V: ClassTag] private[spark] (
+_ssc: StreamingContext,
+preferredHosts: ju.Map[TopicPartition, String],
+executorKafkaParams: ju.Map[String, Object],
+driverConsumer: () => Consumer[K, V]
+  ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging {
+
+  @transient private var kc: Consumer[K, V] = null
+  def consumer(): Consumer[K, V] = this.synchronized {
+if (null == kc) {
+  kc = driverConsumer()
+}
+kc
+  }
+  consumer()
+
+  override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, 
V]] = {
+log.error("Kafka ConsumerRecord is not serializable. " +
+  "Use .map to extract fields before calling .persist or .window")
+super.persist(newLevel)
+  }
+
+  protected def getBrokers = {
+val c = consumer
+val result = new ju.HashMap[TopicPartition, String]()
+val hosts = new ju.HashMap[TopicPartition, String]()
+val assignments = c.assignment().iterator()
+while (assignments.hasNext()) {
+  val tp: TopicPartition = assignments.next()
+  if (null == hosts.get(tp)) {
+val infos = c.partitionsFor(tp.topic).iterator()
+while (infos.hasNext()) {
+  val i = infos.next()
+  hosts.put(new TopicPartition(i.topic(), i.partition()), 
i.leader.host())
+}
+  }
+  result.put(tp, hosts.get(tp))
+

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-22 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68139170
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 ---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, 
StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ *  A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives 
the maximum number
+ *  of messages
+ * per second that each '''partition''' will accept.
+ * @param preferredHosts map from TopicPartition to preferred host for 
processing that partition.
+ * In most cases, use [[DirectKafkaInputDStream.preferConsistent]]
+ * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on 
same nodes as brokers.
+ * @param executorKafkaParams Kafka
+ * http://kafka.apache.org/documentation.html#newconsumerconfigs;>
+ * configuration parameters.
+ *   Requires  "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param driverConsumer zero-argument function for you to construct a 
Kafka Consumer,
+ *  and subscribe topics or assign partitions.
+ *  This consumer will be used on the driver to query for offsets only, 
not messages.
+ *  See http://kafka.apache.org/documentation.html#newconsumerapi;>Consumer 
doc
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+@Experimental
+class DirectKafkaInputDStream[K: ClassTag, V: ClassTag] private[spark] (
+_ssc: StreamingContext,
+preferredHosts: ju.Map[TopicPartition, String],
+executorKafkaParams: ju.Map[String, Object],
+driverConsumer: () => Consumer[K, V]
+  ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging {
+
+  @transient private var kc: Consumer[K, V] = null
+  def consumer(): Consumer[K, V] = this.synchronized {
+if (null == kc) {
+  kc = driverConsumer()
+}
+kc
+  }
+  consumer()
+
+  override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, 
V]] = {
+log.error("Kafka ConsumerRecord is not serializable. " +
+  "Use .map to extract fields before calling .persist or .window")
+super.persist(newLevel)
+  }
+
+  protected def getBrokers = {
+val c = consumer
+val result = new ju.HashMap[TopicPartition, String]()
+val hosts = new ju.HashMap[TopicPartition, String]()
+val assignments = c.assignment().iterator()
+while (assignments.hasNext()) {
+  val tp: TopicPartition = assignments.next()
+  if (null == hosts.get(tp)) {
+val infos = c.partitionsFor(tp.topic).iterator()
+while (infos.hasNext()) {
+  val i = infos.next()
+  hosts.put(new TopicPartition(i.topic(), i.partition()), 
i.leader.host())
+}
+  }
+  result.put(tp, hosts.get(tp))

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-22 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68138575
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 ---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, 
StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ *  A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives 
the maximum number
+ *  of messages
+ * per second that each '''partition''' will accept.
+ * @param preferredHosts map from TopicPartition to preferred host for 
processing that partition.
+ * In most cases, use [[DirectKafkaInputDStream.preferConsistent]]
+ * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on 
same nodes as brokers.
+ * @param executorKafkaParams Kafka
+ * http://kafka.apache.org/documentation.html#newconsumerconfigs;>
+ * configuration parameters.
+ *   Requires  "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param driverConsumer zero-argument function for you to construct a 
Kafka Consumer,
+ *  and subscribe topics or assign partitions.
+ *  This consumer will be used on the driver to query for offsets only, 
not messages.
+ *  See http://kafka.apache.org/documentation.html#newconsumerapi;>Consumer 
doc
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+@Experimental
+class DirectKafkaInputDStream[K: ClassTag, V: ClassTag] private[spark] (
+_ssc: StreamingContext,
+preferredHosts: ju.Map[TopicPartition, String],
+executorKafkaParams: ju.Map[String, Object],
+driverConsumer: () => Consumer[K, V]
+  ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging {
+
+  @transient private var kc: Consumer[K, V] = null
+  def consumer(): Consumer[K, V] = this.synchronized {
+if (null == kc) {
+  kc = driverConsumer()
+}
+kc
+  }
+  consumer()
+
+  override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, 
V]] = {
+log.error("Kafka ConsumerRecord is not serializable. " +
+  "Use .map to extract fields before calling .persist or .window")
+super.persist(newLevel)
+  }
+
+  protected def getBrokers = {
+val c = consumer
+val result = new ju.HashMap[TopicPartition, String]()
+val hosts = new ju.HashMap[TopicPartition, String]()
+val assignments = c.assignment().iterator()
+while (assignments.hasNext()) {
+  val tp: TopicPartition = assignments.next()
+  if (null == hosts.get(tp)) {
+val infos = c.partitionsFor(tp.topic).iterator()
+while (infos.hasNext()) {
+  val i = infos.next()
+  hosts.put(new TopicPartition(i.topic(), i.partition()), 
i.leader.host())
+}
+  }
+  result.put(tp, hosts.get(tp))

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-22 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68136614
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 ---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, 
StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ *  A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives 
the maximum number
+ *  of messages
+ * per second that each '''partition''' will accept.
+ * @param preferredHosts map from TopicPartition to preferred host for 
processing that partition.
+ * In most cases, use [[DirectKafkaInputDStream.preferConsistent]]
+ * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on 
same nodes as brokers.
+ * @param executorKafkaParams Kafka
+ * http://kafka.apache.org/documentation.html#newconsumerconfigs;>
+ * configuration parameters.
+ *   Requires  "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param driverConsumer zero-argument function for you to construct a 
Kafka Consumer,
+ *  and subscribe topics or assign partitions.
+ *  This consumer will be used on the driver to query for offsets only, 
not messages.
+ *  See http://kafka.apache.org/documentation.html#newconsumerapi;>Consumer 
doc
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+@Experimental
+class DirectKafkaInputDStream[K: ClassTag, V: ClassTag] private[spark] (
+_ssc: StreamingContext,
+preferredHosts: ju.Map[TopicPartition, String],
+executorKafkaParams: ju.Map[String, Object],
+driverConsumer: () => Consumer[K, V]
+  ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging {
+
+  @transient private var kc: Consumer[K, V] = null
+  def consumer(): Consumer[K, V] = this.synchronized {
+if (null == kc) {
+  kc = driverConsumer()
+}
+kc
+  }
+  consumer()
+
+  override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, 
V]] = {
+log.error("Kafka ConsumerRecord is not serializable. " +
+  "Use .map to extract fields before calling .persist or .window")
+super.persist(newLevel)
+  }
+
+  protected def getBrokers = {
+val c = consumer
+val result = new ju.HashMap[TopicPartition, String]()
+val hosts = new ju.HashMap[TopicPartition, String]()
+val assignments = c.assignment().iterator()
+while (assignments.hasNext()) {
+  val tp: TopicPartition = assignments.next()
+  if (null == hosts.get(tp)) {
+val infos = c.partitionsFor(tp.topic).iterator()
+while (infos.hasNext()) {
+  val i = infos.next()
+  hosts.put(new TopicPartition(i.topic(), i.partition()), 
i.leader.host())
+}
+  }
+  result.put(tp, hosts.get(tp))
+  

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-22 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68136246
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 ---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, 
StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ *  A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives 
the maximum number
+ *  of messages
+ * per second that each '''partition''' will accept.
+ * @param preferredHosts map from TopicPartition to preferred host for 
processing that partition.
+ * In most cases, use [[DirectKafkaInputDStream.preferConsistent]]
+ * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on 
same nodes as brokers.
+ * @param executorKafkaParams Kafka
+ * http://kafka.apache.org/documentation.html#newconsumerconfigs;>
+ * configuration parameters.
+ *   Requires  "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param driverConsumer zero-argument function for you to construct a 
Kafka Consumer,
+ *  and subscribe topics or assign partitions.
+ *  This consumer will be used on the driver to query for offsets only, 
not messages.
+ *  See http://kafka.apache.org/documentation.html#newconsumerapi;>Consumer 
doc
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+@Experimental
+class DirectKafkaInputDStream[K: ClassTag, V: ClassTag] private[spark] (
+_ssc: StreamingContext,
+preferredHosts: ju.Map[TopicPartition, String],
+executorKafkaParams: ju.Map[String, Object],
+driverConsumer: () => Consumer[K, V]
+  ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging {
+
+  @transient private var kc: Consumer[K, V] = null
+  def consumer(): Consumer[K, V] = this.synchronized {
+if (null == kc) {
+  kc = driverConsumer()
+}
+kc
+  }
+  consumer()
+
+  override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, 
V]] = {
+log.error("Kafka ConsumerRecord is not serializable. " +
+  "Use .map to extract fields before calling .persist or .window")
+super.persist(newLevel)
+  }
+
+  protected def getBrokers = {
+val c = consumer
+val result = new ju.HashMap[TopicPartition, String]()
+val hosts = new ju.HashMap[TopicPartition, String]()
+val assignments = c.assignment().iterator()
+while (assignments.hasNext()) {
+  val tp: TopicPartition = assignments.next()
+  if (null == hosts.get(tp)) {
+val infos = c.partitionsFor(tp.topic).iterator()
+while (infos.hasNext()) {
+  val i = infos.next()
+  hosts.put(new TopicPartition(i.topic(), i.partition()), 
i.leader.host())
+}
+  }
+  result.put(tp, hosts.get(tp))

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-22 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68136255
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 ---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, 
StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ *  A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives 
the maximum number
+ *  of messages
+ * per second that each '''partition''' will accept.
+ * @param preferredHosts map from TopicPartition to preferred host for 
processing that partition.
+ * In most cases, use [[DirectKafkaInputDStream.preferConsistent]]
+ * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on 
same nodes as brokers.
+ * @param executorKafkaParams Kafka
+ * http://kafka.apache.org/documentation.html#newconsumerconfigs;>
+ * configuration parameters.
+ *   Requires  "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param driverConsumer zero-argument function for you to construct a 
Kafka Consumer,
+ *  and subscribe topics or assign partitions.
+ *  This consumer will be used on the driver to query for offsets only, 
not messages.
+ *  See http://kafka.apache.org/documentation.html#newconsumerapi;>Consumer 
doc
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+@Experimental
+class DirectKafkaInputDStream[K: ClassTag, V: ClassTag] private[spark] (
+_ssc: StreamingContext,
+preferredHosts: ju.Map[TopicPartition, String],
+executorKafkaParams: ju.Map[String, Object],
+driverConsumer: () => Consumer[K, V]
+  ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging {
+
+  @transient private var kc: Consumer[K, V] = null
+  def consumer(): Consumer[K, V] = this.synchronized {
+if (null == kc) {
+  kc = driverConsumer()
+}
+kc
+  }
+  consumer()
+
+  override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, 
V]] = {
+log.error("Kafka ConsumerRecord is not serializable. " +
+  "Use .map to extract fields before calling .persist or .window")
+super.persist(newLevel)
+  }
+
+  protected def getBrokers = {
+val c = consumer
+val result = new ju.HashMap[TopicPartition, String]()
+val hosts = new ju.HashMap[TopicPartition, String]()
+val assignments = c.assignment().iterator()
+while (assignments.hasNext()) {
+  val tp: TopicPartition = assignments.next()
+  if (null == hosts.get(tp)) {
+val infos = c.partitionsFor(tp.topic).iterator()
+while (infos.hasNext()) {
+  val i = infos.next()
+  hosts.put(new TopicPartition(i.topic(), i.partition()), 
i.leader.host())
+}
+  }
+  result.put(tp, hosts.get(tp))
+  

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-22 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68134405
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 ---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, 
StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ *  A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives 
the maximum number
+ *  of messages
+ * per second that each '''partition''' will accept.
+ * @param preferredHosts map from TopicPartition to preferred host for 
processing that partition.
+ * In most cases, use [[DirectKafkaInputDStream.preferConsistent]]
+ * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on 
same nodes as brokers.
+ * @param executorKafkaParams Kafka
+ * http://kafka.apache.org/documentation.html#newconsumerconfigs;>
+ * configuration parameters.
+ *   Requires  "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param driverConsumer zero-argument function for you to construct a 
Kafka Consumer,
+ *  and subscribe topics or assign partitions.
+ *  This consumer will be used on the driver to query for offsets only, 
not messages.
+ *  See http://kafka.apache.org/documentation.html#newconsumerapi;>Consumer 
doc
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+@Experimental
+class DirectKafkaInputDStream[K: ClassTag, V: ClassTag] private[spark] (
+_ssc: StreamingContext,
+preferredHosts: ju.Map[TopicPartition, String],
+executorKafkaParams: ju.Map[String, Object],
+driverConsumer: () => Consumer[K, V]
+  ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging {
+
+  @transient private var kc: Consumer[K, V] = null
+  def consumer(): Consumer[K, V] = this.synchronized {
+if (null == kc) {
+  kc = driverConsumer()
+}
+kc
+  }
+  consumer()
+
+  override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, 
V]] = {
+log.error("Kafka ConsumerRecord is not serializable. " +
+  "Use .map to extract fields before calling .persist or .window")
+super.persist(newLevel)
+  }
+
+  protected def getBrokers = {
+val c = consumer
+val result = new ju.HashMap[TopicPartition, String]()
+val hosts = new ju.HashMap[TopicPartition, String]()
+val assignments = c.assignment().iterator()
+while (assignments.hasNext()) {
+  val tp: TopicPartition = assignments.next()
+  if (null == hosts.get(tp)) {
+val infos = c.partitionsFor(tp.topic).iterator()
+while (infos.hasNext()) {
+  val i = infos.next()
+  hosts.put(new TopicPartition(i.topic(), i.partition()), 
i.leader.host())
+}
+  }
+  result.put(tp, hosts.get(tp))
+  

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-22 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68128869
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/CachedKafkaConsumer.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, 
KafkaConsumer }
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+/**
+ * Consumer of single topicpartition, intended for cached reuse.
+ * Underlying consumer is not threadsafe, so neither is this,
+ * but processing the same topicpartition and group id in multiple threads 
is usually bad anyway.
--- End diff --

concurrentJobs is still undocumented right?  I wouldn't expect the existing 
direct stream to behave predictably under those conditions either.

The only time I seeing trying to process different chunks of the same 
topicpartition at the same time as being unavoidable is during checkpoint 
recovery, which is why the cache isn't used then.

If someone has a better idea I'm all ears, but given the prefetching / 
handshaking implications of the new consumer, I don't see an alternative to 
caching them.  We could write considerably more complicated caching logic with 
a checkout / return system where a certain number of consumers for the same 
topicpartition were allowed at a time... but IMHO multiple consumers of the 
same topicpartition in the same job in the same jvm is not something we want to 
encourage - it's at best a waste of resources and at worst a correctness 
problem.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-22 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68123238
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/CachedKafkaConsumer.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, 
KafkaConsumer }
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+/**
+ * Consumer of single topicpartition, intended for cached reuse.
+ * Underlying consumer is not threadsafe, so neither is this,
+ * but processing the same topicpartition and group id in multiple threads 
is usually bad anyway.
--- End diff --

This could happen when `spark.streaming.concurrentJobs` is more than 1 and 
some batches are too slow.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-22 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68121385
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/CachedKafkaConsumer.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, 
KafkaConsumer }
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+/**
+ * Consumer of single topicpartition, intended for cached reuse.
+ * Underlying consumer is not threadsafe, so neither is this,
+ * but processing the same topicpartition and group id in multiple threads 
is usually bad anyway.
--- End diff --

Using the same group id for any two jobs you expect to keep separate is 
already a disaster with either the old or new Kafka high level consumers.

The underlying consumer does have a lightweight locking mechanism, and will 
just throw a ConcurrentModificationException if someone does do this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-22 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r68120504
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/CachedKafkaConsumer.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, 
KafkaConsumer }
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+/**
+ * Consumer of single topicpartition, intended for cached reuse.
+ * Underlying consumer is not threadsafe, so neither is this,
+ * but processing the same topicpartition and group id in multiple threads 
is usually bad anyway.
--- End diff --

What if two jobs uses the same `topic` and `groupId` and they run at the 
same time? Then a CachedKafkaConsumer may be used by two threads in the same 
JVM.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-21 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r67878608
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 ---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, 
StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ *  A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives 
the maximum number
+ *  of messages
+ * per second that each '''partition''' will accept.
+ * @param preferredHosts map from TopicPartition to preferred host for 
processing that partition.
--- End diff --

Right, I think the comments on the private class constructor are still 
useful for anyone looking at the code, even if they don't show up in scaladoc.  
Given that the parameters aren't always the same, it's at least possibly worth 
the duplication.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-21 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r67877379
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
 ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.{classTag, ClassTag}
+
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord }
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.{Partition, SparkContext, SparkException, 
TaskContext}
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.internal.Logging
+import org.apache.spark.partial.{BoundedDouble, PartialResult}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * A batch-oriented interface for consuming from Kafka.
+ * Starting and ending offsets are specified in advance,
+ * so that you can control exactly-once semantics.
+ * @param kafkaParams Kafka
+ * http://kafka.apache.org/documentation.htmll#newconsumerconfigs;>
+ * configuration parameters. Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsetRanges offset ranges that define the Kafka data belonging 
to this RDD
+ * @param preferredHosts map from TopicPartition to preferred host for 
processing that partition.
+ * In most cases, use [[DirectKafkaInputDStream.preferConsistent]]
+ * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on 
same nodes as brokers.
+ * @param useConsumerCache whether to use a consumer from a per-jvm cache
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+@Experimental
+class KafkaRDD[
+  K: ClassTag,
+  V: ClassTag] private[spark] (
+sc: SparkContext,
+val kafkaParams: ju.Map[String, Object],
+val offsetRanges: Array[OffsetRange],
+val preferredHosts: ju.Map[TopicPartition, String],
+useConsumerCache: Boolean
+) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with 
HasOffsetRanges {
+
+  assert("none" ==
+
kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).asInstanceOf[String],
+ConsumerConfig.AUTO_OFFSET_RESET_CONFIG +
+  " must be set to none for executor kafka params, else messages may 
not match offsetRange")
+
+  assert(false ==
+
kafkaParams.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).asInstanceOf[Boolean],
+ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG +
+  " must be set to false for executor kafka params, else offsets may 
commit before processing")
+
+  // TODO is it necessary to have separate configs for initial poll time 
vs ongoing poll time?
+  private val pollTimeout = 
conf.getLong("spark.streaming.kafka.consumer.poll.ms", 256)
+  private val cacheInitialCapacity =
+conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
+  private val cacheMaxCapacity =
+conf.getInt("spark.streaming.kafka.consumer.cache.maxCapacity", 64)
+  private val cacheLoadFactor =
+conf.getDouble("spark.streaming.kafka.consumer.cache.loadFactor", 
0.75).toFloat
+
+  override def persist(newLevel: StorageLevel): this.type = {
+log.error("Kafka ConsumerRecord is not serializable. " +
+  "Use .map to extract fields before calling .persist or .window")
+super.persist(newLevel)
+  }
+
+  override def getPartitions: Array[Partition] = {
+offsetRanges.zipWithIndex.map { case (o, i) =>
+new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, 
o.untilOffset)
+}.toArray
+  }
+
+  override def count(): Long = offsetRanges.map(_.count).sum
+
+  override def 

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-21 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r67876883
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 ---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, 
StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ *  A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives 
the maximum number
+ *  of messages
+ * per second that each '''partition''' will accept.
+ * @param preferredHosts map from TopicPartition to preferred host for 
processing that partition.
+ * In most cases, use [[DirectKafkaInputDStream.preferConsistent]]
+ * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on 
same nodes as brokers.
+ * @param executorKafkaParams Kafka
+ * http://kafka.apache.org/documentation.html#newconsumerconfigs;>
+ * configuration parameters.
+ *   Requires  "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param driverConsumer zero-argument function for you to construct a 
Kafka Consumer,
+ *  and subscribe topics or assign partitions.
+ *  This consumer will be used on the driver to query for offsets only, 
not messages.
+ *  See http://kafka.apache.org/documentation.html#newconsumerapi;>Consumer 
doc
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+@Experimental
+class DirectKafkaInputDStream[K: ClassTag, V: ClassTag] private[spark] (
+_ssc: StreamingContext,
+preferredHosts: ju.Map[TopicPartition, String],
+executorKafkaParams: ju.Map[String, Object],
+driverConsumer: () => Consumer[K, V]
+  ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging {
+
+  @transient private var kc: Consumer[K, V] = null
+  def consumer(): Consumer[K, V] = this.synchronized {
+if (null == kc) {
+  kc = driverConsumer()
+}
+kc
+  }
+  consumer()
+
+  override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, 
V]] = {
+log.error("Kafka ConsumerRecord is not serializable. " +
+  "Use .map to extract fields before calling .persist or .window")
+super.persist(newLevel)
+  }
+
+  protected def getBrokers = {
+val c = consumer
+val result = new ju.HashMap[TopicPartition, String]()
+val hosts = new ju.HashMap[TopicPartition, String]()
+val assignments = c.assignment().iterator()
+while (assignments.hasNext()) {
+  val tp: TopicPartition = assignments.next()
+  if (null == hosts.get(tp)) {
+val infos = c.partitionsFor(tp.topic).iterator()
+while (infos.hasNext()) {
+  val i = infos.next()
+  hosts.put(new TopicPartition(i.topic(), i.partition()), 
i.leader.host())
+}
+  }
+  result.put(tp, hosts.get(tp))

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-21 Thread lfrancke
Github user lfrancke commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r67875791
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 ---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, 
StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ *  A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives 
the maximum number
+ *  of messages
+ * per second that each '''partition''' will accept.
+ * @param preferredHosts map from TopicPartition to preferred host for 
processing that partition.
--- End diff --

But that's because the comment is duplicated there.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-21 Thread lfrancke
Github user lfrancke commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r67875720
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 ---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, 
StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ *  A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
--- End diff --

Makes sense. As I said: Only nitpicks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-21 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r67875507
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 ---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, 
StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ *  A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives 
the maximum number
+ *  of messages
+ * per second that each '''partition''' will accept.
+ * @param preferredHosts map from TopicPartition to preferred host for 
processing that partition.
--- End diff --

The class constructor is private.  The parameters show up in the companion 
object methods, and the links to the companion object work fine.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-21 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r67875307
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 ---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, 
StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ *  A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
--- End diff --

This link style works, and this area of the code hasn't changed from the 
0.8 version of the consumer.  Like the other cosmetic comments, I'm reluctant 
to change things in 0.8 version, just to keep this PR easy to compare.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-21 Thread lfrancke
Github user lfrancke commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r67827026
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
 ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.{classTag, ClassTag}
+
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord }
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.{Partition, SparkContext, SparkException, 
TaskContext}
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.internal.Logging
+import org.apache.spark.partial.{BoundedDouble, PartialResult}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * A batch-oriented interface for consuming from Kafka.
+ * Starting and ending offsets are specified in advance,
+ * so that you can control exactly-once semantics.
+ * @param kafkaParams Kafka
+ * http://kafka.apache.org/documentation.htmll#newconsumerconfigs;>
+ * configuration parameters. Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsetRanges offset ranges that define the Kafka data belonging 
to this RDD
+ * @param preferredHosts map from TopicPartition to preferred host for 
processing that partition.
+ * In most cases, use [[DirectKafkaInputDStream.preferConsistent]]
+ * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on 
same nodes as brokers.
+ * @param useConsumerCache whether to use a consumer from a per-jvm cache
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+@Experimental
+class KafkaRDD[
+  K: ClassTag,
+  V: ClassTag] private[spark] (
+sc: SparkContext,
+val kafkaParams: ju.Map[String, Object],
+val offsetRanges: Array[OffsetRange],
+val preferredHosts: ju.Map[TopicPartition, String],
+useConsumerCache: Boolean
+) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with 
HasOffsetRanges {
+
+  assert("none" ==
+
kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).asInstanceOf[String],
+ConsumerConfig.AUTO_OFFSET_RESET_CONFIG +
+  " must be set to none for executor kafka params, else messages may 
not match offsetRange")
+
+  assert(false ==
+
kafkaParams.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).asInstanceOf[Boolean],
+ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG +
+  " must be set to false for executor kafka params, else offsets may 
commit before processing")
+
+  // TODO is it necessary to have separate configs for initial poll time 
vs ongoing poll time?
+  private val pollTimeout = 
conf.getLong("spark.streaming.kafka.consumer.poll.ms", 256)
+  private val cacheInitialCapacity =
+conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
+  private val cacheMaxCapacity =
+conf.getInt("spark.streaming.kafka.consumer.cache.maxCapacity", 64)
+  private val cacheLoadFactor =
+conf.getDouble("spark.streaming.kafka.consumer.cache.loadFactor", 
0.75).toFloat
+
+  override def persist(newLevel: StorageLevel): this.type = {
+log.error("Kafka ConsumerRecord is not serializable. " +
+  "Use .map to extract fields before calling .persist or .window")
+super.persist(newLevel)
+  }
+
+  override def getPartitions: Array[Partition] = {
+offsetRanges.zipWithIndex.map { case (o, i) =>
+new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, 
o.untilOffset)
+}.toArray
+  }
+
+  override def count(): Long = offsetRanges.map(_.count).sum
+
+  override def countApprox(

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-21 Thread lfrancke
Github user lfrancke commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r67822694
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 ---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, 
StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ *  A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives 
the maximum number
+ *  of messages
+ * per second that each '''partition''' will accept.
+ * @param preferredHosts map from TopicPartition to preferred host for 
processing that partition.
+ * In most cases, use [[DirectKafkaInputDStream.preferConsistent]]
+ * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on 
same nodes as brokers.
+ * @param executorKafkaParams Kafka
+ * http://kafka.apache.org/documentation.html#newconsumerconfigs;>
+ * configuration parameters.
+ *   Requires  "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param driverConsumer zero-argument function for you to construct a 
Kafka Consumer,
+ *  and subscribe topics or assign partitions.
+ *  This consumer will be used on the driver to query for offsets only, 
not messages.
+ *  See http://kafka.apache.org/documentation.html#newconsumerapi;>Consumer 
doc
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+@Experimental
+class DirectKafkaInputDStream[K: ClassTag, V: ClassTag] private[spark] (
+_ssc: StreamingContext,
+preferredHosts: ju.Map[TopicPartition, String],
+executorKafkaParams: ju.Map[String, Object],
+driverConsumer: () => Consumer[K, V]
+  ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging {
+
+  @transient private var kc: Consumer[K, V] = null
+  def consumer(): Consumer[K, V] = this.synchronized {
+if (null == kc) {
+  kc = driverConsumer()
+}
+kc
+  }
+  consumer()
+
+  override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, 
V]] = {
+log.error("Kafka ConsumerRecord is not serializable. " +
+  "Use .map to extract fields before calling .persist or .window")
+super.persist(newLevel)
+  }
+
+  protected def getBrokers = {
+val c = consumer
+val result = new ju.HashMap[TopicPartition, String]()
+val hosts = new ju.HashMap[TopicPartition, String]()
+val assignments = c.assignment().iterator()
+while (assignments.hasNext()) {
+  val tp: TopicPartition = assignments.next()
+  if (null == hosts.get(tp)) {
+val infos = c.partitionsFor(tp.topic).iterator()
+while (infos.hasNext()) {
+  val i = infos.next()
+  hosts.put(new TopicPartition(i.topic(), i.partition()), 
i.leader.host())
+}
+  }
+  result.put(tp, hosts.get(tp))
+ 

[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-21 Thread lfrancke
Github user lfrancke commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r67822597
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 ---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, 
StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ *  A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives 
the maximum number
+ *  of messages
+ * per second that each '''partition''' will accept.
+ * @param preferredHosts map from TopicPartition to preferred host for 
processing that partition.
--- End diff --

I don't know why but when I build the Scaladoc (using maven scala:doc) none 
of these constructor parameters are being shown. And if they were I don't think 
the links to the companion object would work (preferConsistent and 
preferBrokers) as Scaladoc currently doesn't support links to the companion 
object.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11863: [SPARK-12177][Streaming][Kafka] Update KafkaDStre...

2016-06-21 Thread lfrancke
Github user lfrancke commented on a diff in the pull request:

https://github.com/apache/spark/pull/11863#discussion_r67821600
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 ---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, 
StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ *  A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
--- End diff --

In Scaladoc this would be `[[org.apache.spark.streaming.kafka.KafkaRDD]]` 
instead of `{@link ...` (I think)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org