[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-05-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20997


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-05-02 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r185452496
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+    internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+    internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+    internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+    internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  private[kafka010] def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010] class InternalKafkaConsumer[K, V](
+    val topicPartition: TopicPartition,
+    val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  private[kafka010] val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
+    .asInstanceOf[String]
+
+  private val consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+    "InternalKafkaConsumer(" +
+      s"hash=${Integer.toHexString(hashCode)}, " +
+      s"groupId=$groupId, " +
+      s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+    val c = new KafkaConsumer[K, V](kafkaParams)
+    val topics = ju.Arrays.asList(topicPartition)
+    c.assign(topics)
+    c
+  }
+
+  def close(): Unit = consumer.close()
+
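
For readers following the compacted-topic methods in the diff above, here is a rough sketch of how a `java.util.ListIterator` buffer could support the `compactedStart` / `compactedNext` / `compactedPrevious` contract. It is a toy model, not the PR's code: `Rec` and `CompactedCursor` are hypothetical names, an in-memory list stands in for polling Kafka, and the poll timeout is left out.

```scala
import java.{util => ju}

// Hypothetical stand-in for ConsumerRecord: only an offset and a value.
final case class Rec(offset: Long, value: String)

// Toy cursor over an in-memory list; an assumption for illustration only.
final class CompactedCursor(records: ju.List[Rec]) {
  // Starts empty, like the `buffer` field in the diff.
  private var buffer: ju.ListIterator[Rec] = ju.Collections.emptyListIterator[Rec]()

  // Position the cursor just before the first record whose offset is >= `offset`.
  // Offsets removed by compaction are simply skipped.
  def compactedStart(offset: Long): Unit = {
    buffer = records.listIterator()
    var positioned = false
    while (!positioned && buffer.hasNext) {
      if (buffer.next().offset >= offset) {
        buffer.previous() // step back so compactedNext returns this record
        positioned = true
      }
    }
  }

  // Next record in the batch; throws NoSuchElementException when exhausted.
  def compactedNext(): Rec = buffer.next()

  // Rewind one record; throws NoSuchElementException if there is no previous element.
  def compactedPrevious(): Rec = buffer.previous()
}
```

With records at offsets 0, 3 and 7, `compactedStart(2)` followed by `compactedNext()` yields the record at offset 3. Because of `ListIterator` semantics, a `compactedPrevious()` call right after `compactedNext()` returns the same record again.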

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-05-02 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r185451988
  

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-05-02 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r185451919
  

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-05-02 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r185451746
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+    internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+    internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+    internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+    internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  private[kafka010] def internalConsumer: InternalKafkaConsumer[K, V]
--- End diff --

Removed.
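
As a side note on the `release()` doc comment quoted above ("either finalized, or reset for reuse later"), the following is a rough sketch of how the `inUse` and `markedForClose` flags from the same file might drive that choice. It is an assumption for illustration, not the PR's implementation: `ToyConsumer`, `ToyCache`, `tryAcquire` and the `closed` flag are hypothetical names, and no Kafka client is involved.

```scala
// Hypothetical stand-in for a cached consumer; field names mirror the diff,
// while `closed` is added here only so the effect of close() is observable.
final class ToyConsumer {
  var inUse = true            // a freshly created consumer belongs to its creator
  var markedForClose = false  // set when the cache no longer wants this entry
  var closed = false
  def close(): Unit = { closed = true }
}

// Toy cache-side logic; synchronized so that acquire and release cannot interleave.
object ToyCache {
  // Finalize the consumer if it was marked for close, otherwise free it for reuse.
  def release(c: ToyConsumer): Unit = synchronized {
    if (c.markedForClose) c.close() else c.inUse = false
  }

  // Hand the consumer out only if no other task currently holds it.
  def tryAcquire(c: ToyConsumer): Boolean = synchronized {
    if (c.inUse || c.closed) false
    else { c.inUse = true; true }
  }
}
```

The point of the sketch is that a consumer is never handed to a second task while `inUse` is true, which is one way to read the "avoid concurrent use" goal in the PR title.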





[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-25 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r184210716
  

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-25 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r184200701
  

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-25 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r184200402
  

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-25 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r184194919
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+    internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+    internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+    internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+    internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  private[kafka010] def internalConsumer: InternalKafkaConsumer[K, V]
--- End diff --

nit: drop `private[kafka010]` here, since the enclosing type already has that qualifier.
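
To illustrate the nit, here is a minimal, self-contained sketch of why a qualifier on the member adds little once the enclosing type is already restricted. All names (`AccessDemo`, `Internal`, `DataConsumer`, `NonCached`, `fetch`) are hypothetical, and an enclosing object stands in for the `kafka010` package so that the sketch fits in a single file.

```scala
// Hypothetical stand-in for the kafka010 package.
object AccessDemo {

  // Stand-in for InternalKafkaConsumer: only nameable inside AccessDemo.
  private[AccessDemo] class Internal {
    def get(offset: Long): String = s"record@$offset"
  }

  // The trait itself is restricted to AccessDemo, so code outside cannot
  // name it or call its members through it.
  private[AccessDemo] sealed trait DataConsumer {
    def get(offset: Long): String = internal.get(offset)

    // No access qualifier on the member, as the review comment suggests.
    def internal: Internal
  }

  private[AccessDemo] final class NonCached(val internal: Internal) extends DataConsumer

  // Public entry point that does not expose the restricted types.
  def fetch(offset: Long): String = new NonCached(new Internal).get(offset)
}
```

Outside `AccessDemo`, only `fetch` is usable; `DataConsumer` and its `internal` member cannot be referenced by name in either variant.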


---




[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-25 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r184203155
  
--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala ---
@@ -0,0 +1,359 @@
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+    internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+    internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+    internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+    internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  private[kafka010] def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010] class InternalKafkaConsumer[K, V](
+    val topicPartition: TopicPartition,
+    val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  private[kafka010] val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
+    .asInstanceOf[String]
+
+  private val consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+    "InternalKafkaConsumer(" +
+      s"hash=${Integer.toHexString(hashCode)}, " +
+      s"groupId=$groupId, " +
+      s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+    val c = new KafkaConsumer[K, V](kafkaParams)
+    val topics = ju.Arrays.asList(topicPartition)
+    c.assign(topics)
+    c
+  }
+
+  def close(): Unit = consumer.close()
+
+  

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-25 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r184200766
  
--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala ---
@@ -0,0 +1,359 @@
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+    internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+    internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+    internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+    internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  private[kafka010] def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010] class InternalKafkaConsumer[K, V](
+    val topicPartition: TopicPartition,
+    val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  private[kafka010] val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
+    .asInstanceOf[String]
+
+  private val consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+    "InternalKafkaConsumer(" +
+      s"hash=${Integer.toHexString(hashCode)}, " +
+      s"groupId=$groupId, " +
+      s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+    val c = new KafkaConsumer[K, V](kafkaParams)
+    val topics = ju.Arrays.asList(topicPartition)
+    c.assign(topics)
+    c
+  }
+
+  def close(): Unit = consumer.close()
+
+  

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-16 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r181655790
  
--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala ---
@@ -0,0 +1,381 @@
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+    internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+    internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+    internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+    internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+    "groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+    "InternalKafkaConsumer(" +
+      s"hash=${Integer.toHexString(hashCode)}, " +
+      s"groupId=$groupId, " +
+      s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+    val c = new KafkaConsumer[K, V](kafkaParams)
+    val tps = 

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-16 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r181647681
  
--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala ---
@@ -0,0 +1,381 @@
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+    internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+    internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+    internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+    internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+    "groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+    "InternalKafkaConsumer(" +
+      s"hash=${Integer.toHexString(hashCode)}, " +
+      s"groupId=$groupId, " +
+      s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+    val c = new KafkaConsumer[K, V](kafkaParams)
+    val tps = 

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-16 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r181645665
  
--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala ---
@@ -0,0 +1,381 @@
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+    internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+    internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+    internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+    internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+    "groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+    "InternalKafkaConsumer(" +
+      s"hash=${Integer.toHexString(hashCode)}, " +
+      s"groupId=$groupId, " +
+      s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+    val c = new KafkaConsumer[K, V](kafkaParams)
+    val tps = 

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-13 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r181507520
  
--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala ---
@@ -0,0 +1,381 @@
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+    internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+    internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+    internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+    internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+    "groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+    "InternalKafkaConsumer(" +
+      s"hash=${Integer.toHexString(hashCode)}, " +
+      s"groupId=$groupId, " +
+      s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+    val c = new KafkaConsumer[K, V](kafkaParams)
+    val tps = new 

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-13 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r181506863
  
--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala ---
@@ -0,0 +1,381 @@
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+    internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+    internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+    internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+    internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+    "groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+    "InternalKafkaConsumer(" +
+      s"hash=${Integer.toHexString(hashCode)}, " +
+      s"groupId=$groupId, " +
+      s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+    val c = new KafkaConsumer[K, V](kafkaParams)
+    val tps = new 

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-13 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r181506582
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-13 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r181502862
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import org.apache.kafka.clients.consumer.ConsumerConfig._
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark._
+
+class KafkaDataConsumerSuite extends SparkFunSuite with BeforeAndAfterAll {
+
+  private var testUtils: KafkaTestUtils = _
+
+  override def beforeAll {
+    super.beforeAll()
+    testUtils = new KafkaTestUtils
+    testUtils.setup()
+  }
+
+  override def afterAll {
+    if (testUtils != null) {
+      testUtils.teardown()
+      testUtils = null
+    }
+    super.afterAll()
+  }
+
+  test("concurrent use of KafkaDataConsumer") {
--- End diff --

Reuse test added.
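
For readers following the thread, "reuse" here presumably means that a consumer which has been released can be handed out again by a later acquire for the same key. The quoted diff stops before the PR's acquire logic, so the following is only a minimal sketch under that assumption. `ToyConsumer`, `ToyConsumerCache` and the key string are hypothetical stand-ins, not the classes from the PR.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a cached consumer; not the PR's InternalKafkaConsumer.
final class ToyConsumer(val key: String) {
  var inUse = true
}

// Hypothetical cache keyed by a string; assumes one cached consumer per key.
object ToyConsumerCache {
  private val cache = mutable.Map.empty[String, ToyConsumer]

  /** Hand out the cached consumer if it is idle; otherwise create a new one. */
  def acquire(key: String): ToyConsumer = synchronized {
    cache.get(key) match {
      case Some(c) if !c.inUse =>
        c.inUse = true // idle cached instance: reuse it
        c
      case Some(_) =>
        new ToyConsumer(key) // cached instance is busy: do not share it
      case None =>
        val c = new ToyConsumer(key)
        cache(key) = c
        c
    }
  }

  /** Mark the consumer idle so that a later acquire can reuse it. */
  def release(c: ToyConsumer): Unit = synchronized {
    c.inUse = false
  }
}

object ReuseSketch {
  def main(args: Array[String]): Unit = {
    val first = ToyConsumerCache.acquire("group-topic-0")
    ToyConsumerCache.release(first)
    val second = ToyConsumerCache.acquire("group-topic-0")
    assert(first eq second) // the released instance is reused

    val third = ToyConsumerCache.acquire("group-topic-0")
    assert(!(third eq second)) // a busy instance is never handed out twice
    ToyConsumerCache.release(second)
    ToyConsumerCache.release(third)
  }
}
```

A reuse test along these lines would check instance identity after a release, while a concurrency test checks that a busy instance is never shared.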





[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-12 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r181116277
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-12 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r181109231
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import org.apache.kafka.clients.consumer.ConsumerConfig._
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark._
+
+class KafkaDataConsumerSuite extends SparkFunSuite with BeforeAndAfterAll {
+
+  private var testUtils: KafkaTestUtils = _
+
+  override def beforeAll {
+    super.beforeAll()
+    testUtils = new KafkaTestUtils
+    testUtils.setup()
+  }
+
+  override def afterAll {
+    if (testUtils != null) {
+      testUtils.teardown()
+      testUtils = null
+    }
+    super.afterAll()
+  }
+
+  test("concurrent use of KafkaDataConsumer") {
--- End diff --

Reuse test added.





[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-12 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r181098163
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import org.apache.kafka.clients.consumer.ConsumerConfig._
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark._
+
+class KafkaDataConsumerSuite extends SparkFunSuite with BeforeAndAfterAll {
+
+  private var testUtils: KafkaTestUtils = _
+
+  override def beforeAll {
+    super.beforeAll()
+    testUtils = new KafkaTestUtils
+    testUtils.setup()
+  }
+
+  override def afterAll {
+    if (testUtils != null) {
+      testUtils.teardown()
+      testUtils = null
+    }
+    super.afterAll()
+  }
+
+  test("concurrent use of KafkaDataConsumer") {
+    KafkaDataConsumer.init(16, 64, 0.75f)
+
+    val topic = "topic" + Random.nextInt()
+    val data = (1 to 1000).map(_.toString)
+    val topicPartition = new TopicPartition(topic, 0)
+    testUtils.createTopic(topic)
+    testUtils.sendMessages(topic, data.toArray)
+
+    val groupId = "groupId"
+    val kafkaParams = Map[String, Object](
+      GROUP_ID_CONFIG -> groupId,
+      BOOTSTRAP_SERVERS_CONFIG -> testUtils.brokerAddress,
+      KEY_DESERIALIZER_CLASS_CONFIG -> classOf[ByteArrayDeserializer].getName,
+      VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[ByteArrayDeserializer].getName,
+      AUTO_OFFSET_RESET_CONFIG -> "earliest",
+      ENABLE_AUTO_COMMIT_CONFIG -> "false"
+    )
+
+    val numThreads = 100
+    val numConsumerUsages = 500
+
+    @volatile var error: Throwable = null
+
+    def consume(i: Int): Unit = {
+      val useCache = Random.nextBoolean
+      val taskContext = if (Random.nextBoolean) {
+        new TaskContextImpl(0, 0, 0, 0, attemptNumber = Random.nextInt(2), null, null, null)
+      } else {
+        null
+      }
+      val consumer = KafkaDataConsumer.acquire[Array[Byte], Array[Byte]](
+        groupId, topicPartition, kafkaParams.asJava, taskContext, useCache)
+      try {
+        val rcvd = 0 until data.length map { offset =>
+          val bytes = consumer.get(offset, 1).value()
+          new String(bytes)
+        }
+        assert(rcvd == data)
+      } catch {
+        case e: Throwable =>
+          error = e
+          throw e
+      } finally {
+        consumer.release()
+      }
+    }
+
+    val threadPool = Executors.newFixedThreadPool(numThreads)
+    try {
+      val futures = (1 to numConsumerUsages).map { i =>
+        threadPool.submit(new Runnable {
+          override def run(): Unit = { consume(i) }
+        })
+      }
+      futures.foreach(_.get(1, TimeUnit.MINUTES))
+      assert(error == null)
+    } finally {
+      threadPool.shutdown()
+    }
+  }
+}
--- End diff --

Removed from the PR description.
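
The quoted test follows a common stress-test shape: submit many short tasks to a fixed thread pool, wait on every future, and fail if any task recorded an error. A minimal standalone sketch of that shape, with no Kafka or Spark dependency, might look like the following. `ExclusiveResource` and `ConcurrentUseSketch.stress` are hypothetical names used only for illustration, and the compare-and-set guard is an assumed way to detect concurrent use, not something taken from the PR.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

// Hypothetical resource that must never be used by two threads at once.
final class ExclusiveResource {
  private val busy = new AtomicBoolean(false)

  def use(): Unit = {
    // compareAndSet fails if another thread is already inside use().
    if (!busy.compareAndSet(false, true)) {
      throw new IllegalStateException("concurrent use detected")
    }
    try Thread.sleep(1) finally busy.set(false)
  }
}

object ConcurrentUseSketch {
  /** Runs `usages` tasks on `threads` threads; returns the first recorded failure, if any. */
  def stress(threads: Int, usages: Int)(task: () => Unit): Option[Throwable] = {
    val firstError = new AtomicReference[Throwable](null)
    val pool = Executors.newFixedThreadPool(threads)
    try {
      val futures = (1 to usages).map { _ =>
        pool.submit(new Runnable {
          override def run(): Unit = {
            try task()
            catch { case e: Throwable => firstError.compareAndSet(null, e); () }
          }
        })
      }
      // Wait for every task, as the quoted test does with a one-minute timeout.
      futures.foreach(_.get(1, TimeUnit.MINUTES))
      Option(firstError.get())
    } finally {
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit = {
    // One resource per task: nothing is shared, so no error is expected.
    assert(stress(8, 100) { () => new ExclusiveResource().use() }.isEmpty)
  }
}
```

Recording the failure in an `AtomicReference` keeps the first error rather than the last one, which is a small design difference from the `@volatile var error` used in the quoted test.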





[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-12 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r18108
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-12 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r181077275
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-12 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r181076158
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, 
KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its 
implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates 
to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+"InternalKafkaConsumer(" +
+  s"hash=${Integer.toHexString(hashCode)}, " +
+  s"groupId=$groupId, " +
+  s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = 

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-12 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r181074560
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import org.apache.kafka.clients.consumer.ConsumerConfig._
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark._
+
+class KafkaDataConsumerSuite extends SparkFunSuite with BeforeAndAfterAll {
+
+  private var testUtils: KafkaTestUtils = _
+
+  override def beforeAll {
+super.beforeAll()
+testUtils = new KafkaTestUtils
+testUtils.setup()
+  }
+
+  override def afterAll {
+if (testUtils != null) {
+  testUtils.teardown()
+  testUtils = null
+}
+super.afterAll()
+  }
+
+  test("concurrent use of KafkaDataConsumer") {
+KafkaDataConsumer.init(16, 64, 0.75f)
+
+val topic = "topic" + Random.nextInt()
+val data = (1 to 1000).map(_.toString)
+val topicPartition = new TopicPartition(topic, 0)
+testUtils.createTopic(topic)
+testUtils.sendMessages(topic, data.toArray)
+
+val groupId = "groupId"
+val kafkaParams = Map[String, Object](
+  GROUP_ID_CONFIG -> groupId,
+  BOOTSTRAP_SERVERS_CONFIG -> testUtils.brokerAddress,
+  KEY_DESERIALIZER_CLASS_CONFIG -> classOf[ByteArrayDeserializer].getName,
+  VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[ByteArrayDeserializer].getName,
+  AUTO_OFFSET_RESET_CONFIG -> "earliest",
+  ENABLE_AUTO_COMMIT_CONFIG -> "false"
+)
+
+val numThreads = 100
+val numConsumerUsages = 500
+
+@volatile var error: Throwable = null
+
+def consume(i: Int): Unit = {
+  val useCache = Random.nextBoolean
+  val taskContext = if (Random.nextBoolean) {
+new TaskContextImpl(0, 0, 0, 0, attemptNumber = Random.nextInt(2), null, null, null)
+  } else {
+null
+  }
+  val consumer = KafkaDataConsumer.acquire[Array[Byte], Array[Byte]](
+groupId, topicPartition, kafkaParams.asJava, taskContext, useCache)
+  try {
+val rcvd = 0 until data.length map { offset =>
+  val bytes = consumer.get(offset, 1).value()
+  new String(bytes)
+}
+assert(rcvd == data)
+  } catch {
+case e: Throwable =>
+  error = e
+  throw e
+  } finally {
+consumer.release()
+  }
+}
+
+val threadPool = Executors.newFixedThreadPool(numThreads)
+try {
+  val futures = (1 to numConsumerUsages).map { i =>
+threadPool.submit(new Runnable {
+  override def run(): Unit = { consume(i) }
+})
+  }
+  futures.foreach(_.get(1, TimeUnit.MINUTES))
+  assert(error == null)
+} finally {
+  threadPool.shutdown()
+}
+  }
+}
--- End diff --

That's a bad cut-and-paste issue. This PR intends to solve `ConcurrentModificationException`.
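
For context, Kafka's `KafkaConsumer` is not safe for multi-threaded access and raises `ConcurrentModificationException` when two threads use it at once. A minimal sketch of the general idea of avoiding that: hand out the cached consumer only while nobody else holds it, and fall back to a fresh, uncached one otherwise. `FakeConsumer`, `ConsumerPool`, and the string cache key are hypothetical stand-ins, not the classes or signatures in this PR; only the `inUse` flag and the acquire/release naming come from the quoted diff.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a wrapped Kafka consumer.
final class FakeConsumer(val key: String) {
  var inUse = false
}

// Hypothetical pool; all access to the cache and the inUse flags is synchronized.
object ConsumerPool {
  private val cache = mutable.Map.empty[String, FakeConsumer]

  /** Return the cached consumer if it is free, otherwise a new uncached one. */
  def acquire(key: String): FakeConsumer = synchronized {
    val cached = cache.getOrElseUpdate(key, new FakeConsumer(key))
    if (cached.inUse) {
      // The cached instance is busy: never share it, create a separate one.
      val fresh = new FakeConsumer(key)
      fresh.inUse = true
      fresh
    } else {
      cached.inUse = true
      cached
    }
  }

  /** Mark the consumer as free so a later acquire can reuse it. */
  def release(consumer: FakeConsumer): Unit = synchronized {
    consumer.inUse = false
  }
}
```

With this shape, two overlapping `acquire` calls for the same key get distinct instances, and the cached one is reused only after `release`.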





[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-12 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r181073687
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import org.apache.kafka.clients.consumer.ConsumerConfig._
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark._
+
+class KafkaDataConsumerSuite extends SparkFunSuite with BeforeAndAfterAll {
+
+  private var testUtils: KafkaTestUtils = _
+
+  override def beforeAll {
+super.beforeAll()
+testUtils = new KafkaTestUtils
+testUtils.setup()
+  }
+
+  override def afterAll {
+if (testUtils != null) {
+  testUtils.teardown()
+  testUtils = null
+}
+super.afterAll()
+  }
+
+  test("concurrent use of KafkaDataConsumer") {
+KafkaDataConsumer.init(16, 64, 0.75f)
+
+val topic = "topic" + Random.nextInt()
+val data = (1 to 1000).map(_.toString)
+val topicPartition = new TopicPartition(topic, 0)
+testUtils.createTopic(topic)
+testUtils.sendMessages(topic, data.toArray)
+
+val groupId = "groupId"
+val kafkaParams = Map[String, Object](
+  GROUP_ID_CONFIG -> groupId,
+  BOOTSTRAP_SERVERS_CONFIG -> testUtils.brokerAddress,
+  KEY_DESERIALIZER_CLASS_CONFIG -> classOf[ByteArrayDeserializer].getName,
+  VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[ByteArrayDeserializer].getName,
+  AUTO_OFFSET_RESET_CONFIG -> "earliest",
+  ENABLE_AUTO_COMMIT_CONFIG -> "false"
+)
+
+val numThreads = 100
+val numConsumerUsages = 500
+
+@volatile var error: Throwable = null
+
+def consume(i: Int): Unit = {
+  val useCache = Random.nextBoolean
+  val taskContext = if (Random.nextBoolean) {
+new TaskContextImpl(0, 0, 0, 0, attemptNumber = Random.nextInt(2), null, null, null)
+  } else {
+null
+  }
+  val consumer = KafkaDataConsumer.acquire[Array[Byte], Array[Byte]](
+groupId, topicPartition, kafkaParams.asJava, taskContext, useCache)
+  try {
+val rcvd = 0 until data.length map { offset =>
--- End diff --

Changed.





[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-12 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r181072630
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+"InternalKafkaConsumer(" +
+  s"hash=${Integer.toHexString(hashCode)}, " +
+  s"groupId=$groupId, " +
+  s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = 

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-12 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r181066695
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
--- End diff --

Yeah, it's overkill. Removed `@volatile` and switched to `val`.
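
A small sketch of why this kind of change is safe, assuming the field is assigned once during construction and never reassigned (whether that holds on every code path of this PR is an assumption here). A Scala `val` compiles to a final field, so it needs no `@volatile`; a flag that other threads flip later still does. `Resource` and `Wrapper` are hypothetical names used only for illustration.

```scala
// Hypothetical stand-in for the wrapped Kafka client.
final class Resource(val name: String)

final class Wrapper(name: String) {
  // Assigned once during construction and never reassigned: a plain val is enough.
  private val resource = new Resource(name)

  // Mutated over the wrapper's lifetime, possibly from different threads: stays @volatile.
  @volatile var inUse = true

  def describe: String = s"${resource.name} inUse=$inUse"
}
```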





[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-12 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r181066058
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+"InternalKafkaConsumer(" +
+  s"hash=${Integer.toHexString(hashCode)}, " +
+  s"groupId=$groupId, " +
+  s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = 

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-12 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r181063487
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+"InternalKafkaConsumer(" +
+  s"hash=${Integer.toHexString(hashCode)}, " +
+  s"groupId=$groupId, " +
+  s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = 

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-12 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r181058027
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+"InternalKafkaConsumer(" +
+  s"hash=${Integer.toHexString(hashCode)}, " +
+  s"groupId=$groupId, " +
+  s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = 

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-12 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r181057829
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+    internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+    internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+    internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+    internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+    val groupId: String,
+    val topicPartition: TopicPartition,
+    val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+    "groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+    "InternalKafkaConsumer(" +
+      s"hash=${Integer.toHexString(hashCode)}, " +
+      s"groupId=$groupId, " +
+      s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+    val c = new KafkaConsumer[K, V](kafkaParams)
+    val tps = 
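
The release() contract quoted above says a consumer is "either finalized, or reset for reuse later". As a rough illustration of that idea only, here is a minimal, self-contained sketch. It is not the PR's code: PooledResource, ResourcePool, acquire and the closed flag are hypothetical names made up for this example, and no Kafka classes are involved. Only the inUse and markedForClose flags mirror fields visible in the diff.

```scala
import java.{util => ju}

// Hypothetical stand-in for a cached consumer; NOT the PR's InternalKafkaConsumer.
final class PooledResource(val key: String) {
  @volatile var inUse: Boolean = true            // handed out to exactly one caller
  @volatile var markedForClose: Boolean = false  // close on release instead of reusing
  @volatile var closed: Boolean = false          // illustration-only flag
  def close(): Unit = { closed = true }
}

// Hypothetical cache: at most one cached instance per key, never shared concurrently.
object ResourcePool {
  private val cache = new ju.HashMap[String, PooledResource]()

  def acquire(key: String): PooledResource = synchronized {
    val cached = cache.get(key)
    if (cached == null) {
      // Nothing cached yet: create, cache, and hand out (inUse starts as true).
      val fresh = new PooledResource(key)
      cache.put(key, fresh)
      fresh
    } else if (!cached.inUse) {
      // Cached and idle: reuse it.
      cached.inUse = true
      cached
    } else {
      // Cached but busy: hand out a temporary instance that is closed on release.
      val temp = new PooledResource(key)
      temp.markedForClose = true
      temp
    }
  }

  def release(resource: PooledResource): Unit = synchronized {
    if (resource.markedForClose) resource.close() // finalized
    else resource.inUse = false                   // reset for reuse later
  }
}
```

In this sketch two concurrent callers asking for the same key never receive the same instance: the second caller gets a temporary one, which is closed when released, while the cached one becomes available again.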

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-12 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r181057477
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-12 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r181057345
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-12 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r181057226
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-11 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180810282
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-11 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180808097
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-11 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180808081
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-11 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180807486
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+"InternalKafkaConsumer(" +
+  s"hash=${Integer.toHexString(hashCode)}, " +
+  s"groupId=$groupId, " +
+  s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = 

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-11 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180806811
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
--- End diff --

Changed.





[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-11 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180806244
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+"InternalKafkaConsumer(" +
+  s"hash=${Integer.toHexString(hashCode)}, " +
+  s"groupId=$groupId, " +
+  s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = 

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-11 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180805909
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+"InternalKafkaConsumer(" +
+  s"hash=${Integer.toHexString(hashCode)}, " +
+  s"groupId=$groupId, " +
+  s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = 

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-11 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180805380
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
--- End diff --

Changed to assign the groupId from the params.





[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-11 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180780119
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
--- End diff --

Leftover from old code, fixing.





[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180282891
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
--- End diff --

This is mostly vestigial (there used to be a remove method that took a 
groupId, but no kafkaParams, so there was symmetry).  

I don't see a reason it can't be changed to match the SQL version at this 
point, i.e. assign groupId from the params.
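
For illustration, the suggestion above might look roughly like the sketch below. It is not the code from the pull request: the class name `GroupIdFromParams` is hypothetical, and the literal key "group.id" stands in for `ConsumerConfig.GROUP_ID_CONFIG` so that the snippet compiles without the Kafka client on the classpath.

```scala
import java.{util => ju}

// Hypothetical stand-in for InternalKafkaConsumer; only the groupId handling is shown.
class GroupIdFromParams(val kafkaParams: ju.Map[String, Object]) {
  // "group.id" is the value of ConsumerConfig.GROUP_ID_CONFIG in the Kafka client.
  // Deriving groupId from the params removes the need for a separate constructor
  // argument and for the require(...) check that the two values agree.
  val groupId: String = kafkaParams.get("group.id").asInstanceOf[String]
}

object GroupIdFromParamsDemo {
  def main(args: Array[String]): Unit = {
    val params = new ju.HashMap[String, Object]()
    params.put("group.id", "example-group")
    val consumer = new GroupIdFromParams(params)
    println(consumer.groupId)
  }
}
```

With this shape there is a single source of truth for the group id, so the cache key and the consumer configuration cannot drift apart.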





[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180283864
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+"InternalKafkaConsumer(" +
+  s"hash=${Integer.toHexString(hashCode)}, " +
+  s"groupId=$groupId, " +
+  s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = new 

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180284812
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+"InternalKafkaConsumer(" +
+  s"hash=${Integer.toHexString(hashCode)}, " +
+  s"groupId=$groupId, " +
+  s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = new 

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180280531
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
--- End diff --

This is an example of the cut & paste I was referring to.

In this case, I don't believe `consumer` is ever reassigned, so it doesn't even need to be a var.

It was reassigned in the SQL version of the code.
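
To illustrate the point, here is a minimal sketch, not the PR's code. `FakeConsumer` and `Wrapper` are hypothetical stand-ins so the snippet compiles without Kafka on the classpath. It assumes, as the comment suggests, that the underlying consumer is created once and never swapped out, in which case a plain `val` expresses the intent and `@volatile var` adds nothing.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for KafkaConsumer; only tracks an id and a closed flag.
final class FakeConsumer(val id: Int) {
  private var closed = false
  def close(): Unit = closed = true
  def isClosed: Boolean = closed
}

object FakeConsumer {
  private val counter = new AtomicInteger(0)
  // Each call builds a new consumer with a fresh id.
  def create(): FakeConsumer = new FakeConsumer(counter.incrementAndGet())
}

// Hypothetical wrapper: the consumer reference is assigned exactly once,
// so it is declared as an immutable `val` instead of `@volatile var`.
final class Wrapper {
  private val consumer: FakeConsumer = FakeConsumer.create()

  def consumerId: Int = consumer.id
  def close(): Unit = consumer.close()
  def isClosed: Boolean = consumer.isClosed
}
```

If a later change does need to recreate the consumer (as the SQL version apparently did), the field would have to become a `var` again and its visibility across threads would need to be reconsidered at that point.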





[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180285599
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+"InternalKafkaConsumer(" +
+  s"hash=${Integer.toHexString(hashCode)}, " +
+  s"groupId=$groupId, " +
+  s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = new 

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180280210
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import org.apache.kafka.clients.consumer.ConsumerConfig._
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark._
+
+class KafkaDataConsumerSuite extends SparkFunSuite with BeforeAndAfterAll {
+
+  private var testUtils: KafkaTestUtils = _
+
+  override def beforeAll {
+super.beforeAll()
+testUtils = new KafkaTestUtils
+testUtils.setup()
+  }
+
+  override def afterAll {
+if (testUtils != null) {
+  testUtils.teardown()
+  testUtils = null
+}
+super.afterAll()
+  }
+
+  test("concurrent use of KafkaDataConsumer") {
+KafkaDataConsumer.init(16, 64, 0.75f)
+
+val topic = "topic" + Random.nextInt()
+val data = (1 to 1000).map(_.toString)
+val topicPartition = new TopicPartition(topic, 0)
+testUtils.createTopic(topic)
+testUtils.sendMessages(topic, data.toArray)
+
+val groupId = "groupId"
+val kafkaParams = Map[String, Object](
+  GROUP_ID_CONFIG -> groupId,
+  BOOTSTRAP_SERVERS_CONFIG -> testUtils.brokerAddress,
+  KEY_DESERIALIZER_CLASS_CONFIG -> classOf[ByteArrayDeserializer].getName,
+  VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[ByteArrayDeserializer].getName,
+  AUTO_OFFSET_RESET_CONFIG -> "earliest",
+  ENABLE_AUTO_COMMIT_CONFIG -> "false"
+)
+
+val numThreads = 100
+val numConsumerUsages = 500
+
+@volatile var error: Throwable = null
+
+def consume(i: Int): Unit = {
+  val useCache = Random.nextBoolean
+  val taskContext = if (Random.nextBoolean) {
+new TaskContextImpl(0, 0, 0, 0, attemptNumber = Random.nextInt(2), null, null, null)
+  } else {
+null
+  }
+  val consumer = KafkaDataConsumer.acquire[Array[Byte], Array[Byte]](
+groupId, topicPartition, kafkaParams.asJava, taskContext, useCache)
+  try {
+val rcvd = 0 until data.length map { offset =>
+  val bytes = consumer.get(offset, 1).value()
+  new String(bytes)
+}
+assert(rcvd == data)
+  } catch {
+case e: Throwable =>
+  error = e
+  throw e
+  } finally {
+consumer.release()
+  }
+}
+
+val threadPool = Executors.newFixedThreadPool(numThreads)
+try {
+  val futures = (1 to numConsumerUsages).map { i =>
+threadPool.submit(new Runnable {
+  override def run(): Unit = { consume(i) }
+})
+  }
+  futures.foreach(_.get(1, TimeUnit.MINUTES))
+  assert(error == null)
+} finally {
+  threadPool.shutdown()
+}
+  }
+}
--- End diff --

If this PR is intended to fix a problem with silent reading of incorrect data, can you add a test reproducing that?





[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180283733
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+"InternalKafkaConsumer(" +
+  s"hash=${Integer.toHexString(hashCode)}, " +
+  s"groupId=$groupId, " +
+  s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = new 

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180220465
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+"InternalKafkaConsumer(" +
+  s"hash=${Integer.toHexString(hashCode)}, " +
+  s"groupId=$groupId, " +
+  s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = new 

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180212631
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+"InternalKafkaConsumer(" +
+  s"hash=${Integer.toHexString(hashCode)}, " +
+  s"groupId=$groupId, " +
+  s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = new 

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180222555
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import org.apache.kafka.clients.consumer.ConsumerConfig._
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark._
+
+class KafkaDataConsumerSuite extends SparkFunSuite with BeforeAndAfterAll {
+
+  private var testUtils: KafkaTestUtils = _
+
+  override def beforeAll {
+super.beforeAll()
+testUtils = new KafkaTestUtils
+testUtils.setup()
+  }
+
+  override def afterAll {
+if (testUtils != null) {
+  testUtils.teardown()
+  testUtils = null
+}
+super.afterAll()
+  }
+
+  test("concurrent use of KafkaDataConsumer") {
--- End diff --

This is good, but it would be nice to have a test that checks that cached 
consumers are re-used when possible. E.g., this test could pass just by never 
caching anything.
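
A minimal sketch of the kind of check meant here. `FakeConsumer` and `FakeConsumerCache` are hypothetical stand-ins, not classes from this pull request, and the caching policy shown (one cached instance per key, throwaway instances while it is busy) is an assumption made only for illustration:

```scala
import scala.collection.mutable

// Hypothetical stand-in for a consumer; not a class from this pull request.
final class FakeConsumer(val key: String) {
  var inUse: Boolean = false
}

// Hypothetical cache: at most one cached consumer per key.
object FakeConsumerCache {
  private val cache = mutable.Map.empty[String, FakeConsumer]

  // Hand out the cached consumer when it is idle; otherwise create a new one.
  def acquire(key: String): FakeConsumer = synchronized {
    val consumer = cache.get(key) match {
      case Some(cached) if !cached.inUse => cached
      case Some(_) => new FakeConsumer(key) // cached one is busy: uncached throwaway
      case None =>
        val created = new FakeConsumer(key)
        cache.put(key, created)
        created
    }
    consumer.inUse = true
    consumer
  }

  def release(consumer: FakeConsumer): Unit = synchronized {
    consumer.inUse = false
  }
}

object CacheReuseCheck {
  def main(args: Array[String]): Unit = {
    val first = FakeConsumerCache.acquire("group-topic-0")
    FakeConsumerCache.release(first)
    val second = FakeConsumerCache.acquire("group-topic-0")
    // Reference equality: an implementation that never caches fails here.
    assert(first eq second, "released consumer should be re-used")

    // While `second` is still in use, another acquire must not share it.
    val third = FakeConsumerCache.acquire("group-topic-0")
    assert(!(third eq second), "in-use consumer must not be shared")
  }
}
```

The point of the reference-equality assertion is that a concurrency-only test cannot tell a working cache apart from one that creates a fresh consumer every time.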





[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180221087
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import org.apache.kafka.clients.consumer.ConsumerConfig._
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark._
+
+class KafkaDataConsumerSuite extends SparkFunSuite with BeforeAndAfterAll {
+
+  private var testUtils: KafkaTestUtils = _
+
+  override def beforeAll {
+super.beforeAll()
+testUtils = new KafkaTestUtils
+testUtils.setup()
+  }
+
+  override def afterAll {
+if (testUtils != null) {
+  testUtils.teardown()
+  testUtils = null
+}
+super.afterAll()
+  }
+
+  test("concurrent use of KafkaDataConsumer") {
+KafkaDataConsumer.init(16, 64, 0.75f)
+
+val topic = "topic" + Random.nextInt()
+val data = (1 to 1000).map(_.toString)
+val topicPartition = new TopicPartition(topic, 0)
+testUtils.createTopic(topic)
+testUtils.sendMessages(topic, data.toArray)
+
+val groupId = "groupId"
+val kafkaParams = Map[String, Object](
+  GROUP_ID_CONFIG -> groupId,
+  BOOTSTRAP_SERVERS_CONFIG -> testUtils.brokerAddress,
+  KEY_DESERIALIZER_CLASS_CONFIG -> classOf[ByteArrayDeserializer].getName,
+  VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[ByteArrayDeserializer].getName,
+  AUTO_OFFSET_RESET_CONFIG -> "earliest",
+  ENABLE_AUTO_COMMIT_CONFIG -> "false"
+)
+
+val numThreads = 100
+val numConsumerUsages = 500
+
+@volatile var error: Throwable = null
+
+def consume(i: Int): Unit = {
+  val useCache = Random.nextBoolean
+  val taskContext = if (Random.nextBoolean) {
+new TaskContextImpl(0, 0, 0, 0, attemptNumber = Random.nextInt(2), null, null, null)
+  } else {
+null
+  }
+  val consumer = KafkaDataConsumer.acquire[Array[Byte], Array[Byte]](
+groupId, topicPartition, kafkaParams.asJava, taskContext, useCache)
+  try {
+val rcvd = 0 until data.length map { offset =>
--- End diff --

Style: just by convention, ranges are an exception to the usual rule; they 
are wrapped with parens: `(x until y).map`
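
For illustration, a small self-contained comparison of the two forms. The `RangeStyle` object and its sample data are made up for this sketch; only the `0 until data.length map` shape comes from the diff:

```scala
object RangeStyle {
  // Sample data invented for this sketch.
  val data: Seq[String] = (1 to 5).map(_.toString)

  // Infix form, as in the diff: parses as (0 until data.length) map { ... },
  // but the reader has to work out the grouping.
  val infix: Seq[String] = 0 until data.length map { offset => data(offset) }

  // Parenthesized range with dot notation: the grouping is explicit.
  val wrapped: Seq[String] = (0 until data.length).map { offset => data(offset) }
}
```

Both expressions evaluate to the same sequence; the change is purely about readability.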





[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180180011
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+"InternalKafkaConsumer(" +
+  s"hash=${Integer.toHexString(hashCode)}, " +
+  s"groupId=$groupId, " +
+  s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = new 

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180174857
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+"InternalKafkaConsumer(" +
+  s"hash=${Integer.toHexString(hashCode)}, " +
+  s"groupId=$groupId, " +
+  s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = new 

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180174985
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+"InternalKafkaConsumer(" +
+  s"hash=${Integer.toHexString(hashCode)}, " +
+  s"groupId=$groupId, " +
+  s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = new 

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180174651
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+"InternalKafkaConsumer(" +
+  s"hash=${Integer.toHexString(hashCode)}, " +
+  s"groupId=$groupId, " +
+  s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = new 

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180174410
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
--- End diff --

This generally means the class should be `private` not `private[blah]`.





[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180180433
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
--- End diff --

Given that all these variables are read / updated inside `synchronized` 
blocks, `@volatile` is unnecessary and a little misleading.
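
A minimal sketch of the point, using a hypothetical `GuardedState` class rather than the `InternalKafkaConsumer` from the diff. It assumes, as the comment does, that every read and write of the fields happens while holding the same monitor; in that case the lock already provides the visibility guarantees and the fields can be plain `var`s:

```scala
// Hypothetical class for illustration; not the class under review.
final class GuardedState {
  // Plain vars: all access below goes through `synchronized` on this instance,
  // so releasing and re-acquiring the monitor makes writes visible to other threads.
  private var inUse = false
  private var nextOffset = 0L

  def tryAcquire(): Boolean = synchronized {
    if (inUse) false else { inUse = true; true }
  }

  def release(): Unit = synchronized { inUse = false }

  def advance(): Unit = synchronized { nextOffset += 1 }

  def current: Long = synchronized { nextOffset }
}

object GuardedStateDemo {
  // Start `threads` workers that each call advance() `perThread` times,
  // wait for all of them, and return the final counter value.
  def totalAfter(threads: Int, perThread: Int): Long = {
    val state = new GuardedState
    val workers = (1 to threads).map { _ =>
      new Thread(new Runnable {
        override def run(): Unit = (1 to perThread).foreach(_ => state.advance())
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
    state.current
  }
}
```

If any field were also read outside a `synchronized` block, the reasoning would no longer hold and `@volatile` (or moving the read under the lock) would be needed for that field.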





[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180173206
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+"InternalKafkaConsumer(" +
+  s"hash=${Integer.toHexString(hashCode)}, " +
+  s"groupId=$groupId, " +
+  s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = new 
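
For readers following this thread: the `release()` contract quoted above ("either finalized, or reset for reuse later"), together with the `inUse` and `markedForClose` flags, can be illustrated with a small sketch. This is only an assumption about how such a cache might behave, not the code under review. `CachedResource` and `ResourceCache` are hypothetical names, and no Kafka classes are involved.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a cached consumer; not the PR's InternalKafkaConsumer.
final class CachedResource(val key: String) {
  @volatile var inUse = true            // true while a task holds this instance
  @volatile var markedForClose = false  // close on release instead of reusing
  @volatile var closed = false
  def close(): Unit = closed = true
}

// Hypothetical cache: every state change happens under one lock, so an
// instance is never handed to two callers at the same time.
object ResourceCache {
  private val cache = mutable.Map.empty[String, CachedResource]

  def acquire(key: String): CachedResource = synchronized {
    cache.get(key) match {
      case Some(r) if !r.inUse =>
        r.inUse = true // reuse the idle cached instance
        r
      case Some(_) =>
        // The cached instance is busy: hand out a fresh, non-cached one
        // that will be closed on release rather than shared.
        val r = new CachedResource(key)
        r.markedForClose = true
        r
      case None =>
        val r = new CachedResource(key)
        cache(key) = r
        r
    }
  }

  def release(r: CachedResource): Unit = synchronized {
    if (r.markedForClose) {
      r.close()
      if (cache.get(r.key).exists(_ eq r)) cache.remove(r.key)
    } else {
      r.inUse = false // reset for reuse later
    }
  }
}
```

In this sketch a second `acquire` for a busy key gets its own short-lived instance, which is one way to avoid two tasks touching the same non-thread-safe object.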

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180178237
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+"InternalKafkaConsumer(" +
+  s"hash=${Integer.toHexString(hashCode)}, " +
+  s"groupId=$groupId, " +
+  s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = new 
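
A note on the `compactedPrevious()` doc comment quoted above: the `buffer` field is declared as a `java.util.ListIterator`, and that interface's `previous()` throws `NoSuchElementException` when nothing lies behind the cursor. Whether `compactedPrevious()` simply delegates to `buffer.previous()` is not visible in this truncated excerpt, so the following is a sketch of the standard `ListIterator` behaviour only. `RewindSketch` and its methods are hypothetical names.

```scala
import java.{util => ju}

object RewindSketch {
  // Walks forward twice, then rewinds one step.
  def walk(records: ju.List[String]): Seq[String] = {
    val buffer: ju.ListIterator[String] = records.listIterator()
    val first = buffer.next()
    val second = buffer.next()
    val rewound = buffer.previous() // returns the element just passed by next()
    Seq(first, second, rewound)
  }

  // previous() on an iterator with nothing behind the cursor throws.
  def rewindEmpty(): Boolean =
    try {
      ju.Collections.emptyListIterator[String]().previous()
      false
    } catch {
      case _: ju.NoSuchElementException => true
    }
}
```

Rewinding therefore yields the most recently returned record again, which matches the "rewind to previous record in the batch" wording.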

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180177328
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+"InternalKafkaConsumer(" +
+  s"hash=${Integer.toHexString(hashCode)}, " +
+  s"groupId=$groupId, " +
+  s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = new 

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180173382
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+"InternalKafkaConsumer(" +
+  s"hash=${Integer.toHexString(hashCode)}, " +
+  s"groupId=$groupId, " +
+  s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = new 

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180179223
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+"InternalKafkaConsumer(" +
+  s"hash=${Integer.toHexString(hashCode)}, " +
+  s"groupId=$groupId, " +
+  s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = new 

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180175661
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+"InternalKafkaConsumer(" +
+  s"hash=${Integer.toHexString(hashCode)}, " +
+  s"groupId=$groupId, " +
+  s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = new 

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180174808
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+"groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+"InternalKafkaConsumer(" +
+  s"hash=${Integer.toHexString(hashCode)}, " +
+  s"groupId=$groupId, " +
+  s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+val c = new KafkaConsumer[K, V](kafkaParams)
+val tps = new 

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180175754
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+
+private[kafka010] sealed trait KafkaDataConsumer[K, V] {
+  /**
+   * Get the record for the given offset if available.
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+    internalConsumer.get(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Start a batch on a compacted topic
+   *
+   * @param offset the offset to fetch.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = {
+    internalConsumer.compactedStart(offset, pollTimeoutMs)
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   *
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   */
+  def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = {
+    internalConsumer.compactedNext(pollTimeoutMs)
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   *
+   * @throws NoSuchElementException if no previous element
+   */
+  def compactedPrevious(): ConsumerRecord[K, V] = {
+    internalConsumer.compactedPrevious()
+  }
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer[K, V]
+}
+
+
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+    "groupId used for cache key must match the groupId in kafkaParams")
+
+  @volatile private var consumer = createConsumer
+
+  /** indicates whether this consumer is in use or not */
+  @volatile var inUse = true
+
+  /** indicate whether this consumer is going to be stopped in the next release */
+  @volatile var markedForClose = false
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
+  @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET
+
+  override def toString: String = {
+    "InternalKafkaConsumer(" +
+      s"hash=${Integer.toHexString(hashCode)}, " +
+      s"groupId=$groupId, " +
+      s"topicPartition=$topicPartition)"
+  }
+
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[K, V] = {
+    val c = new KafkaConsumer[K, V](kafkaParams)
+    val tps = new 

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180172573
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
--- End diff --

nit: follow coding style for multi-line declarations.


---




[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180175493
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180176691
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@

[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20997#discussion_r180172763
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
 ---
@@ -0,0 +1,381 @@
+/**
+ * A wrapper around Kafka's KafkaConsumer.
+ * This is not for direct use outside this file.
+ */
+private[kafka010]
+class InternalKafkaConsumer[K, V](
+  val groupId: String,
+  val topicPartition: TopicPartition,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
--- End diff --

Given this, is there any advantage in passing the group ID as a parameter?


---




[GitHub] spark pull request #20997: [SPARK-19185] [DSTREAMS] Avoid concurrent use of ...

2018-04-06 Thread gaborgsomogyi
GitHub user gaborgsomogyi opened a pull request:

https://github.com/apache/spark/pull/20997

[SPARK-19185] [DSTREAMS] Avoid concurrent use of cached consumers in CachedKafkaConsumer

## What changes were proposed in this pull request?

`CachedKafkaConsumer` in the kafka-0-10 project is designed to maintain a
pool of KafkaConsumers that can be reused. However, it was built on the
assumption that only one thread at a time will try to read a given Kafka
TopicPartition. This assumption does not always hold, and violating it can
lead to a ConcurrentModificationException or, worse, to silently reading
incorrect data.

Here is a better way to design this. The consumer pool should be smart 
enough to avoid concurrent use of a cached consumer. If there is another 
request for the same TopicPartition as a currently in-use consumer, the pool 
should automatically return a fresh consumer.

- There are effectively two kinds of consumer that may be generated
  - Cached consumer - this should be returned to the pool at task end
  - Non-cached consumer - this should be closed at task end
- A trait called `KafkaDataConsumer` is introduced to hide this difference
from the users of the consumer, so that client code does not have to reason
about whether to stop or release it. Callers simply use `val consumer =
KafkaDataConsumer.acquire` and then `consumer.release`.
- If there is a request for a consumer that is in use, a new consumer is
generated.
- If the request for a consumer comes from a task reattempt, any already
existing cached consumers are invalidated and a new consumer is generated.
This could fix potential issues if the cause of the reattempt is a
malfunctioning consumer.
- In addition, I renamed the `CachedKafkaConsumer` class to
`KafkaDataConsumer`, because the old name is a misnomer given that what it
returns may or may not be cached.
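
The acquire/release rules listed above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code in this pull request: `FakeConsumer`, `DataConsumer` and `ConsumerPool` are hypothetical stand-ins (no Kafka classes are used), the cache key is a plain string, and returning a non-cached consumer on a task reattempt is an assumption made for the sketch.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a Kafka consumer bound to one topic-partition.
final class FakeConsumer(val key: String) {
  var inUse = true
  var markedForClose = false
  var closed = false
  def close(): Unit = { closed = true }
}

// Hides from callers whether the underlying consumer is pooled or throwaway.
sealed trait DataConsumer {
  def internal: FakeConsumer
  def release(): Unit
}

object ConsumerPool {
  private val cache = mutable.HashMap.empty[String, FakeConsumer]

  // Cached consumer: handed back to the pool on release.
  private final case class Cached(internal: FakeConsumer) extends DataConsumer {
    def release(): Unit = ConsumerPool.synchronized {
      internal.inUse = false
      if (internal.markedForClose) internal.close()
    }
  }

  // Non-cached consumer: closed on release.
  private final case class NonCached(internal: FakeConsumer) extends DataConsumer {
    def release(): Unit = internal.close()
  }

  def acquire(key: String, attemptNumber: Int): DataConsumer = synchronized {
    if (attemptNumber >= 1) {
      // Task reattempt: invalidate whatever is cached and start fresh.
      cache.remove(key).foreach { c =>
        if (c.inUse) { c.markedForClose = true } else { c.close() }
      }
      NonCached(new FakeConsumer(key))
    } else {
      cache.get(key) match {
        case Some(c) if c.inUse =>
          // Someone else holds the cached consumer: never share it.
          NonCached(new FakeConsumer(key))
        case Some(c) =>
          c.inUse = true
          Cached(c)
        case None =>
          val c = new FakeConsumer(key)
          cache.put(key, c)
          Cached(c)
      }
    }
  }
}
```

With this shape the caller only ever sees `acquire` followed by `release`; whether `release` closes the consumer or returns it to the pool is decided by the wrapper that was handed out.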

## How was this patch tested?

Added a new stress test that verifies that it is safe to concurrently get
consumers for the same TopicPartition from the consumer pool.
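
For illustration only, a stress test of this kind could look like the sketch below. It is not the test added in this pull request: `Res`, `TinyPool` and `StressSketch` are hypothetical names, and the pool is reduced to a single cached resource so that the check (no resource is ever held by two threads at once) stays self-contained.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical pooled resource; `users` counts threads currently holding it.
final class Res {
  val users = new AtomicInteger(0)
  @volatile var inUse = false
}

object TinyPool {
  private val cached = new Res

  // Hand out the cached resource when it is free, otherwise a fresh one.
  def acquire(): Res = synchronized {
    if (cached.inUse) {
      val fresh = new Res
      fresh.inUse = true
      fresh
    } else {
      cached.inUse = true
      cached
    }
  }

  def release(r: Res): Unit = synchronized { r.inUse = false }
}

object StressSketch {
  // Returns how many times a resource was observed with more than one holder.
  def run(threads: Int, iterations: Int): Int = {
    val violations = new AtomicInteger(0)
    val exec = Executors.newFixedThreadPool(threads)
    (1 to threads).foreach { _ =>
      exec.execute(new Runnable {
        def run(): Unit = {
          (1 to iterations).foreach { _ =>
            val r = TinyPool.acquire()
            if (r.users.incrementAndGet() != 1) violations.incrementAndGet()
            Thread.`yield`() // widen the window in which a clash could show up
            r.users.decrementAndGet()
            TinyPool.release(r)
          }
        }
      })
    }
    exec.shutdown()
    exec.awaitTermination(1, TimeUnit.MINUTES)
    violations.get()
  }
}
```

A call such as `StressSketch.run(8, 1000)` is expected to report zero violations as long as the pool never hands an in-use resource to a second thread.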


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gaborgsomogyi/spark SPARK-19185

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20997.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20997


commit 0fe456b48d93ed24cc59446b79ccfb32694295bc
Author: Gabor Somogyi 
Date:   2018-03-20T03:04:04Z

[SPARK-19185] [DSTREAMS] Avoid concurrent use of cached consumers in CachedKafkaConsumer




---
