[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-21 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r219418553
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The soft capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value,
+ * and the pool will have reasonable default value if the value is not 
provided.
+ * (The instance will do its best effort to respect soft capacity but it 
can exceed when there's
+ * a borrowing request and there's neither free space nor idle object to 
clear.)
+ *
+ * This class guarantees that no caller will get pooled object once the 
object is borrowed and
+ * not yet returned, hence provide thread-safety usage of non-thread-safe 
[[InternalKafkaConsumer]]
+ * unless caller shares the object to multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(
+objectFactory: ObjectFactory,
+poolConfig: PoolConfig) {
+
+  // the class is intended to have only soft capacity
+  assert(poolConfig.getMaxTotal < 0)
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
soft capacity,
+   * pool will try to clear some of idle objects.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
+updateKafkaParamForKey(key, kafkaParams)
+
+if (getTotal == poolConfig.getSoftMaxTotal()) {
+  pool.clearOldest()
+}
+
+pool.borrowObject(key)
+  }
+
+  /** Returns borrowed object to the pool. */
+  def returnObject(consumer: InternalKafkaConsumer): Unit = {
+pool.returnObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates (destroy) borrowed object to the pool. */
+  def invalidateObject(consumer: InternalKafkaConsumer): Unit = {
+pool.invalidateObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates all idle consumers for the key */
+  def invalidateKey(key: CacheKey): Unit = {
+pool.clear(key)
+  }
+
+  /**
+   * Closes the keyed object pool. Once the pool is closed,
+   * borrowObject will fail with [[IllegalStateException]], but 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-20 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r219367280
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -18,222 +18,247 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
+import java.io.Closeable
 import java.util.concurrent.TimeoutException
 
 import scala.collection.JavaConverters._
 
 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, 
KafkaConsumer, OffsetOutOfRangeException}
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
+import 
org.apache.spark.sql.kafka010.KafkaDataConsumer.{AvailableOffsetRange, 
CacheKey, UNKNOWN_OFFSET}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
-import org.apache.spark.util.UninterruptibleThread
+import org.apache.spark.util.{ShutdownHookManager, UninterruptibleThread}
+
+/**
+ * This class simplifies the usages of Kafka consumer in Spark SQL Kafka 
connector.
+ *
+ * NOTE: Like KafkaConsumer, this class is not thread-safe.
+ * NOTE for contributors: It is possible for the instance to be used from 
multiple callers,
+ * so all the methods should not rely on current cursor and use seek 
manually.
+ */
+private[kafka010] class InternalKafkaConsumer(
+val topicPartition: TopicPartition,
+val kafkaParams: ju.Map[String, Object]) extends Closeable with 
Logging {
+
+  val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+
+  private val consumer = createConsumer
 
-private[kafka010] sealed trait KafkaDataConsumer {
   /**
-   * Get the record for the given offset if available.
-   *
-   * If the record is invisible (either a
-   * transaction message, or an aborted message when the consumer's 
`isolation.level` is
-   * `read_committed`), it will be skipped and this method will try to 
fetch next available record
-   * within [offset, untilOffset).
-   *
-   * This method also will try its best to detect data loss. If 
`failOnDataLoss` is `true`, it will
-   * throw an exception when we detect an unavailable offset. If 
`failOnDataLoss` is `false`, this
-   * method will try to fetch next available record within [offset, 
untilOffset).
-   *
-   * When this method tries to skip offsets due to either invisible 
messages or data loss and
-   * reaches `untilOffset`, it will return `null`.
+   * Poll messages from Kafka starting from `offset` and returns a pair of 
"list of consumer record"
+   * and "offset after poll". The list of consumer record may be empty if 
the Kafka consumer fetches
+   * some messages but all of them are not visible messages (either 
transaction messages,
+   * or aborted messages when `isolation.level` is `read_committed`).
*
-   * @param offset the offset to fetch.
-   * @param untilOffsetthe max offset to fetch. Exclusive.
-   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
-   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method 
will either return record at
-   *   offset if available, or throw exception.when 
`failOnDataLoss` is `false`,
-   *   this method will either return record at offset 
if available, or return
-   *   the next earliest available record less than 
untilOffset, or null. It
-   *   will not throw any exception.
+   * @throws OffsetOutOfRangeException if `offset` is out of range.
+   * @throws TimeoutException if the consumer position is not changed 
after polling. It means the
+   *  consumer polls nothing before timeout.
*/
-  def get(
-  offset: Long,
-  untilOffset: Long,
-  pollTimeoutMs: Long,
-  failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = 
{
-internalConsumer.get(offset, untilOffset, pollTimeoutMs, 
failOnDataLoss)
+  def fetch(offset: Long, pollTimeoutMs: Long)
+  : (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long) = {
+// Seek to the offset because we may call seekToBeginning or seekToEnd 
before this.
+seek(offset)
+val p = consumer.poll(pollTimeoutMs)
+val r = p.records(topicPartition)
+logDebug(s"Polled $groupId ${p.partitions()}  ${r.size}")
+val offsetAfterPoll = consumer.position(topicPartition)
+logDebug(s"Offset changed from $offset to $offsetAfterPoll after 
polling")
+val 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-19 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r218955883
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The soft capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value,
+ * and the pool will have reasonable default value if the value is not 
provided.
+ * (The instance will do its best effort to respect soft capacity but it 
can exceed when there's
+ * a borrowing request and there's neither free space nor idle object to 
clear.)
+ *
+ * This class guarantees that no caller will get pooled object once the 
object is borrowed and
+ * not yet returned, hence provide thread-safety usage of non-thread-safe 
[[InternalKafkaConsumer]]
+ * unless caller shares the object to multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(
+objectFactory: ObjectFactory,
+poolConfig: PoolConfig) {
+
+  // the class is intended to have only soft capacity
+  assert(poolConfig.getMaxTotal < 0)
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
soft capacity,
+   * pool will try to clear some of idle objects.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
+updateKafkaParamForKey(key, kafkaParams)
+
+if (getTotal == poolConfig.getSoftMaxTotal()) {
+  pool.clearOldest()
+}
+
+pool.borrowObject(key)
+  }
+
+  /** Returns borrowed object to the pool. */
+  def returnObject(consumer: InternalKafkaConsumer): Unit = {
+pool.returnObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates (destroy) borrowed object to the pool. */
+  def invalidateObject(consumer: InternalKafkaConsumer): Unit = {
+pool.invalidateObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates all idle consumers for the key */
+  def invalidateKey(key: CacheKey): Unit = {
+pool.clear(key)
+  }
+
+  /**
+   * Closes the keyed object pool. Once the pool is closed,
+   * borrowObject will fail with [[IllegalStateException]], but 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-19 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r218777053
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The soft capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value,
+ * and the pool will have reasonable default value if the value is not 
provided.
+ * (The instance will do its best effort to respect soft capacity but it 
can exceed when there's
+ * a borrowing request and there's neither free space nor idle object to 
clear.)
+ *
+ * This class guarantees that no caller will get pooled object once the 
object is borrowed and
+ * not yet returned, hence provide thread-safety usage of non-thread-safe 
[[InternalKafkaConsumer]]
+ * unless caller shares the object to multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(
+objectFactory: ObjectFactory,
+poolConfig: PoolConfig) {
+
+  // the class is intended to have only soft capacity
+  assert(poolConfig.getMaxTotal < 0)
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
soft capacity,
+   * pool will try to clear some of idle objects.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
+updateKafkaParamForKey(key, kafkaParams)
+
+if (getTotal == poolConfig.getSoftMaxTotal()) {
+  pool.clearOldest()
+}
+
+pool.borrowObject(key)
+  }
+
+  /** Returns borrowed object to the pool. */
+  def returnObject(consumer: InternalKafkaConsumer): Unit = {
+pool.returnObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates (destroy) borrowed object to the pool. */
+  def invalidateObject(consumer: InternalKafkaConsumer): Unit = {
+pool.invalidateObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates all idle consumers for the key */
+  def invalidateKey(key: CacheKey): Unit = {
+pool.clear(key)
+  }
+
+  /**
+   * Closes the keyed object pool. Once the pool is closed,
+   * borrowObject will fail with [[IllegalStateException]], but 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-19 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r218725548
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The soft capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value,
+ * and the pool will have reasonable default value if the value is not 
provided.
+ * (The instance will do its best effort to respect soft capacity but it 
can exceed when there's
+ * a borrowing request and there's neither free space nor idle object to 
clear.)
+ *
+ * This class guarantees that no caller will get pooled object once the 
object is borrowed and
+ * not yet returned, hence provide thread-safety usage of non-thread-safe 
[[InternalKafkaConsumer]]
+ * unless caller shares the object to multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(
+objectFactory: ObjectFactory,
+poolConfig: PoolConfig) {
+
+  // the class is intended to have only soft capacity
+  assert(poolConfig.getMaxTotal < 0)
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
soft capacity,
+   * pool will try to clear some of idle objects.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
+updateKafkaParamForKey(key, kafkaParams)
+
+if (getTotal == poolConfig.getSoftMaxTotal()) {
+  pool.clearOldest()
+}
+
+pool.borrowObject(key)
+  }
+
+  /** Returns borrowed object to the pool. */
+  def returnObject(consumer: InternalKafkaConsumer): Unit = {
+pool.returnObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates (destroy) borrowed object to the pool. */
+  def invalidateObject(consumer: InternalKafkaConsumer): Unit = {
+pool.invalidateObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates all idle consumers for the key */
+  def invalidateKey(key: CacheKey): Unit = {
+pool.clear(key)
+  }
+
+  /**
+   * Closes the keyed object pool. Once the pool is closed,
+   * borrowObject will fail with [[IllegalStateException]], but 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-19 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r218719952
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -18,222 +18,247 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
+import java.io.Closeable
 import java.util.concurrent.TimeoutException
 
 import scala.collection.JavaConverters._
 
 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, 
KafkaConsumer, OffsetOutOfRangeException}
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
+import 
org.apache.spark.sql.kafka010.KafkaDataConsumer.{AvailableOffsetRange, 
CacheKey, UNKNOWN_OFFSET}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
-import org.apache.spark.util.UninterruptibleThread
+import org.apache.spark.util.{ShutdownHookManager, UninterruptibleThread}
+
+/**
+ * This class simplifies the usages of Kafka consumer in Spark SQL Kafka 
connector.
+ *
+ * NOTE: Like KafkaConsumer, this class is not thread-safe.
+ * NOTE for contributors: It is possible for the instance to be used from 
multiple callers,
+ * so all the methods should not rely on current cursor and use seek 
manually.
+ */
+private[kafka010] class InternalKafkaConsumer(
+val topicPartition: TopicPartition,
+val kafkaParams: ju.Map[String, Object]) extends Closeable with 
Logging {
+
+  val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+
+  private val consumer = createConsumer
 
-private[kafka010] sealed trait KafkaDataConsumer {
   /**
-   * Get the record for the given offset if available.
-   *
-   * If the record is invisible (either a
-   * transaction message, or an aborted message when the consumer's 
`isolation.level` is
-   * `read_committed`), it will be skipped and this method will try to 
fetch next available record
-   * within [offset, untilOffset).
-   *
-   * This method also will try its best to detect data loss. If 
`failOnDataLoss` is `true`, it will
-   * throw an exception when we detect an unavailable offset. If 
`failOnDataLoss` is `false`, this
-   * method will try to fetch next available record within [offset, 
untilOffset).
-   *
-   * When this method tries to skip offsets due to either invisible 
messages or data loss and
-   * reaches `untilOffset`, it will return `null`.
+   * Poll messages from Kafka starting from `offset` and returns a pair of 
"list of consumer record"
+   * and "offset after poll". The list of consumer record may be empty if 
the Kafka consumer fetches
+   * some messages but all of them are not visible messages (either 
transaction messages,
+   * or aborted messages when `isolation.level` is `read_committed`).
*
-   * @param offset the offset to fetch.
-   * @param untilOffsetthe max offset to fetch. Exclusive.
-   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
-   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method 
will either return record at
-   *   offset if available, or throw exception.when 
`failOnDataLoss` is `false`,
-   *   this method will either return record at offset 
if available, or return
-   *   the next earliest available record less than 
untilOffset, or null. It
-   *   will not throw any exception.
+   * @throws OffsetOutOfRangeException if `offset` is out of range.
+   * @throws TimeoutException if the consumer position is not changed 
after polling. It means the
+   *  consumer polls nothing before timeout.
*/
-  def get(
-  offset: Long,
-  untilOffset: Long,
-  pollTimeoutMs: Long,
-  failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = 
{
-internalConsumer.get(offset, untilOffset, pollTimeoutMs, 
failOnDataLoss)
+  def fetch(offset: Long, pollTimeoutMs: Long)
+  : (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long) = {
+// Seek to the offset because we may call seekToBeginning or seekToEnd 
before this.
+seek(offset)
+val p = consumer.poll(pollTimeoutMs)
+val r = p.records(topicPartition)
+logDebug(s"Polled $groupId ${p.partitions()}  ${r.size}")
+val offsetAfterPoll = consumer.position(topicPartition)
+logDebug(s"Offset changed from $offset to $offsetAfterPoll after 
polling")
+val 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-19 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r218709909
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The soft capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value,
+ * and the pool will have reasonable default value if the value is not 
provided.
+ * (The instance will do its best effort to respect soft capacity but it 
can exceed when there's
+ * a borrowing request and there's neither free space nor idle object to 
clear.)
+ *
+ * This class guarantees that no caller will get pooled object once the 
object is borrowed and
+ * not yet returned, hence provide thread-safety usage of non-thread-safe 
[[InternalKafkaConsumer]]
+ * unless caller shares the object to multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(
+objectFactory: ObjectFactory,
+poolConfig: PoolConfig) {
+
+  // the class is intended to have only soft capacity
+  assert(poolConfig.getMaxTotal < 0)
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
soft capacity,
+   * pool will try to clear some of idle objects.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
+updateKafkaParamForKey(key, kafkaParams)
+
+if (getTotal == poolConfig.getSoftMaxTotal()) {
+  pool.clearOldest()
+}
+
+pool.borrowObject(key)
+  }
+
+  /** Returns borrowed object to the pool. */
+  def returnObject(consumer: InternalKafkaConsumer): Unit = {
+pool.returnObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates (destroy) borrowed object to the pool. */
+  def invalidateObject(consumer: InternalKafkaConsumer): Unit = {
+pool.invalidateObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates all idle consumers for the key */
+  def invalidateKey(key: CacheKey): Unit = {
+pool.clear(key)
+  }
+
+  /**
+   * Closes the keyed object pool. Once the pool is closed,
+   * borrowObject will fail with [[IllegalStateException]], but 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-19 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r218699852
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/FetchedPoolSuite.scala
 ---
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+import org.apache.spark.sql.test.SharedSQLContext
+
+class FetchedPoolSuite extends SharedSQLContext {
+  type Record = ConsumerRecord[Array[Byte], Array[Byte]]
+
+  private val dummyBytes = "dummy".getBytes
+
+  test("acquire fresh one") {
+val dataPool = FetchedDataPool.build
+
+val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+val data = dataPool.acquire(cacheKey, 0)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(dataPool.getCache(cacheKey).head.inUse)
+
+data.withNewPoll(testRecords(0, 5).listIterator, 5)
+
+dataPool.release(cacheKey, data)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(!dataPool.getCache(cacheKey).head.inUse)
+
+dataPool.shutdown()
+  }
+
+  test("acquire fetched data from multiple keys") {
+val dataPool = FetchedDataPool.build
+
+val cacheKeys = (0 to 10).map { partId =>
+  CacheKey("testgroup", new TopicPartition("topic", partId))
+}
+
+assert(dataPool.getCache.size === 0)
+cacheKeys.foreach { key => assert(dataPool.getCache.get(key).isEmpty) }
+
+val dataList = cacheKeys.map(key => (key, dataPool.acquire(key, 0)))
+
+assert(dataPool.getCache.size === cacheKeys.size)
+cacheKeys.map { key =>
+  assert(dataPool.getCache(key).size === 1)
+  assert(dataPool.getCache(key).head.inUse)
+}
+
+dataList.map { case (_, data) =>
+  data.withNewPoll(testRecords(0, 5).listIterator, 5)
+}
+
+dataList.foreach { case (key, data) =>
+  dataPool.release(key, data)
+}
+
+assert(dataPool.getCache.size === cacheKeys.size)
+cacheKeys.map { key =>
+  assert(dataPool.getCache(key).size === 1)
+  assert(!dataPool.getCache(key).head.inUse)
+}
+
+dataPool.shutdown()
+  }
+
+  test("continuous use of fetched data from single key") {
+val dataPool = FetchedDataPool.build
+
+val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+val data = dataPool.acquire(cacheKey, 0)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(dataPool.getCache(cacheKey).head.inUse)
+
+data.withNewPoll(testRecords(0, 5).listIterator, 5)
+
+(0 to 3).foreach { _ => data.next() }
+
+dataPool.release(cacheKey, data)
+
+// suppose next batch
+
+val data2 = dataPool.acquire(cacheKey, data.nextOffsetInFetchedData)
+
+assert(data.eq(data2))
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(dataPool.getCache(cacheKey).head.inUse)
+
+dataPool.release(cacheKey, data2)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(!dataPool.getCache(cacheKey).head.inUse)
+
+dataPool.shutdown()
+  }
+
+  test("multiple tasks referring same key continuously using fetched 
data") {
+val dataPool = FetchedDataPool.build
+
+val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-07 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r215867141
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -18,222 +18,247 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
+import java.io.Closeable
 import java.util.concurrent.TimeoutException
 
 import scala.collection.JavaConverters._
 
 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, 
KafkaConsumer, OffsetOutOfRangeException}
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
+import 
org.apache.spark.sql.kafka010.KafkaDataConsumer.{AvailableOffsetRange, 
CacheKey, UNKNOWN_OFFSET}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
-import org.apache.spark.util.UninterruptibleThread
+import org.apache.spark.util.{ShutdownHookManager, UninterruptibleThread}
+
+/**
+ * This class simplifies the usages of Kafka consumer in Spark SQL Kafka 
connector.
+ *
+ * NOTE: Like KafkaConsumer, this class is not thread-safe.
+ * NOTE for contributors: It is possible for the instance to be used from 
multiple callers,
+ * so all the methods should not rely on current cursor and use seek 
manually.
+ */
+private[kafka010] class InternalKafkaConsumer(
+val topicPartition: TopicPartition,
+val kafkaParams: ju.Map[String, Object]) extends Closeable with 
Logging {
+
+  val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+
+  private val consumer = createConsumer
 
-private[kafka010] sealed trait KafkaDataConsumer {
   /**
-   * Get the record for the given offset if available.
-   *
-   * If the record is invisible (either a
-   * transaction message, or an aborted message when the consumer's 
`isolation.level` is
-   * `read_committed`), it will be skipped and this method will try to 
fetch next available record
-   * within [offset, untilOffset).
-   *
-   * This method also will try its best to detect data loss. If 
`failOnDataLoss` is `true`, it will
-   * throw an exception when we detect an unavailable offset. If 
`failOnDataLoss` is `false`, this
-   * method will try to fetch next available record within [offset, 
untilOffset).
-   *
-   * When this method tries to skip offsets due to either invisible 
messages or data loss and
-   * reaches `untilOffset`, it will return `null`.
+   * Poll messages from Kafka starting from `offset` and returns a pair of 
"list of consumer record"
+   * and "offset after poll". The list of consumer record may be empty if 
the Kafka consumer fetches
+   * some messages but all of them are not visible messages (either 
transaction messages,
+   * or aborted messages when `isolation.level` is `read_committed`).
*
-   * @param offset the offset to fetch.
-   * @param untilOffsetthe max offset to fetch. Exclusive.
-   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
-   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method 
will either return record at
-   *   offset if available, or throw exception.when 
`failOnDataLoss` is `false`,
-   *   this method will either return record at offset 
if available, or return
-   *   the next earliest available record less than 
untilOffset, or null. It
-   *   will not throw any exception.
+   * @throws OffsetOutOfRangeException if `offset` is out of range.
+   * @throws TimeoutException if the consumer position is not changed 
after polling. It means the
+   *  consumer polls nothing before timeout.
*/
-  def get(
-  offset: Long,
-  untilOffset: Long,
-  pollTimeoutMs: Long,
-  failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = 
{
-internalConsumer.get(offset, untilOffset, pollTimeoutMs, 
failOnDataLoss)
+  def fetch(offset: Long, pollTimeoutMs: Long)
+  : (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long) = {
+// Seek to the offset because we may call seekToBeginning or seekToEnd 
before this.
+seek(offset)
+val p = consumer.poll(pollTimeoutMs)
+val r = p.records(topicPartition)
+logDebug(s"Polled $groupId ${p.partitions()}  ${r.size}")
+val offsetAfterPoll = consumer.position(topicPartition)
+logDebug(s"Offset changed from $offset to $offsetAfterPoll after 
polling")
+val 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-06 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r215818860
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/FetchedPoolSuite.scala
 ---
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+import org.apache.spark.sql.test.SharedSQLContext
+
+class FetchedPoolSuite extends SharedSQLContext {
+  type Record = ConsumerRecord[Array[Byte], Array[Byte]]
+
+  private val dummyBytes = "dummy".getBytes
+
+  test("acquire fresh one") {
+val dataPool = FetchedDataPool.build
+
+val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+val data = dataPool.acquire(cacheKey, 0)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(dataPool.getCache(cacheKey).head.inUse)
+
+data.withNewPoll(testRecords(0, 5).listIterator, 5)
+
+dataPool.release(cacheKey, data)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(!dataPool.getCache(cacheKey).head.inUse)
+
+dataPool.shutdown()
+  }
+
+  test("acquire fetched data from multiple keys") {
+val dataPool = FetchedDataPool.build
+
+val cacheKeys = (0 to 10).map { partId =>
+  CacheKey("testgroup", new TopicPartition("topic", partId))
+}
+
+assert(dataPool.getCache.size === 0)
+cacheKeys.foreach { key => assert(dataPool.getCache.get(key).isEmpty) }
+
+val dataList = cacheKeys.map(key => (key, dataPool.acquire(key, 0)))
+
+assert(dataPool.getCache.size === cacheKeys.size)
+cacheKeys.map { key =>
+  assert(dataPool.getCache(key).size === 1)
+  assert(dataPool.getCache(key).head.inUse)
+}
+
+dataList.map { case (_, data) =>
+  data.withNewPoll(testRecords(0, 5).listIterator, 5)
+}
+
+dataList.foreach { case (key, data) =>
+  dataPool.release(key, data)
+}
+
+assert(dataPool.getCache.size === cacheKeys.size)
+cacheKeys.map { key =>
+  assert(dataPool.getCache(key).size === 1)
+  assert(!dataPool.getCache(key).head.inUse)
+}
+
+dataPool.shutdown()
+  }
+
+  test("continuous use of fetched data from single key") {
+val dataPool = FetchedDataPool.build
+
+val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+val data = dataPool.acquire(cacheKey, 0)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(dataPool.getCache(cacheKey).head.inUse)
+
+data.withNewPoll(testRecords(0, 5).listIterator, 5)
+
+(0 to 3).foreach { _ => data.next() }
+
+dataPool.release(cacheKey, data)
+
+// suppose next batch
+
+val data2 = dataPool.acquire(cacheKey, data.nextOffsetInFetchedData)
+
+assert(data.eq(data2))
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(dataPool.getCache(cacheKey).head.inUse)
+
+dataPool.release(cacheKey, data2)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(!dataPool.getCache(cacheKey).head.inUse)
+
+dataPool.shutdown()
+  }
+
+  test("multiple tasks referring same key continuously using fetched 
data") {
+val dataPool = FetchedDataPool.build
+
+val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+val 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-06 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r215637613
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -18,222 +18,247 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
+import java.io.Closeable
 import java.util.concurrent.TimeoutException
 
 import scala.collection.JavaConverters._
 
 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, 
KafkaConsumer, OffsetOutOfRangeException}
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
+import 
org.apache.spark.sql.kafka010.KafkaDataConsumer.{AvailableOffsetRange, 
CacheKey, UNKNOWN_OFFSET}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
-import org.apache.spark.util.UninterruptibleThread
+import org.apache.spark.util.{ShutdownHookManager, UninterruptibleThread}
+
+/**
+ * This class simplifies the usages of Kafka consumer in Spark SQL Kafka 
connector.
+ *
+ * NOTE: Like KafkaConsumer, this class is not thread-safe.
+ * NOTE for contributors: It is possible for the instance to be used from 
multiple callers,
+ * so all the methods should not rely on current cursor and use seek 
manually.
+ */
+private[kafka010] class InternalKafkaConsumer(
+val topicPartition: TopicPartition,
+val kafkaParams: ju.Map[String, Object]) extends Closeable with 
Logging {
+
+  val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+
+  private val consumer = createConsumer
 
-private[kafka010] sealed trait KafkaDataConsumer {
   /**
-   * Get the record for the given offset if available.
-   *
-   * If the record is invisible (either a
-   * transaction message, or an aborted message when the consumer's 
`isolation.level` is
-   * `read_committed`), it will be skipped and this method will try to 
fetch next available record
-   * within [offset, untilOffset).
-   *
-   * This method also will try its best to detect data loss. If 
`failOnDataLoss` is `true`, it will
-   * throw an exception when we detect an unavailable offset. If 
`failOnDataLoss` is `false`, this
-   * method will try to fetch next available record within [offset, 
untilOffset).
-   *
-   * When this method tries to skip offsets due to either invisible 
messages or data loss and
-   * reaches `untilOffset`, it will return `null`.
+   * Poll messages from Kafka starting from `offset` and returns a pair of 
"list of consumer record"
+   * and "offset after poll". The list of consumer record may be empty if 
the Kafka consumer fetches
+   * some messages but all of them are not visible messages (either 
transaction messages,
+   * or aborted messages when `isolation.level` is `read_committed`).
*
-   * @param offset the offset to fetch.
-   * @param untilOffsetthe max offset to fetch. Exclusive.
-   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
-   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method 
will either return record at
-   *   offset if available, or throw exception.when 
`failOnDataLoss` is `false`,
-   *   this method will either return record at offset 
if available, or return
-   *   the next earliest available record less than 
untilOffset, or null. It
-   *   will not throw any exception.
+   * @throws OffsetOutOfRangeException if `offset` is out of range.
+   * @throws TimeoutException if the consumer position is not changed 
after polling. It means the
+   *  consumer polls nothing before timeout.
*/
-  def get(
-  offset: Long,
-  untilOffset: Long,
-  pollTimeoutMs: Long,
-  failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = 
{
-internalConsumer.get(offset, untilOffset, pollTimeoutMs, 
failOnDataLoss)
+  def fetch(offset: Long, pollTimeoutMs: Long)
+  : (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long) = {
+// Seek to the offset because we may call seekToBeginning or seekToEnd 
before this.
+seek(offset)
+val p = consumer.poll(pollTimeoutMs)
+val r = p.records(topicPartition)
+logDebug(s"Polled $groupId ${p.partitions()}  ${r.size}")
+val offsetAfterPoll = consumer.position(topicPartition)
+logDebug(s"Offset changed from $offset to $offsetAfterPoll after 
polling")
+val 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-06 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r215635068
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The soft capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value,
+ * and the pool will have reasonable default value if the value is not 
provided.
+ * (The instance will do its best effort to respect soft capacity but it 
can exceed when there's
+ * a borrowing request and there's neither free space nor idle object to 
clear.)
+ *
+ * This class guarantees that no caller will get pooled object once the 
object is borrowed and
+ * not yet returned, hence provide thread-safety usage of non-thread-safe 
[[InternalKafkaConsumer]]
+ * unless caller shares the object to multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(
+objectFactory: ObjectFactory,
+poolConfig: PoolConfig) {
+
+  // the class is intended to have only soft capacity
+  assert(poolConfig.getMaxTotal < 0)
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
soft capacity,
+   * pool will try to clear some of idle objects.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
+updateKafkaParamForKey(key, kafkaParams)
+
+if (getTotal == poolConfig.getSoftMaxTotal()) {
+  pool.clearOldest()
+}
+
+pool.borrowObject(key)
+  }
+
+  /** Returns borrowed object to the pool. */
+  def returnObject(consumer: InternalKafkaConsumer): Unit = {
+pool.returnObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates (destroy) borrowed object to the pool. */
+  def invalidateObject(consumer: InternalKafkaConsumer): Unit = {
+pool.invalidateObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates all idle consumers for the key */
+  def invalidateKey(key: CacheKey): Unit = {
+pool.clear(key)
+  }
+
+  /**
+   * Closes the keyed object pool. Once the pool is closed,
+   * borrowObject will fail with [[IllegalStateException]], but 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-06 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r215594790
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/FetchedPoolSuite.scala
 ---
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+import org.apache.spark.sql.test.SharedSQLContext
+
+class FetchedPoolSuite extends SharedSQLContext {
+  type Record = ConsumerRecord[Array[Byte], Array[Byte]]
+
+  private val dummyBytes = "dummy".getBytes
+
+  test("acquire fresh one") {
+val dataPool = FetchedDataPool.build
+
+val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+val data = dataPool.acquire(cacheKey, 0)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(dataPool.getCache(cacheKey).head.inUse)
+
+data.withNewPoll(testRecords(0, 5).listIterator, 5)
+
+dataPool.release(cacheKey, data)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(!dataPool.getCache(cacheKey).head.inUse)
+
+dataPool.shutdown()
+  }
+
+  test("acquire fetched data from multiple keys") {
+val dataPool = FetchedDataPool.build
+
+val cacheKeys = (0 to 10).map { partId =>
+  CacheKey("testgroup", new TopicPartition("topic", partId))
+}
+
+assert(dataPool.getCache.size === 0)
+cacheKeys.foreach { key => assert(dataPool.getCache.get(key).isEmpty) }
+
+val dataList = cacheKeys.map(key => (key, dataPool.acquire(key, 0)))
+
+assert(dataPool.getCache.size === cacheKeys.size)
+cacheKeys.map { key =>
+  assert(dataPool.getCache(key).size === 1)
+  assert(dataPool.getCache(key).head.inUse)
+}
+
+dataList.map { case (_, data) =>
+  data.withNewPoll(testRecords(0, 5).listIterator, 5)
+}
+
+dataList.foreach { case (key, data) =>
+  dataPool.release(key, data)
+}
+
+assert(dataPool.getCache.size === cacheKeys.size)
+cacheKeys.map { key =>
+  assert(dataPool.getCache(key).size === 1)
+  assert(!dataPool.getCache(key).head.inUse)
+}
+
+dataPool.shutdown()
+  }
+
+  test("continuous use of fetched data from single key") {
+val dataPool = FetchedDataPool.build
+
+val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+val data = dataPool.acquire(cacheKey, 0)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(dataPool.getCache(cacheKey).head.inUse)
+
+data.withNewPoll(testRecords(0, 5).listIterator, 5)
+
+(0 to 3).foreach { _ => data.next() }
+
+dataPool.release(cacheKey, data)
+
+// suppose next batch
+
+val data2 = dataPool.acquire(cacheKey, data.nextOffsetInFetchedData)
+
+assert(data.eq(data2))
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(dataPool.getCache(cacheKey).head.inUse)
+
+dataPool.release(cacheKey, data2)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(!dataPool.getCache(cacheKey).head.inUse)
+
+dataPool.shutdown()
+  }
+
+  test("multiple tasks referring same key continuously using fetched 
data") {
+val dataPool = FetchedDataPool.build
+
+val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-06 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r215591546
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The soft capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value,
+ * and the pool will have reasonable default value if the value is not 
provided.
+ * (The instance will do its best effort to respect soft capacity but it 
can exceed when there's
+ * a borrowing request and there's neither free space nor idle object to 
clear.)
+ *
+ * This class guarantees that no caller will get pooled object once the 
object is borrowed and
+ * not yet returned, hence provide thread-safety usage of non-thread-safe 
[[InternalKafkaConsumer]]
+ * unless caller shares the object to multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(
+objectFactory: ObjectFactory,
+poolConfig: PoolConfig) {
+
+  // the class is intended to have only soft capacity
+  assert(poolConfig.getMaxTotal < 0)
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
soft capacity,
+   * pool will try to clear some of idle objects.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
+updateKafkaParamForKey(key, kafkaParams)
+
+if (getTotal == poolConfig.getSoftMaxTotal()) {
+  pool.clearOldest()
+}
+
+pool.borrowObject(key)
+  }
+
+  /** Returns borrowed object to the pool. */
+  def returnObject(consumer: InternalKafkaConsumer): Unit = {
+pool.returnObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates (destroy) borrowed object to the pool. */
+  def invalidateObject(consumer: InternalKafkaConsumer): Unit = {
+pool.invalidateObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates all idle consumers for the key */
+  def invalidateKey(key: CacheKey): Unit = {
+pool.clear(key)
+  }
+
+  /**
+   * Closes the keyed object pool. Once the pool is closed,
+   * borrowObject will fail with [[IllegalStateException]], but 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-06 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r215583862
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/FetchedPoolSuite.scala
 ---
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+import org.apache.spark.sql.test.SharedSQLContext
+
+class FetchedPoolSuite extends SharedSQLContext {
+  type Record = ConsumerRecord[Array[Byte], Array[Byte]]
+
+  private val dummyBytes = "dummy".getBytes
+
+  test("acquire fresh one") {
+val dataPool = FetchedDataPool.build
+
+val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+val data = dataPool.acquire(cacheKey, 0)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(dataPool.getCache(cacheKey).head.inUse)
+
+data.withNewPoll(testRecords(0, 5).listIterator, 5)
+
+dataPool.release(cacheKey, data)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(!dataPool.getCache(cacheKey).head.inUse)
+
+dataPool.shutdown()
+  }
+
+  test("acquire fetched data from multiple keys") {
+val dataPool = FetchedDataPool.build
+
+val cacheKeys = (0 to 10).map { partId =>
+  CacheKey("testgroup", new TopicPartition("topic", partId))
+}
+
+assert(dataPool.getCache.size === 0)
+cacheKeys.foreach { key => assert(dataPool.getCache.get(key).isEmpty) }
+
+val dataList = cacheKeys.map(key => (key, dataPool.acquire(key, 0)))
+
+assert(dataPool.getCache.size === cacheKeys.size)
+cacheKeys.map { key =>
+  assert(dataPool.getCache(key).size === 1)
+  assert(dataPool.getCache(key).head.inUse)
+}
+
+dataList.map { case (_, data) =>
+  data.withNewPoll(testRecords(0, 5).listIterator, 5)
+}
+
+dataList.foreach { case (key, data) =>
+  dataPool.release(key, data)
+}
+
+assert(dataPool.getCache.size === cacheKeys.size)
+cacheKeys.map { key =>
+  assert(dataPool.getCache(key).size === 1)
+  assert(!dataPool.getCache(key).head.inUse)
+}
+
+dataPool.shutdown()
+  }
+
+  test("continuous use of fetched data from single key") {
+val dataPool = FetchedDataPool.build
+
+val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+val data = dataPool.acquire(cacheKey, 0)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(dataPool.getCache(cacheKey).head.inUse)
+
+data.withNewPoll(testRecords(0, 5).listIterator, 5)
+
+(0 to 3).foreach { _ => data.next() }
+
+dataPool.release(cacheKey, data)
+
+// suppose next batch
+
+val data2 = dataPool.acquire(cacheKey, data.nextOffsetInFetchedData)
+
+assert(data.eq(data2))
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(dataPool.getCache(cacheKey).head.inUse)
+
+dataPool.release(cacheKey, data2)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(!dataPool.getCache(cacheKey).head.inUse)
+
+dataPool.shutdown()
+  }
+
+  test("multiple tasks referring same key continuously using fetched 
data") {
+val dataPool = FetchedDataPool.build
+
+val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-06 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r215579562
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -18,222 +18,247 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
+import java.io.Closeable
 import java.util.concurrent.TimeoutException
 
 import scala.collection.JavaConverters._
 
 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, 
KafkaConsumer, OffsetOutOfRangeException}
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
+import 
org.apache.spark.sql.kafka010.KafkaDataConsumer.{AvailableOffsetRange, 
CacheKey, UNKNOWN_OFFSET}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
-import org.apache.spark.util.UninterruptibleThread
+import org.apache.spark.util.{ShutdownHookManager, UninterruptibleThread}
+
+/**
+ * This class simplifies the usages of Kafka consumer in Spark SQL Kafka 
connector.
+ *
+ * NOTE: Like KafkaConsumer, this class is not thread-safe.
+ * NOTE for contributors: It is possible for the instance to be used from 
multiple callers,
+ * so all the methods should not rely on current cursor and use seek 
manually.
+ */
+private[kafka010] class InternalKafkaConsumer(
+val topicPartition: TopicPartition,
+val kafkaParams: ju.Map[String, Object]) extends Closeable with 
Logging {
+
+  val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+
+  private val consumer = createConsumer
 
-private[kafka010] sealed trait KafkaDataConsumer {
   /**
-   * Get the record for the given offset if available.
-   *
-   * If the record is invisible (either a
-   * transaction message, or an aborted message when the consumer's 
`isolation.level` is
-   * `read_committed`), it will be skipped and this method will try to 
fetch next available record
-   * within [offset, untilOffset).
-   *
-   * This method also will try its best to detect data loss. If 
`failOnDataLoss` is `true`, it will
-   * throw an exception when we detect an unavailable offset. If 
`failOnDataLoss` is `false`, this
-   * method will try to fetch next available record within [offset, 
untilOffset).
-   *
-   * When this method tries to skip offsets due to either invisible 
messages or data loss and
-   * reaches `untilOffset`, it will return `null`.
+   * Poll messages from Kafka starting from `offset` and returns a pair of 
"list of consumer record"
+   * and "offset after poll". The list of consumer record may be empty if 
the Kafka consumer fetches
+   * some messages but all of them are not visible messages (either 
transaction messages,
+   * or aborted messages when `isolation.level` is `read_committed`).
*
-   * @param offset the offset to fetch.
-   * @param untilOffsetthe max offset to fetch. Exclusive.
-   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
-   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method 
will either return record at
-   *   offset if available, or throw exception.when 
`failOnDataLoss` is `false`,
-   *   this method will either return record at offset 
if available, or return
-   *   the next earliest available record less than 
untilOffset, or null. It
-   *   will not throw any exception.
+   * @throws OffsetOutOfRangeException if `offset` is out of range.
+   * @throws TimeoutException if the consumer position is not changed 
after polling. It means the
+   *  consumer polls nothing before timeout.
*/
-  def get(
-  offset: Long,
-  untilOffset: Long,
-  pollTimeoutMs: Long,
-  failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = 
{
-internalConsumer.get(offset, untilOffset, pollTimeoutMs, 
failOnDataLoss)
+  def fetch(offset: Long, pollTimeoutMs: Long)
+  : (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long) = {
+// Seek to the offset because we may call seekToBeginning or seekToEnd 
before this.
+seek(offset)
+val p = consumer.poll(pollTimeoutMs)
+val r = p.records(topicPartition)
+logDebug(s"Polled $groupId ${p.partitions()}  ${r.size}")
+val offsetAfterPoll = consumer.position(topicPartition)
+logDebug(s"Offset changed from $offset to $offsetAfterPoll after 
polling")
+val 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r215313888
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The soft capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value,
+ * and the pool will have reasonable default value if the value is not 
provided.
+ * (The instance will do its best effort to respect soft capacity but it 
can exceed when there's
+ * a borrowing request and there's neither free space nor idle object to 
clear.)
+ *
+ * This class guarantees that no caller will get pooled object once the 
object is borrowed and
+ * not yet returned, hence provide thread-safety usage of non-thread-safe 
[[InternalKafkaConsumer]]
+ * unless caller shares the object to multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(
+objectFactory: ObjectFactory,
+poolConfig: PoolConfig) {
+
+  // the class is intended to have only soft capacity
+  assert(poolConfig.getMaxTotal < 0)
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
soft capacity,
+   * pool will try to clear some of idle objects.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
+updateKafkaParamForKey(key, kafkaParams)
+
+if (getTotal == poolConfig.getSoftMaxTotal()) {
+  pool.clearOldest()
+}
+
+pool.borrowObject(key)
+  }
+
+  /** Returns borrowed object to the pool. */
+  def returnObject(consumer: InternalKafkaConsumer): Unit = {
+pool.returnObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates (destroy) borrowed object to the pool. */
+  def invalidateObject(consumer: InternalKafkaConsumer): Unit = {
+pool.invalidateObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates all idle consumers for the key */
+  def invalidateKey(key: CacheKey): Unit = {
+pool.clear(key)
+  }
+
+  /**
+   * Closes the keyed object pool. Once the pool is closed,
+   * borrowObject will fail with [[IllegalStateException]], but 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r215313215
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/FetchedPoolSuite.scala
 ---
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+import org.apache.spark.sql.test.SharedSQLContext
+
+class FetchedPoolSuite extends SharedSQLContext {
+  type Record = ConsumerRecord[Array[Byte], Array[Byte]]
+
+  private val dummyBytes = "dummy".getBytes
+
+  test("acquire fresh one") {
+val dataPool = FetchedDataPool.build
+
+val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+val data = dataPool.acquire(cacheKey, 0)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(dataPool.getCache(cacheKey).head.inUse)
+
+data.withNewPoll(testRecords(0, 5).listIterator, 5)
+
+dataPool.release(cacheKey, data)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(!dataPool.getCache(cacheKey).head.inUse)
+
+dataPool.shutdown()
+  }
+
+  test("acquire fetched data from multiple keys") {
+val dataPool = FetchedDataPool.build
+
+val cacheKeys = (0 to 10).map { partId =>
+  CacheKey("testgroup", new TopicPartition("topic", partId))
+}
+
+assert(dataPool.getCache.size === 0)
+cacheKeys.foreach { key => assert(dataPool.getCache.get(key).isEmpty) }
+
+val dataList = cacheKeys.map(key => (key, dataPool.acquire(key, 0)))
+
+assert(dataPool.getCache.size === cacheKeys.size)
+cacheKeys.map { key =>
+  assert(dataPool.getCache(key).size === 1)
+  assert(dataPool.getCache(key).head.inUse)
+}
+
+dataList.map { case (_, data) =>
+  data.withNewPoll(testRecords(0, 5).listIterator, 5)
+}
+
+dataList.foreach { case (key, data) =>
+  dataPool.release(key, data)
+}
+
+assert(dataPool.getCache.size === cacheKeys.size)
+cacheKeys.map { key =>
+  assert(dataPool.getCache(key).size === 1)
+  assert(!dataPool.getCache(key).head.inUse)
+}
+
+dataPool.shutdown()
+  }
+
+  test("continuous use of fetched data from single key") {
+val dataPool = FetchedDataPool.build
+
+val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+val data = dataPool.acquire(cacheKey, 0)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(dataPool.getCache(cacheKey).head.inUse)
+
+data.withNewPoll(testRecords(0, 5).listIterator, 5)
+
+(0 to 3).foreach { _ => data.next() }
+
+dataPool.release(cacheKey, data)
+
+// suppose next batch
+
+val data2 = dataPool.acquire(cacheKey, data.nextOffsetInFetchedData)
+
+assert(data.eq(data2))
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(dataPool.getCache(cacheKey).head.inUse)
+
+dataPool.release(cacheKey, data2)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(!dataPool.getCache(cacheKey).head.inUse)
+
+dataPool.shutdown()
+  }
+
+  test("multiple tasks referring same key continuously using fetched 
data") {
+val dataPool = FetchedDataPool.build
+
+val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+val 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r215310881
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The soft capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value,
+ * and the pool will have reasonable default value if the value is not 
provided.
+ * (The instance will do its best effort to respect soft capacity but it 
can exceed when there's
+ * a borrowing request and there's neither free space nor idle object to 
clear.)
+ *
+ * This class guarantees that no caller will get pooled object once the 
object is borrowed and
+ * not yet returned, hence provide thread-safety usage of non-thread-safe 
[[InternalKafkaConsumer]]
+ * unless caller shares the object to multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(
+objectFactory: ObjectFactory,
+poolConfig: PoolConfig) {
+
+  // the class is intended to have only soft capacity
+  assert(poolConfig.getMaxTotal < 0)
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
soft capacity,
+   * pool will try to clear some of idle objects.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
+updateKafkaParamForKey(key, kafkaParams)
+
+if (getTotal == poolConfig.getSoftMaxTotal()) {
+  pool.clearOldest()
+}
+
+pool.borrowObject(key)
+  }
+
+  /** Returns borrowed object to the pool. */
+  def returnObject(consumer: InternalKafkaConsumer): Unit = {
+pool.returnObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates (destroy) borrowed object to the pool. */
+  def invalidateObject(consumer: InternalKafkaConsumer): Unit = {
+pool.invalidateObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates all idle consumers for the key */
+  def invalidateKey(key: CacheKey): Unit = {
+pool.clear(key)
+  }
+
+  /**
+   * Closes the keyed object pool. Once the pool is closed,
+   * borrowObject will fail with [[IllegalStateException]], but 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r215310403
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/FetchedDataPool.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.{CacheKey, 
UNKNOWN_OFFSET}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Provides object pool for [[FetchedData]] which is grouped by 
[[CacheKey]].
+ *
+ * Along with CacheKey, it receives desired start offset to find cached 
FetchedData which
+ * may be stored from previous batch. If it can't find one to match, it 
will create
+ * a new FetchedData.
+ */
+private[kafka010] class FetchedDataPool {
+  import FetchedDataPool._
+
+  private[kafka010] case class CachedFetchedData(fetchedData: FetchedData) 
{
+var lastReleasedTimestamp: Long = Long.MaxValue
+var lastAcquiredTimestamp: Long = Long.MinValue
+var inUse: Boolean = false
+
+def getObject: FetchedData = fetchedData
+  }
+
+  private object CachedFetchedData {
+def empty(): CachedFetchedData = {
+  val emptyData = FetchedData(
+ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], 
Array[Byte]]],
+UNKNOWN_OFFSET,
+UNKNOWN_OFFSET)
+
+  CachedFetchedData(emptyData)
+}
+  }
+
+  private type CachedFetchedDataList = 
mutable.ListBuffer[CachedFetchedData]
+
+  private val cache: mutable.Map[CacheKey, CachedFetchedDataList] = 
mutable.HashMap.empty
+
+  /** Retrieve internal cache. This method is only for testing. */
+  private[kafka010] def getCache: mutable.Map[CacheKey, 
CachedFetchedDataList] = cache
--- End diff --

Ah yes, wasn't aware of PrivateMethodTester. Thanks! Will apply.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r215308701
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The soft capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value,
+ * and the pool will have reasonable default value if the value is not 
provided.
+ * (The instance will do its best effort to respect soft capacity but it 
can exceed when there's
+ * a borrowing request and there's neither free space nor idle object to 
clear.)
+ *
+ * This class guarantees that no caller will get pooled object once the 
object is borrowed and
+ * not yet returned, hence provide thread-safety usage of non-thread-safe 
[[InternalKafkaConsumer]]
+ * unless caller shares the object to multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(
+objectFactory: ObjectFactory,
+poolConfig: PoolConfig) {
+
+  // the class is intended to have only soft capacity
+  assert(poolConfig.getMaxTotal < 0)
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
soft capacity,
+   * pool will try to clear some of idle objects.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
+updateKafkaParamForKey(key, kafkaParams)
+
+if (getTotal == poolConfig.getSoftMaxTotal()) {
+  pool.clearOldest()
+}
+
+pool.borrowObject(key)
+  }
+
+  /** Returns borrowed object to the pool. */
+  def returnObject(consumer: InternalKafkaConsumer): Unit = {
+pool.returnObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates (destroy) borrowed object to the pool. */
+  def invalidateObject(consumer: InternalKafkaConsumer): Unit = {
+pool.invalidateObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates all idle consumers for the key */
+  def invalidateKey(key: CacheKey): Unit = {
+pool.clear(key)
+  }
+
+  /**
+   * Closes the keyed object pool. Once the pool is closed,
+   * borrowObject will fail with [[IllegalStateException]], but 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-05 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r215281569
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/FetchedDataPool.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.{CacheKey, 
UNKNOWN_OFFSET}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Provides object pool for [[FetchedData]] which is grouped by 
[[CacheKey]].
+ *
+ * Along with CacheKey, it receives desired start offset to find cached 
FetchedData which
+ * may be stored from previous batch. If it can't find one to match, it 
will create
+ * a new FetchedData.
+ */
+private[kafka010] class FetchedDataPool {
+  import FetchedDataPool._
+
+  private[kafka010] case class CachedFetchedData(fetchedData: FetchedData) 
{
+var lastReleasedTimestamp: Long = Long.MaxValue
+var lastAcquiredTimestamp: Long = Long.MinValue
+var inUse: Boolean = false
+
+def getObject: FetchedData = fetchedData
+  }
+
+  private object CachedFetchedData {
+def empty(): CachedFetchedData = {
+  val emptyData = FetchedData(
+ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], 
Array[Byte]]],
+UNKNOWN_OFFSET,
+UNKNOWN_OFFSET)
+
+  CachedFetchedData(emptyData)
+}
+  }
+
+  private type CachedFetchedDataList = 
mutable.ListBuffer[CachedFetchedData]
+
+  private val cache: mutable.Map[CacheKey, CachedFetchedDataList] = 
mutable.HashMap.empty
+
+  /** Retrieve internal cache. This method is only for testing. */
+  private[kafka010] def getCache: mutable.Map[CacheKey, 
CachedFetchedDataList] = cache
+
+  private val (minEvictableIdleTimeMillis, 
evictorThreadRunIntervalMillis): (Long, Long) = {
+val conf = SparkEnv.get.conf
+
+val minEvictIdleTime = 
conf.getLong(CONFIG_NAME_MIN_EVICTABLE_IDLE_TIME_MILLIS,
+  DEFAULT_VALUE_MIN_EVICTABLE_IDLE_TIME_MILLIS)
+
+val evictorThreadInterval = conf.getLong(
+  CONFIG_NAME_EVICTOR_THREAD_RUN_INTERVAL_MILLIS,
+  DEFAULT_VALUE_EVICTOR_THREAD_RUN_INTERVAL_MILLIS)
+
+(minEvictIdleTime, evictorThreadInterval)
+  }
+
+  private val executorService = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+"kafka-fetched-data--cache-evictor")
+
+  private def startEvictorThread(): Unit = {
+executorService.scheduleAtFixedRate(new Runnable() {
+  override def run(): Unit = {
+removeIdleFetchedData()
+  }
+}, 0, evictorThreadRunIntervalMillis, TimeUnit.MILLISECONDS)
+  }
+
+  startEvictorThread()
+
+  def acquire(key: CacheKey, desiredStartOffset: Long): FetchedData = 
synchronized {
+val fetchedDataList = cache.getOrElseUpdate(key, new 
CachedFetchedDataList())
+
+val cachedFetchedDataOption = fetchedDataList.find { p =>
+  !p.inUse && p.getObject.nextOffsetInFetchedData == desiredStartOffset
+}
+
+var cachedFetchedData: CachedFetchedData = null
+if (cachedFetchedDataOption.isDefined) {
+  cachedFetchedData = cachedFetchedDataOption.get
+} else {
+  cachedFetchedData = CachedFetchedData.empty()
+  fetchedDataList += cachedFetchedData
+}
+
+cachedFetchedData.lastAcquiredTimestamp = System.currentTimeMillis()
+cachedFetchedData.inUse = true
+
+cachedFetchedData.getObject
+  }
+
+  def invalidate(key: CacheKey): Unit = synchronized {
+cache.remove(key)
+  }
+
+  def release(key: CacheKey, fetchedData: FetchedData): Unit = 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-05 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r215277752
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/FetchedPoolSuite.scala
 ---
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+import org.apache.spark.sql.test.SharedSQLContext
+
+class FetchedPoolSuite extends SharedSQLContext {
+  type Record = ConsumerRecord[Array[Byte], Array[Byte]]
+
+  private val dummyBytes = "dummy".getBytes
+
+  test("acquire fresh one") {
+val dataPool = FetchedDataPool.build
+
+val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+val data = dataPool.acquire(cacheKey, 0)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(dataPool.getCache(cacheKey).head.inUse)
+
+data.withNewPoll(testRecords(0, 5).listIterator, 5)
+
+dataPool.release(cacheKey, data)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(!dataPool.getCache(cacheKey).head.inUse)
+
+dataPool.shutdown()
+  }
+
+  test("acquire fetched data from multiple keys") {
+val dataPool = FetchedDataPool.build
+
+val cacheKeys = (0 to 10).map { partId =>
+  CacheKey("testgroup", new TopicPartition("topic", partId))
+}
+
+assert(dataPool.getCache.size === 0)
+cacheKeys.foreach { key => assert(dataPool.getCache.get(key).isEmpty) }
+
+val dataList = cacheKeys.map(key => (key, dataPool.acquire(key, 0)))
+
+assert(dataPool.getCache.size === cacheKeys.size)
+cacheKeys.map { key =>
+  assert(dataPool.getCache(key).size === 1)
+  assert(dataPool.getCache(key).head.inUse)
+}
+
+dataList.map { case (_, data) =>
+  data.withNewPoll(testRecords(0, 5).listIterator, 5)
+}
+
+dataList.foreach { case (key, data) =>
+  dataPool.release(key, data)
+}
+
+assert(dataPool.getCache.size === cacheKeys.size)
+cacheKeys.map { key =>
+  assert(dataPool.getCache(key).size === 1)
+  assert(!dataPool.getCache(key).head.inUse)
+}
+
+dataPool.shutdown()
+  }
+
+  test("continuous use of fetched data from single key") {
+val dataPool = FetchedDataPool.build
+
+val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+val data = dataPool.acquire(cacheKey, 0)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(dataPool.getCache(cacheKey).head.inUse)
+
+data.withNewPoll(testRecords(0, 5).listIterator, 5)
+
+(0 to 3).foreach { _ => data.next() }
+
+dataPool.release(cacheKey, data)
+
+// suppose next batch
+
+val data2 = dataPool.acquire(cacheKey, data.nextOffsetInFetchedData)
+
+assert(data.eq(data2))
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(dataPool.getCache(cacheKey).head.inUse)
+
+dataPool.release(cacheKey, data2)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(!dataPool.getCache(cacheKey).head.inUse)
+
+dataPool.shutdown()
+  }
+
+  test("multiple tasks referring same key continuously using fetched 
data") {
+val dataPool = FetchedDataPool.build
+
+val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-05 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r215275456
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The soft capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value,
+ * and the pool will have reasonable default value if the value is not 
provided.
+ * (The instance will do its best effort to respect soft capacity but it 
can exceed when there's
+ * a borrowing request and there's neither free space nor idle object to 
clear.)
+ *
+ * This class guarantees that no caller will get pooled object once the 
object is borrowed and
+ * not yet returned, hence provide thread-safety usage of non-thread-safe 
[[InternalKafkaConsumer]]
+ * unless caller shares the object to multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(
+objectFactory: ObjectFactory,
+poolConfig: PoolConfig) {
+
+  // the class is intended to have only soft capacity
+  assert(poolConfig.getMaxTotal < 0)
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
soft capacity,
+   * pool will try to clear some of idle objects.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
+updateKafkaParamForKey(key, kafkaParams)
+
+if (getTotal == poolConfig.getSoftMaxTotal()) {
+  pool.clearOldest()
+}
+
+pool.borrowObject(key)
+  }
+
+  /** Returns borrowed object to the pool. */
+  def returnObject(consumer: InternalKafkaConsumer): Unit = {
+pool.returnObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates (destroy) borrowed object to the pool. */
+  def invalidateObject(consumer: InternalKafkaConsumer): Unit = {
+pool.invalidateObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates all idle consumers for the key */
+  def invalidateKey(key: CacheKey): Unit = {
+pool.clear(key)
+  }
+
+  /**
+   * Closes the keyed object pool. Once the pool is closed,
+   * borrowObject will fail with [[IllegalStateException]], but 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-05 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r215274150
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -414,17 +468,37 @@ private[kafka010] case class InternalKafkaConsumer(
 }
   }
 
-  /** Create a new consumer and reset cached states */
-  private def resetConsumer(): Unit = {
-consumer.close()
-consumer = createConsumer
-fetchedData.reset()
+  /**
+   * Poll messages from Kafka starting from `offset` and update 
`fetchedData`. `fetchedData` may be
+   * empty if the Kafka consumer fetches some messages but all of them are 
not visible messages
+   * (either transaction messages, or aborted messages when 
`isolation.level` is `read_committed`).
+   *
+   * @throws OffsetOutOfRangeException if `offset` is out of range.
+   * @throws TimeoutException if the consumer position is not changed 
after polling. It means the
+   *  consumer polls nothing before timeout.
+   */
+  private def fetchData(offset: Long, pollTimeoutMs: Long): Unit = {
+val (records, offsetAfterPoll) = consumer.fetch(offset, pollTimeoutMs)
+fetchedData.withNewPoll(records.listIterator, offsetAfterPoll)
+  }
+
+  private def ensureConsumerAvailable(): Unit = {
+if (consumer == null) {
--- End diff --

Same here.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-05 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r215274047
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The soft capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value,
+ * and the pool will have reasonable default value if the value is not 
provided.
+ * (The instance will do its best effort to respect soft capacity but it 
can exceed when there's
+ * a borrowing request and there's neither free space nor idle object to 
clear.)
+ *
+ * This class guarantees that no caller will get pooled object once the 
object is borrowed and
+ * not yet returned, hence provide thread-safety usage of non-thread-safe 
[[InternalKafkaConsumer]]
+ * unless caller shares the object to multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(
+objectFactory: ObjectFactory,
+poolConfig: PoolConfig) {
+
+  // the class is intended to have only soft capacity
+  assert(poolConfig.getMaxTotal < 0)
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
soft capacity,
+   * pool will try to clear some of idle objects.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
+updateKafkaParamForKey(key, kafkaParams)
+
+if (getTotal == poolConfig.getSoftMaxTotal()) {
+  pool.clearOldest()
+}
+
+pool.borrowObject(key)
+  }
+
+  /** Returns borrowed object to the pool. */
+  def returnObject(consumer: InternalKafkaConsumer): Unit = {
+pool.returnObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates (destroy) borrowed object to the pool. */
+  def invalidateObject(consumer: InternalKafkaConsumer): Unit = {
+pool.invalidateObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates all idle consumers for the key */
+  def invalidateKey(key: CacheKey): Unit = {
+pool.clear(key)
+  }
+
+  /**
+   * Closes the keyed object pool. Once the pool is closed,
+   * borrowObject will fail with [[IllegalStateException]], but 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-05 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r215270783
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The soft capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value,
+ * and the pool will have reasonable default value if the value is not 
provided.
+ * (The instance will do its best effort to respect soft capacity but it 
can exceed when there's
+ * a borrowing request and there's neither free space nor idle object to 
clear.)
+ *
+ * This class guarantees that no caller will get pooled object once the 
object is borrowed and
+ * not yet returned, hence provide thread-safety usage of non-thread-safe 
[[InternalKafkaConsumer]]
+ * unless caller shares the object to multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(
+objectFactory: ObjectFactory,
+poolConfig: PoolConfig) {
+
+  // the class is intended to have only soft capacity
+  assert(poolConfig.getMaxTotal < 0)
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
soft capacity,
+   * pool will try to clear some of idle objects.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
+updateKafkaParamForKey(key, kafkaParams)
+
+if (getTotal == poolConfig.getSoftMaxTotal()) {
+  pool.clearOldest()
+}
+
+pool.borrowObject(key)
+  }
+
+  /** Returns borrowed object to the pool. */
+  def returnObject(consumer: InternalKafkaConsumer): Unit = {
+pool.returnObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates (destroy) borrowed object to the pool. */
+  def invalidateObject(consumer: InternalKafkaConsumer): Unit = {
+pool.invalidateObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates all idle consumers for the key */
+  def invalidateKey(key: CacheKey): Unit = {
+pool.clear(key)
+  }
+
+  /**
+   * Closes the keyed object pool. Once the pool is closed,
+   * borrowObject will fail with [[IllegalStateException]], but 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-05 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r215268638
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/FetchedDataPool.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.{CacheKey, 
UNKNOWN_OFFSET}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Provides object pool for [[FetchedData]] which is grouped by 
[[CacheKey]].
+ *
+ * Along with CacheKey, it receives desired start offset to find cached 
FetchedData which
+ * may be stored from previous batch. If it can't find one to match, it 
will create
+ * a new FetchedData.
+ */
+private[kafka010] class FetchedDataPool {
+  import FetchedDataPool._
+
+  private[kafka010] case class CachedFetchedData(fetchedData: FetchedData) 
{
+var lastReleasedTimestamp: Long = Long.MaxValue
+var lastAcquiredTimestamp: Long = Long.MinValue
+var inUse: Boolean = false
+
+def getObject: FetchedData = fetchedData
+  }
+
+  private object CachedFetchedData {
+def empty(): CachedFetchedData = {
+  val emptyData = FetchedData(
+ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], 
Array[Byte]]],
+UNKNOWN_OFFSET,
+UNKNOWN_OFFSET)
+
+  CachedFetchedData(emptyData)
+}
+  }
+
+  private type CachedFetchedDataList = 
mutable.ListBuffer[CachedFetchedData]
+
+  private val cache: mutable.Map[CacheKey, CachedFetchedDataList] = 
mutable.HashMap.empty
+
+  /** Retrieve internal cache. This method is only for testing. */
+  private[kafka010] def getCache: mutable.Map[CacheKey, 
CachedFetchedDataList] = cache
--- End diff --

Then `PrivateMethodTester` can be used.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214918569
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/FetchedPoolSuite.scala
 ---
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+import org.apache.spark.sql.test.SharedSQLContext
+
+class FetchedPoolSuite extends SharedSQLContext {
+  type Record = ConsumerRecord[Array[Byte], Array[Byte]]
+
+  private val dummyBytes = "dummy".getBytes
+
+  test("acquire fresh one") {
+val dataPool = FetchedDataPool.build
+
+val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+val data = dataPool.acquire(cacheKey, 0)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(dataPool.getCache(cacheKey).head.inUse)
+
+data.withNewPoll(testRecords(0, 5).listIterator, 5)
+
+dataPool.release(cacheKey, data)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(!dataPool.getCache(cacheKey).head.inUse)
+
+dataPool.shutdown()
+  }
+
+  test("acquire fetched data from multiple keys") {
+val dataPool = FetchedDataPool.build
+
+val cacheKeys = (0 to 10).map { partId =>
+  CacheKey("testgroup", new TopicPartition("topic", partId))
+}
+
+assert(dataPool.getCache.size === 0)
+cacheKeys.foreach { key => assert(dataPool.getCache.get(key).isEmpty) }
+
+val dataList = cacheKeys.map(key => (key, dataPool.acquire(key, 0)))
+
+assert(dataPool.getCache.size === cacheKeys.size)
+cacheKeys.map { key =>
+  assert(dataPool.getCache(key).size === 1)
+  assert(dataPool.getCache(key).head.inUse)
+}
+
+dataList.map { case (_, data) =>
+  data.withNewPoll(testRecords(0, 5).listIterator, 5)
+}
+
+dataList.foreach { case (key, data) =>
+  dataPool.release(key, data)
+}
+
+assert(dataPool.getCache.size === cacheKeys.size)
+cacheKeys.map { key =>
+  assert(dataPool.getCache(key).size === 1)
+  assert(!dataPool.getCache(key).head.inUse)
+}
+
+dataPool.shutdown()
+  }
+
+  test("continuous use of fetched data from single key") {
+val dataPool = FetchedDataPool.build
+
+val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+val data = dataPool.acquire(cacheKey, 0)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(dataPool.getCache(cacheKey).head.inUse)
+
+data.withNewPoll(testRecords(0, 5).listIterator, 5)
+
+(0 to 3).foreach { _ => data.next() }
+
+dataPool.release(cacheKey, data)
+
+// suppose next batch
+
+val data2 = dataPool.acquire(cacheKey, data.nextOffsetInFetchedData)
+
+assert(data.eq(data2))
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(dataPool.getCache(cacheKey).head.inUse)
+
+dataPool.release(cacheKey, data2)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(!dataPool.getCache(cacheKey).head.inUse)
+
+dataPool.shutdown()
+  }
+
+  test("multiple tasks referring same key continuously using fetched 
data") {
+val dataPool = FetchedDataPool.build
+
+val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+val 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214913221
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The soft capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value,
+ * and the pool will have reasonable default value if the value is not 
provided.
+ * (The instance will do its best effort to respect soft capacity but it 
can exceed when there's
+ * a borrowing request and there's neither free space nor idle object to 
clear.)
+ *
+ * This class guarantees that no caller will get pooled object once the 
object is borrowed and
+ * not yet returned, hence provide thread-safety usage of non-thread-safe 
[[InternalKafkaConsumer]]
+ * unless caller shares the object to multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(
+objectFactory: ObjectFactory,
+poolConfig: PoolConfig) {
+
+  // the class is intended to have only soft capacity
+  assert(poolConfig.getMaxTotal < 0)
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
soft capacity,
+   * pool will try to clear some of idle objects.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
+updateKafkaParamForKey(key, kafkaParams)
+
+if (getTotal == poolConfig.getSoftMaxTotal()) {
+  pool.clearOldest()
+}
+
+pool.borrowObject(key)
+  }
+
+  /** Returns borrowed object to the pool. */
+  def returnObject(consumer: InternalKafkaConsumer): Unit = {
+pool.returnObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates (destroy) borrowed object to the pool. */
+  def invalidateObject(consumer: InternalKafkaConsumer): Unit = {
+pool.invalidateObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates all idle consumers for the key */
+  def invalidateKey(key: CacheKey): Unit = {
+pool.clear(key)
+  }
+
+  /**
+   * Closes the keyed object pool. Once the pool is closed,
+   * borrowObject will fail with [[IllegalStateException]], but 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214916741
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -18,222 +18,247 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
+import java.io.Closeable
 import java.util.concurrent.TimeoutException
 
 import scala.collection.JavaConverters._
 
 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, 
KafkaConsumer, OffsetOutOfRangeException}
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
+import 
org.apache.spark.sql.kafka010.KafkaDataConsumer.{AvailableOffsetRange, 
CacheKey, UNKNOWN_OFFSET}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
-import org.apache.spark.util.UninterruptibleThread
+import org.apache.spark.util.{ShutdownHookManager, UninterruptibleThread}
+
+/**
+ * This class simplifies the usages of Kafka consumer in Spark SQL Kafka 
connector.
+ *
+ * NOTE: Like KafkaConsumer, this class is not thread-safe.
+ * NOTE for contributors: It is possible for the instance to be used from 
multiple callers,
+ * so all the methods should not rely on current cursor and use seek 
manually.
+ */
+private[kafka010] class InternalKafkaConsumer(
+val topicPartition: TopicPartition,
+val kafkaParams: ju.Map[String, Object]) extends Closeable with 
Logging {
+
+  val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+
+  private val consumer = createConsumer
 
-private[kafka010] sealed trait KafkaDataConsumer {
   /**
-   * Get the record for the given offset if available.
-   *
-   * If the record is invisible (either a
-   * transaction message, or an aborted message when the consumer's 
`isolation.level` is
-   * `read_committed`), it will be skipped and this method will try to 
fetch next available record
-   * within [offset, untilOffset).
-   *
-   * This method also will try its best to detect data loss. If 
`failOnDataLoss` is `true`, it will
-   * throw an exception when we detect an unavailable offset. If 
`failOnDataLoss` is `false`, this
-   * method will try to fetch next available record within [offset, 
untilOffset).
-   *
-   * When this method tries to skip offsets due to either invisible 
messages or data loss and
-   * reaches `untilOffset`, it will return `null`.
+   * Poll messages from Kafka starting from `offset` and returns a pair of 
"list of consumer record"
+   * and "offset after poll". The list of consumer record may be empty if 
the Kafka consumer fetches
+   * some messages but all of them are not visible messages (either 
transaction messages,
+   * or aborted messages when `isolation.level` is `read_committed`).
*
-   * @param offset the offset to fetch.
-   * @param untilOffsetthe max offset to fetch. Exclusive.
-   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
-   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method 
will either return record at
-   *   offset if available, or throw exception.when 
`failOnDataLoss` is `false`,
-   *   this method will either return record at offset 
if available, or return
-   *   the next earliest available record less than 
untilOffset, or null. It
-   *   will not throw any exception.
+   * @throws OffsetOutOfRangeException if `offset` is out of range.
+   * @throws TimeoutException if the consumer position is not changed 
after polling. It means the
+   *  consumer polls nothing before timeout.
*/
-  def get(
-  offset: Long,
-  untilOffset: Long,
-  pollTimeoutMs: Long,
-  failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = 
{
-internalConsumer.get(offset, untilOffset, pollTimeoutMs, 
failOnDataLoss)
+  def fetch(offset: Long, pollTimeoutMs: Long)
+  : (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long) = {
+// Seek to the offset because we may call seekToBeginning or seekToEnd 
before this.
+seek(offset)
+val p = consumer.poll(pollTimeoutMs)
+val r = p.records(topicPartition)
+logDebug(s"Polled $groupId ${p.partitions()}  ${r.size}")
+val offsetAfterPoll = consumer.position(topicPartition)
+logDebug(s"Offset changed from $offset to $offsetAfterPoll after 
polling")
+val 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214916493
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -18,222 +18,247 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
+import java.io.Closeable
 import java.util.concurrent.TimeoutException
 
 import scala.collection.JavaConverters._
 
 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, 
KafkaConsumer, OffsetOutOfRangeException}
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
+import 
org.apache.spark.sql.kafka010.KafkaDataConsumer.{AvailableOffsetRange, 
CacheKey, UNKNOWN_OFFSET}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
-import org.apache.spark.util.UninterruptibleThread
+import org.apache.spark.util.{ShutdownHookManager, UninterruptibleThread}
+
+/**
+ * This class simplifies the usages of Kafka consumer in Spark SQL Kafka 
connector.
+ *
+ * NOTE: Like KafkaConsumer, this class is not thread-safe.
+ * NOTE for contributors: It is possible for the instance to be used from 
multiple callers,
+ * so all the methods should not rely on current cursor and use seek 
manually.
+ */
+private[kafka010] class InternalKafkaConsumer(
+val topicPartition: TopicPartition,
+val kafkaParams: ju.Map[String, Object]) extends Closeable with 
Logging {
+
+  val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+
+  private val consumer = createConsumer
 
-private[kafka010] sealed trait KafkaDataConsumer {
   /**
-   * Get the record for the given offset if available.
-   *
-   * If the record is invisible (either a
-   * transaction message, or an aborted message when the consumer's 
`isolation.level` is
-   * `read_committed`), it will be skipped and this method will try to 
fetch next available record
-   * within [offset, untilOffset).
-   *
-   * This method also will try its best to detect data loss. If 
`failOnDataLoss` is `true`, it will
-   * throw an exception when we detect an unavailable offset. If 
`failOnDataLoss` is `false`, this
-   * method will try to fetch next available record within [offset, 
untilOffset).
-   *
-   * When this method tries to skip offsets due to either invisible 
messages or data loss and
-   * reaches `untilOffset`, it will return `null`.
+   * Poll messages from Kafka starting from `offset` and returns a pair of 
"list of consumer record"
+   * and "offset after poll". The list of consumer record may be empty if 
the Kafka consumer fetches
+   * some messages but all of them are not visible messages (either 
transaction messages,
+   * or aborted messages when `isolation.level` is `read_committed`).
*
-   * @param offset the offset to fetch.
-   * @param untilOffsetthe max offset to fetch. Exclusive.
-   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
-   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method 
will either return record at
-   *   offset if available, or throw exception.when 
`failOnDataLoss` is `false`,
-   *   this method will either return record at offset 
if available, or return
-   *   the next earliest available record less than 
untilOffset, or null. It
-   *   will not throw any exception.
+   * @throws OffsetOutOfRangeException if `offset` is out of range.
+   * @throws TimeoutException if the consumer position is not changed 
after polling. It means the
+   *  consumer polls nothing before timeout.
*/
-  def get(
-  offset: Long,
-  untilOffset: Long,
-  pollTimeoutMs: Long,
-  failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = 
{
-internalConsumer.get(offset, untilOffset, pollTimeoutMs, 
failOnDataLoss)
+  def fetch(offset: Long, pollTimeoutMs: Long)
+  : (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long) = {
+// Seek to the offset because we may call seekToBeginning or seekToEnd 
before this.
+seek(offset)
+val p = consumer.poll(pollTimeoutMs)
+val r = p.records(topicPartition)
+logDebug(s"Polled $groupId ${p.partitions()}  ${r.size}")
+val offsetAfterPoll = consumer.position(topicPartition)
+logDebug(s"Offset changed from $offset to $offsetAfterPoll after 
polling")
+val 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214917536
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/FetchedPoolSuite.scala
 ---
@@ -0,0 +1,299 @@
+/*
--- End diff --

Nice catch! Will rename.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214911381
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/FetchedDataPool.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.{CacheKey, 
UNKNOWN_OFFSET}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Provides object pool for [[FetchedData]] which is grouped by 
[[CacheKey]].
+ *
+ * Along with CacheKey, it receives desired start offset to find cached 
FetchedData which
+ * may be stored from previous batch. If it can't find one to match, it 
will create
+ * a new FetchedData.
+ */
+private[kafka010] class FetchedDataPool {
+  import FetchedDataPool._
+
+  private[kafka010] case class CachedFetchedData(fetchedData: FetchedData) 
{
+var lastReleasedTimestamp: Long = Long.MaxValue
+var lastAcquiredTimestamp: Long = Long.MinValue
+var inUse: Boolean = false
+
+def getObject: FetchedData = fetchedData
+  }
+
+  private object CachedFetchedData {
+def empty(): CachedFetchedData = {
+  val emptyData = FetchedData(
+ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], 
Array[Byte]]],
+UNKNOWN_OFFSET,
+UNKNOWN_OFFSET)
+
+  CachedFetchedData(emptyData)
+}
+  }
+
+  private type CachedFetchedDataList = 
mutable.ListBuffer[CachedFetchedData]
+
+  private val cache: mutable.Map[CacheKey, CachedFetchedDataList] = 
mutable.HashMap.empty
+
+  /** Retrieve internal cache. This method is only for testing. */
+  private[kafka010] def getCache: mutable.Map[CacheKey, 
CachedFetchedDataList] = cache
--- End diff --

This is to make sure `cache` itself is not accessible from outside, and 
when callers access `cache` via `getCache`, they will be noted it should not be 
used other than testing from scaladoc.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214910337
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/FetchedDataPool.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.{CacheKey, 
UNKNOWN_OFFSET}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Provides object pool for [[FetchedData]] which is grouped by 
[[CacheKey]].
+ *
+ * Along with CacheKey, it receives desired start offset to find cached 
FetchedData which
+ * may be stored from previous batch. If it can't find one to match, it 
will create
+ * a new FetchedData.
+ */
+private[kafka010] class FetchedDataPool {
+  import FetchedDataPool._
+
+  private[kafka010] case class CachedFetchedData(fetchedData: FetchedData) 
{
+var lastReleasedTimestamp: Long = Long.MaxValue
+var lastAcquiredTimestamp: Long = Long.MinValue
+var inUse: Boolean = false
+
+def getObject: FetchedData = fetchedData
+  }
+
+  private object CachedFetchedData {
+def empty(): CachedFetchedData = {
+  val emptyData = FetchedData(
+ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], 
Array[Byte]]],
+UNKNOWN_OFFSET,
+UNKNOWN_OFFSET)
+
+  CachedFetchedData(emptyData)
+}
+  }
+
+  private type CachedFetchedDataList = 
mutable.ListBuffer[CachedFetchedData]
+
+  private val cache: mutable.Map[CacheKey, CachedFetchedDataList] = 
mutable.HashMap.empty
+
+  /** Retrieve internal cache. This method is only for testing. */
+  private[kafka010] def getCache: mutable.Map[CacheKey, 
CachedFetchedDataList] = cache
+
+  private val (minEvictableIdleTimeMillis, 
evictorThreadRunIntervalMillis): (Long, Long) = {
+val conf = SparkEnv.get.conf
+
+val minEvictIdleTime = 
conf.getLong(CONFIG_NAME_MIN_EVICTABLE_IDLE_TIME_MILLIS,
+  DEFAULT_VALUE_MIN_EVICTABLE_IDLE_TIME_MILLIS)
+
+val evictorThreadInterval = conf.getLong(
+  CONFIG_NAME_EVICTOR_THREAD_RUN_INTERVAL_MILLIS,
+  DEFAULT_VALUE_EVICTOR_THREAD_RUN_INTERVAL_MILLIS)
+
+(minEvictIdleTime, evictorThreadInterval)
+  }
+
+  private val executorService = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+"kafka-fetched-data--cache-evictor")
+
+  private def startEvictorThread(): Unit = {
+executorService.scheduleAtFixedRate(new Runnable() {
+  override def run(): Unit = {
+removeIdleFetchedData()
--- End diff --

Nice catch! Will address.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214917284
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -414,17 +468,37 @@ private[kafka010] case class InternalKafkaConsumer(
 }
   }
 
-  /** Create a new consumer and reset cached states */
-  private def resetConsumer(): Unit = {
-consumer.close()
-consumer = createConsumer
-fetchedData.reset()
+  /**
+   * Poll messages from Kafka starting from `offset` and update 
`fetchedData`. `fetchedData` may be
+   * empty if the Kafka consumer fetches some messages but all of them are 
not visible messages
+   * (either transaction messages, or aborted messages when 
`isolation.level` is `read_committed`).
+   *
+   * @throws OffsetOutOfRangeException if `offset` is out of range.
+   * @throws TimeoutException if the consumer position is not changed 
after polling. It means the
+   *  consumer polls nothing before timeout.
+   */
+  private def fetchData(offset: Long, pollTimeoutMs: Long): Unit = {
+val (records, offsetAfterPoll) = consumer.fetch(offset, pollTimeoutMs)
+fetchedData.withNewPoll(records.listIterator, offsetAfterPoll)
+  }
+
+  private def ensureConsumerAvailable(): Unit = {
+if (consumer == null) {
--- End diff --

This is defined as `var` so just to avoid additional wrapping here. Same 
here as above: if we prefer Option I'm happy to change but not sure about it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214910482
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/FetchedDataPool.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.{CacheKey, 
UNKNOWN_OFFSET}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Provides object pool for [[FetchedData]] which is grouped by 
[[CacheKey]].
+ *
+ * Along with CacheKey, it receives desired start offset to find cached 
FetchedData which
+ * may be stored from previous batch. If it can't find one to match, it 
will create
+ * a new FetchedData.
+ */
+private[kafka010] class FetchedDataPool {
+  import FetchedDataPool._
+
+  private[kafka010] case class CachedFetchedData(fetchedData: FetchedData) 
{
+var lastReleasedTimestamp: Long = Long.MaxValue
+var lastAcquiredTimestamp: Long = Long.MinValue
+var inUse: Boolean = false
+
+def getObject: FetchedData = fetchedData
+  }
+
+  private object CachedFetchedData {
+def empty(): CachedFetchedData = {
+  val emptyData = FetchedData(
+ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], 
Array[Byte]]],
+UNKNOWN_OFFSET,
+UNKNOWN_OFFSET)
+
+  CachedFetchedData(emptyData)
+}
+  }
+
+  private type CachedFetchedDataList = 
mutable.ListBuffer[CachedFetchedData]
+
+  private val cache: mutable.Map[CacheKey, CachedFetchedDataList] = 
mutable.HashMap.empty
+
+  /** Retrieve internal cache. This method is only for testing. */
+  private[kafka010] def getCache: mutable.Map[CacheKey, 
CachedFetchedDataList] = cache
+
+  private val (minEvictableIdleTimeMillis, 
evictorThreadRunIntervalMillis): (Long, Long) = {
+val conf = SparkEnv.get.conf
+
+val minEvictIdleTime = 
conf.getLong(CONFIG_NAME_MIN_EVICTABLE_IDLE_TIME_MILLIS,
+  DEFAULT_VALUE_MIN_EVICTABLE_IDLE_TIME_MILLIS)
+
+val evictorThreadInterval = conf.getLong(
+  CONFIG_NAME_EVICTOR_THREAD_RUN_INTERVAL_MILLIS,
+  DEFAULT_VALUE_EVICTOR_THREAD_RUN_INTERVAL_MILLIS)
+
+(minEvictIdleTime, evictorThreadInterval)
+  }
+
+  private val executorService = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+"kafka-fetched-data--cache-evictor")
--- End diff --

Will address.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214910433
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/FetchedDataPool.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.{CacheKey, 
UNKNOWN_OFFSET}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Provides object pool for [[FetchedData]] which is grouped by 
[[CacheKey]].
+ *
+ * Along with CacheKey, it receives desired start offset to find cached 
FetchedData which
+ * may be stored from previous batch. If it can't find one to match, it 
will create
+ * a new FetchedData.
+ */
+private[kafka010] class FetchedDataPool {
+  import FetchedDataPool._
+
+  private[kafka010] case class CachedFetchedData(fetchedData: FetchedData) 
{
+var lastReleasedTimestamp: Long = Long.MaxValue
+var lastAcquiredTimestamp: Long = Long.MinValue
+var inUse: Boolean = false
+
+def getObject: FetchedData = fetchedData
+  }
+
+  private object CachedFetchedData {
+def empty(): CachedFetchedData = {
+  val emptyData = FetchedData(
+ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], 
Array[Byte]]],
+UNKNOWN_OFFSET,
+UNKNOWN_OFFSET)
+
+  CachedFetchedData(emptyData)
+}
+  }
+
+  private type CachedFetchedDataList = 
mutable.ListBuffer[CachedFetchedData]
+
+  private val cache: mutable.Map[CacheKey, CachedFetchedDataList] = 
mutable.HashMap.empty
+
+  /** Retrieve internal cache. This method is only for testing. */
+  private[kafka010] def getCache: mutable.Map[CacheKey, 
CachedFetchedDataList] = cache
+
+  private val (minEvictableIdleTimeMillis, 
evictorThreadRunIntervalMillis): (Long, Long) = {
+val conf = SparkEnv.get.conf
+
+val minEvictIdleTime = 
conf.getLong(CONFIG_NAME_MIN_EVICTABLE_IDLE_TIME_MILLIS,
+  DEFAULT_VALUE_MIN_EVICTABLE_IDLE_TIME_MILLIS)
+
+val evictorThreadInterval = conf.getLong(
+  CONFIG_NAME_EVICTOR_THREAD_RUN_INTERVAL_MILLIS,
+  DEFAULT_VALUE_EVICTOR_THREAD_RUN_INTERVAL_MILLIS)
+
+(minEvictIdleTime, evictorThreadInterval)
+  }
+
+  private val executorService = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+"kafka-fetched-data--cache-evictor")
+
+  private def startEvictorThread(): Unit = {
+executorService.scheduleAtFixedRate(new Runnable() {
--- End diff --

Will address.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214907878
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The soft capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value,
+ * and the pool will have reasonable default value if the value is not 
provided.
+ * (The instance will do its best effort to respect soft capacity but it 
can exceed when there's
+ * a borrowing request and there's neither free space nor idle object to 
clear.)
+ *
+ * This class guarantees that no caller will get pooled object once the 
object is borrowed and
+ * not yet returned, hence provide thread-safety usage of non-thread-safe 
[[InternalKafkaConsumer]]
+ * unless caller shares the object to multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(
+objectFactory: ObjectFactory,
+poolConfig: PoolConfig) {
+
+  // the class is intended to have only soft capacity
+  assert(poolConfig.getMaxTotal < 0)
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
soft capacity,
+   * pool will try to clear some of idle objects.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
+updateKafkaParamForKey(key, kafkaParams)
+
+if (getTotal == poolConfig.getSoftMaxTotal()) {
+  pool.clearOldest()
+}
+
+pool.borrowObject(key)
+  }
+
+  /** Returns borrowed object to the pool. */
+  def returnObject(consumer: InternalKafkaConsumer): Unit = {
+pool.returnObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates (destroy) borrowed object to the pool. */
+  def invalidateObject(consumer: InternalKafkaConsumer): Unit = {
+pool.invalidateObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates all idle consumers for the key */
+  def invalidateKey(key: CacheKey): Unit = {
+pool.clear(key)
+  }
+
+  /**
+   * Closes the keyed object pool. Once the pool is closed,
+   * borrowObject will fail with [[IllegalStateException]], but 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214908731
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The soft capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value,
+ * and the pool will have reasonable default value if the value is not 
provided.
+ * (The instance will do its best effort to respect soft capacity but it 
can exceed when there's
+ * a borrowing request and there's neither free space nor idle object to 
clear.)
+ *
+ * This class guarantees that no caller will get pooled object once the 
object is borrowed and
+ * not yet returned, hence provide thread-safety usage of non-thread-safe 
[[InternalKafkaConsumer]]
+ * unless caller shares the object to multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(
+objectFactory: ObjectFactory,
+poolConfig: PoolConfig) {
+
+  // the class is intended to have only soft capacity
+  assert(poolConfig.getMaxTotal < 0)
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
soft capacity,
+   * pool will try to clear some of idle objects.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
+updateKafkaParamForKey(key, kafkaParams)
+
+if (getTotal == poolConfig.getSoftMaxTotal()) {
+  pool.clearOldest()
+}
+
+pool.borrowObject(key)
+  }
+
+  /** Returns borrowed object to the pool. */
+  def returnObject(consumer: InternalKafkaConsumer): Unit = {
+pool.returnObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates (destroy) borrowed object to the pool. */
+  def invalidateObject(consumer: InternalKafkaConsumer): Unit = {
+pool.invalidateObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates all idle consumers for the key */
+  def invalidateKey(key: CacheKey): Unit = {
+pool.clear(key)
+  }
+
+  /**
+   * Closes the keyed object pool. Once the pool is closed,
+   * borrowObject will fail with [[IllegalStateException]], but 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214917336
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -414,17 +468,37 @@ private[kafka010] case class InternalKafkaConsumer(
 }
   }
 
-  /** Create a new consumer and reset cached states */
-  private def resetConsumer(): Unit = {
-consumer.close()
-consumer = createConsumer
-fetchedData.reset()
+  /**
+   * Poll messages from Kafka starting from `offset` and update 
`fetchedData`. `fetchedData` may be
+   * empty if the Kafka consumer fetches some messages but all of them are 
not visible messages
+   * (either transaction messages, or aborted messages when 
`isolation.level` is `read_committed`).
+   *
+   * @throws OffsetOutOfRangeException if `offset` is out of range.
+   * @throws TimeoutException if the consumer position is not changed 
after polling. It means the
+   *  consumer polls nothing before timeout.
+   */
+  private def fetchData(offset: Long, pollTimeoutMs: Long): Unit = {
+val (records, offsetAfterPoll) = consumer.fetch(offset, pollTimeoutMs)
+fetchedData.withNewPoll(records.listIterator, offsetAfterPoll)
+  }
+
+  private def ensureConsumerAvailable(): Unit = {
+if (consumer == null) {
+  consumer = consumerPool.borrowObject(cacheKey, kafkaParams)
+}
+  }
+
+  private def ensureFetchedDataAvailable(offset: Long): Unit = {
+if (fetchedData == null) {
--- End diff --

Same here.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214909826
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/FetchedDataPool.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.{CacheKey, 
UNKNOWN_OFFSET}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Provides object pool for [[FetchedData]] which is grouped by 
[[CacheKey]].
+ *
+ * Along with CacheKey, it receives desired start offset to find cached 
FetchedData which
+ * may be stored from previous batch. If it can't find one to match, it 
will create
+ * a new FetchedData.
+ */
+private[kafka010] class FetchedDataPool {
+  import FetchedDataPool._
+
+  private[kafka010] case class CachedFetchedData(fetchedData: FetchedData) 
{
+var lastReleasedTimestamp: Long = Long.MaxValue
+var lastAcquiredTimestamp: Long = Long.MinValue
+var inUse: Boolean = false
+
+def getObject: FetchedData = fetchedData
+  }
+
+  private object CachedFetchedData {
+def empty(): CachedFetchedData = {
+  val emptyData = FetchedData(
+ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], 
Array[Byte]]],
+UNKNOWN_OFFSET,
+UNKNOWN_OFFSET)
+
+  CachedFetchedData(emptyData)
+}
+  }
+
+  private type CachedFetchedDataList = 
mutable.ListBuffer[CachedFetchedData]
+
+  private val cache: mutable.Map[CacheKey, CachedFetchedDataList] = 
mutable.HashMap.empty
+
+  /** Retrieve internal cache. This method is only for testing. */
+  private[kafka010] def getCache: mutable.Map[CacheKey, 
CachedFetchedDataList] = cache
+
+  private val (minEvictableIdleTimeMillis, 
evictorThreadRunIntervalMillis): (Long, Long) = {
+val conf = SparkEnv.get.conf
+
+val minEvictIdleTime = 
conf.getLong(CONFIG_NAME_MIN_EVICTABLE_IDLE_TIME_MILLIS,
+  DEFAULT_VALUE_MIN_EVICTABLE_IDLE_TIME_MILLIS)
+
+val evictorThreadInterval = conf.getLong(
+  CONFIG_NAME_EVICTOR_THREAD_RUN_INTERVAL_MILLIS,
+  DEFAULT_VALUE_EVICTOR_THREAD_RUN_INTERVAL_MILLIS)
+
+(minEvictIdleTime, evictorThreadInterval)
+  }
+
+  private val executorService = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+"kafka-fetched-data--cache-evictor")
+
+  private def startEvictorThread(): Unit = {
+executorService.scheduleAtFixedRate(new Runnable() {
+  override def run(): Unit = {
+removeIdleFetchedData()
+  }
+}, 0, evictorThreadRunIntervalMillis, TimeUnit.MILLISECONDS)
+  }
+
+  startEvictorThread()
+
+  def acquire(key: CacheKey, desiredStartOffset: Long): FetchedData = 
synchronized {
+val fetchedDataList = cache.getOrElseUpdate(key, new 
CachedFetchedDataList())
+
+val cachedFetchedDataOption = fetchedDataList.find { p =>
+  !p.inUse && p.getObject.nextOffsetInFetchedData == desiredStartOffset
+}
+
+var cachedFetchedData: CachedFetchedData = null
+if (cachedFetchedDataOption.isDefined) {
+  cachedFetchedData = cachedFetchedDataOption.get
+} else {
+  cachedFetchedData = CachedFetchedData.empty()
+  fetchedDataList += cachedFetchedData
+}
+
+cachedFetchedData.lastAcquiredTimestamp = System.currentTimeMillis()
+cachedFetchedData.inUse = true
+
+cachedFetchedData.getObject
+  }
+
+  def invalidate(key: CacheKey): Unit = synchronized {
+cache.remove(key)
+  }
+
+  def release(key: CacheKey, fetchedData: FetchedData): Unit = 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214853362
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -18,222 +18,247 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
+import java.io.Closeable
 import java.util.concurrent.TimeoutException
 
 import scala.collection.JavaConverters._
 
 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, 
KafkaConsumer, OffsetOutOfRangeException}
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
+import 
org.apache.spark.sql.kafka010.KafkaDataConsumer.{AvailableOffsetRange, 
CacheKey, UNKNOWN_OFFSET}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
-import org.apache.spark.util.UninterruptibleThread
+import org.apache.spark.util.{ShutdownHookManager, UninterruptibleThread}
+
+/**
+ * This class simplifies the usages of Kafka consumer in Spark SQL Kafka 
connector.
+ *
+ * NOTE: Like KafkaConsumer, this class is not thread-safe.
+ * NOTE for contributors: It is possible for the instance to be used from 
multiple callers,
+ * so all the methods should not rely on current cursor and use seek 
manually.
+ */
+private[kafka010] class InternalKafkaConsumer(
+val topicPartition: TopicPartition,
+val kafkaParams: ju.Map[String, Object]) extends Closeable with 
Logging {
+
+  val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+
+  private val consumer = createConsumer
 
-private[kafka010] sealed trait KafkaDataConsumer {
   /**
-   * Get the record for the given offset if available.
-   *
-   * If the record is invisible (either a
-   * transaction message, or an aborted message when the consumer's 
`isolation.level` is
-   * `read_committed`), it will be skipped and this method will try to 
fetch next available record
-   * within [offset, untilOffset).
-   *
-   * This method also will try its best to detect data loss. If 
`failOnDataLoss` is `true`, it will
-   * throw an exception when we detect an unavailable offset. If 
`failOnDataLoss` is `false`, this
-   * method will try to fetch next available record within [offset, 
untilOffset).
-   *
-   * When this method tries to skip offsets due to either invisible 
messages or data loss and
-   * reaches `untilOffset`, it will return `null`.
+   * Poll messages from Kafka starting from `offset` and returns a pair of 
"list of consumer record"
+   * and "offset after poll". The list of consumer record may be empty if 
the Kafka consumer fetches
+   * some messages but all of them are not visible messages (either 
transaction messages,
+   * or aborted messages when `isolation.level` is `read_committed`).
*
-   * @param offset the offset to fetch.
-   * @param untilOffsetthe max offset to fetch. Exclusive.
-   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
-   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method 
will either return record at
-   *   offset if available, or throw exception.when 
`failOnDataLoss` is `false`,
-   *   this method will either return record at offset 
if available, or return
-   *   the next earliest available record less than 
untilOffset, or null. It
-   *   will not throw any exception.
+   * @throws OffsetOutOfRangeException if `offset` is out of range.
+   * @throws TimeoutException if the consumer position is not changed 
after polling. It means the
+   *  consumer polls nothing before timeout.
*/
-  def get(
-  offset: Long,
-  untilOffset: Long,
-  pollTimeoutMs: Long,
-  failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = 
{
-internalConsumer.get(offset, untilOffset, pollTimeoutMs, 
failOnDataLoss)
+  def fetch(offset: Long, pollTimeoutMs: Long)
+  : (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long) = {
+// Seek to the offset because we may call seekToBeginning or seekToEnd 
before this.
+seek(offset)
+val p = consumer.poll(pollTimeoutMs)
+val r = p.records(topicPartition)
+logDebug(s"Polled $groupId ${p.partitions()}  ${r.size}")
+val offsetAfterPoll = consumer.position(topicPartition)
+logDebug(s"Offset changed from $offset to $offsetAfterPoll after 
polling")
+val 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214815260
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -18,222 +18,247 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
+import java.io.Closeable
 import java.util.concurrent.TimeoutException
 
 import scala.collection.JavaConverters._
 
 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, 
KafkaConsumer, OffsetOutOfRangeException}
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
+import 
org.apache.spark.sql.kafka010.KafkaDataConsumer.{AvailableOffsetRange, 
CacheKey, UNKNOWN_OFFSET}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
-import org.apache.spark.util.UninterruptibleThread
+import org.apache.spark.util.{ShutdownHookManager, UninterruptibleThread}
+
+/**
+ * This class simplifies the usages of Kafka consumer in Spark SQL Kafka 
connector.
+ *
+ * NOTE: Like KafkaConsumer, this class is not thread-safe.
+ * NOTE for contributors: It is possible for the instance to be used from 
multiple callers,
+ * so all the methods should not rely on current cursor and use seek 
manually.
+ */
+private[kafka010] class InternalKafkaConsumer(
+val topicPartition: TopicPartition,
+val kafkaParams: ju.Map[String, Object]) extends Closeable with 
Logging {
+
+  val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+
+  private val consumer = createConsumer
 
-private[kafka010] sealed trait KafkaDataConsumer {
   /**
-   * Get the record for the given offset if available.
-   *
-   * If the record is invisible (either a
-   * transaction message, or an aborted message when the consumer's 
`isolation.level` is
-   * `read_committed`), it will be skipped and this method will try to 
fetch next available record
-   * within [offset, untilOffset).
-   *
-   * This method also will try its best to detect data loss. If 
`failOnDataLoss` is `true`, it will
-   * throw an exception when we detect an unavailable offset. If 
`failOnDataLoss` is `false`, this
-   * method will try to fetch next available record within [offset, 
untilOffset).
-   *
-   * When this method tries to skip offsets due to either invisible 
messages or data loss and
-   * reaches `untilOffset`, it will return `null`.
+   * Poll messages from Kafka starting from `offset` and returns a pair of 
"list of consumer record"
+   * and "offset after poll". The list of consumer record may be empty if 
the Kafka consumer fetches
+   * some messages but all of them are not visible messages (either 
transaction messages,
+   * or aborted messages when `isolation.level` is `read_committed`).
*
-   * @param offset the offset to fetch.
-   * @param untilOffsetthe max offset to fetch. Exclusive.
-   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
-   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method 
will either return record at
-   *   offset if available, or throw exception.when 
`failOnDataLoss` is `false`,
-   *   this method will either return record at offset 
if available, or return
-   *   the next earliest available record less than 
untilOffset, or null. It
-   *   will not throw any exception.
+   * @throws OffsetOutOfRangeException if `offset` is out of range.
+   * @throws TimeoutException if the consumer position is not changed 
after polling. It means the
+   *  consumer polls nothing before timeout.
*/
-  def get(
-  offset: Long,
-  untilOffset: Long,
-  pollTimeoutMs: Long,
-  failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = 
{
-internalConsumer.get(offset, untilOffset, pollTimeoutMs, 
failOnDataLoss)
+  def fetch(offset: Long, pollTimeoutMs: Long)
+  : (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long) = {
+// Seek to the offset because we may call seekToBeginning or seekToEnd 
before this.
+seek(offset)
+val p = consumer.poll(pollTimeoutMs)
+val r = p.records(topicPartition)
+logDebug(s"Polled $groupId ${p.partitions()}  ${r.size}")
+val offsetAfterPoll = consumer.position(topicPartition)
+logDebug(s"Offset changed from $offset to $offsetAfterPoll after 
polling")
+val 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214716582
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/FetchedDataPool.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.{CacheKey, 
UNKNOWN_OFFSET}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Provides object pool for [[FetchedData]] which is grouped by 
[[CacheKey]].
+ *
+ * Along with CacheKey, it receives desired start offset to find cached 
FetchedData which
+ * may be stored from previous batch. If it can't find one to match, it 
will create
+ * a new FetchedData.
+ */
+private[kafka010] class FetchedDataPool {
+  import FetchedDataPool._
+
+  private[kafka010] case class CachedFetchedData(fetchedData: FetchedData) 
{
+var lastReleasedTimestamp: Long = Long.MaxValue
+var lastAcquiredTimestamp: Long = Long.MinValue
+var inUse: Boolean = false
+
+def getObject: FetchedData = fetchedData
+  }
+
+  private object CachedFetchedData {
+def empty(): CachedFetchedData = {
+  val emptyData = FetchedData(
+ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], 
Array[Byte]]],
+UNKNOWN_OFFSET,
+UNKNOWN_OFFSET)
+
+  CachedFetchedData(emptyData)
+}
+  }
+
+  private type CachedFetchedDataList = 
mutable.ListBuffer[CachedFetchedData]
+
+  private val cache: mutable.Map[CacheKey, CachedFetchedDataList] = 
mutable.HashMap.empty
+
+  /** Retrieve internal cache. This method is only for testing. */
+  private[kafka010] def getCache: mutable.Map[CacheKey, 
CachedFetchedDataList] = cache
--- End diff --

Why is this better then:
`private[kafka010] val cache: mutable.Map[CacheKey, CachedFetchedDataList] 
= mutable.HashMap.empty`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214721690
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/FetchedDataPool.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.{CacheKey, 
UNKNOWN_OFFSET}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Provides object pool for [[FetchedData]] which is grouped by 
[[CacheKey]].
+ *
+ * Along with CacheKey, it receives desired start offset to find cached 
FetchedData which
+ * may be stored from previous batch. If it can't find one to match, it 
will create
+ * a new FetchedData.
+ */
+private[kafka010] class FetchedDataPool {
+  import FetchedDataPool._
+
+  private[kafka010] case class CachedFetchedData(fetchedData: FetchedData) 
{
+var lastReleasedTimestamp: Long = Long.MaxValue
+var lastAcquiredTimestamp: Long = Long.MinValue
+var inUse: Boolean = false
+
+def getObject: FetchedData = fetchedData
+  }
+
+  private object CachedFetchedData {
+def empty(): CachedFetchedData = {
+  val emptyData = FetchedData(
+ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], 
Array[Byte]]],
+UNKNOWN_OFFSET,
+UNKNOWN_OFFSET)
+
+  CachedFetchedData(emptyData)
+}
+  }
+
+  private type CachedFetchedDataList = 
mutable.ListBuffer[CachedFetchedData]
+
+  private val cache: mutable.Map[CacheKey, CachedFetchedDataList] = 
mutable.HashMap.empty
+
+  /** Retrieve internal cache. This method is only for testing. */
+  private[kafka010] def getCache: mutable.Map[CacheKey, 
CachedFetchedDataList] = cache
+
+  private val (minEvictableIdleTimeMillis, 
evictorThreadRunIntervalMillis): (Long, Long) = {
+val conf = SparkEnv.get.conf
+
+val minEvictIdleTime = 
conf.getLong(CONFIG_NAME_MIN_EVICTABLE_IDLE_TIME_MILLIS,
+  DEFAULT_VALUE_MIN_EVICTABLE_IDLE_TIME_MILLIS)
+
+val evictorThreadInterval = conf.getLong(
+  CONFIG_NAME_EVICTOR_THREAD_RUN_INTERVAL_MILLIS,
+  DEFAULT_VALUE_EVICTOR_THREAD_RUN_INTERVAL_MILLIS)
+
+(minEvictIdleTime, evictorThreadInterval)
+  }
+
+  private val executorService = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+"kafka-fetched-data--cache-evictor")
+
+  private def startEvictorThread(): Unit = {
+executorService.scheduleAtFixedRate(new Runnable() {
+  override def run(): Unit = {
+removeIdleFetchedData()
--- End diff --

Any thrown exception or error reaching the executor causes the executor to 
halt.
`catch` + `log...` would be good.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214813543
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The soft capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value,
+ * and the pool will have reasonable default value if the value is not 
provided.
+ * (The instance will do its best effort to respect soft capacity but it 
can exceed when there's
+ * a borrowing request and there's neither free space nor idle object to 
clear.)
+ *
+ * This class guarantees that no caller will get pooled object once the 
object is borrowed and
+ * not yet returned, hence provide thread-safety usage of non-thread-safe 
[[InternalKafkaConsumer]]
+ * unless caller shares the object to multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(
+objectFactory: ObjectFactory,
+poolConfig: PoolConfig) {
+
+  // the class is intended to have only soft capacity
+  assert(poolConfig.getMaxTotal < 0)
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
soft capacity,
+   * pool will try to clear some of idle objects.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
+updateKafkaParamForKey(key, kafkaParams)
+
+if (getTotal == poolConfig.getSoftMaxTotal()) {
+  pool.clearOldest()
+}
+
+pool.borrowObject(key)
+  }
+
+  /** Returns borrowed object to the pool. */
+  def returnObject(consumer: InternalKafkaConsumer): Unit = {
+pool.returnObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates (destroy) borrowed object to the pool. */
+  def invalidateObject(consumer: InternalKafkaConsumer): Unit = {
+pool.invalidateObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates all idle consumers for the key */
+  def invalidateKey(key: CacheKey): Unit = {
+pool.clear(key)
+  }
+
+  /**
+   * Closes the keyed object pool. Once the pool is closed,
+   * borrowObject will fail with [[IllegalStateException]], but 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214705613
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The soft capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value,
+ * and the pool will have reasonable default value if the value is not 
provided.
+ * (The instance will do its best effort to respect soft capacity but it 
can exceed when there's
+ * a borrowing request and there's neither free space nor idle object to 
clear.)
+ *
+ * This class guarantees that no caller will get pooled object once the 
object is borrowed and
+ * not yet returned, hence provide thread-safety usage of non-thread-safe 
[[InternalKafkaConsumer]]
+ * unless caller shares the object to multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(
+objectFactory: ObjectFactory,
+poolConfig: PoolConfig) {
+
+  // the class is intended to have only soft capacity
+  assert(poolConfig.getMaxTotal < 0)
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
soft capacity,
+   * pool will try to clear some of idle objects.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
+updateKafkaParamForKey(key, kafkaParams)
+
+if (getTotal == poolConfig.getSoftMaxTotal()) {
+  pool.clearOldest()
+}
+
+pool.borrowObject(key)
+  }
+
+  /** Returns borrowed object to the pool. */
+  def returnObject(consumer: InternalKafkaConsumer): Unit = {
+pool.returnObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates (destroy) borrowed object to the pool. */
+  def invalidateObject(consumer: InternalKafkaConsumer): Unit = {
+pool.invalidateObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates all idle consumers for the key */
+  def invalidateKey(key: CacheKey): Unit = {
+pool.clear(key)
+  }
+
+  /**
+   * Closes the keyed object pool. Once the pool is closed,
+   * borrowObject will fail with [[IllegalStateException]], but 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214797928
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/FetchedPoolSuite.scala
 ---
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+import org.apache.spark.sql.test.SharedSQLContext
+
+class FetchedPoolSuite extends SharedSQLContext {
+  type Record = ConsumerRecord[Array[Byte], Array[Byte]]
+
+  private val dummyBytes = "dummy".getBytes
+
+  test("acquire fresh one") {
+val dataPool = FetchedDataPool.build
+
+val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+val data = dataPool.acquire(cacheKey, 0)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(dataPool.getCache(cacheKey).head.inUse)
+
+data.withNewPoll(testRecords(0, 5).listIterator, 5)
+
+dataPool.release(cacheKey, data)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(!dataPool.getCache(cacheKey).head.inUse)
+
+dataPool.shutdown()
+  }
+
+  test("acquire fetched data from multiple keys") {
+val dataPool = FetchedDataPool.build
+
+val cacheKeys = (0 to 10).map { partId =>
+  CacheKey("testgroup", new TopicPartition("topic", partId))
+}
+
+assert(dataPool.getCache.size === 0)
+cacheKeys.foreach { key => assert(dataPool.getCache.get(key).isEmpty) }
+
+val dataList = cacheKeys.map(key => (key, dataPool.acquire(key, 0)))
+
+assert(dataPool.getCache.size === cacheKeys.size)
+cacheKeys.map { key =>
+  assert(dataPool.getCache(key).size === 1)
+  assert(dataPool.getCache(key).head.inUse)
+}
+
+dataList.map { case (_, data) =>
+  data.withNewPoll(testRecords(0, 5).listIterator, 5)
+}
+
+dataList.foreach { case (key, data) =>
+  dataPool.release(key, data)
+}
+
+assert(dataPool.getCache.size === cacheKeys.size)
+cacheKeys.map { key =>
+  assert(dataPool.getCache(key).size === 1)
+  assert(!dataPool.getCache(key).head.inUse)
+}
+
+dataPool.shutdown()
+  }
+
+  test("continuous use of fetched data from single key") {
+val dataPool = FetchedDataPool.build
+
+val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+val data = dataPool.acquire(cacheKey, 0)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(dataPool.getCache(cacheKey).head.inUse)
+
+data.withNewPoll(testRecords(0, 5).listIterator, 5)
+
+(0 to 3).foreach { _ => data.next() }
+
+dataPool.release(cacheKey, data)
+
+// suppose next batch
+
+val data2 = dataPool.acquire(cacheKey, data.nextOffsetInFetchedData)
+
+assert(data.eq(data2))
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(dataPool.getCache(cacheKey).head.inUse)
+
+dataPool.release(cacheKey, data2)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(!dataPool.getCache(cacheKey).head.inUse)
+
+dataPool.shutdown()
+  }
+
+  test("multiple tasks referring same key continuously using fetched 
data") {
+val dataPool = FetchedDataPool.build
+
+val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214800822
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The soft capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value,
+ * and the pool will have reasonable default value if the value is not 
provided.
+ * (The instance will do its best effort to respect soft capacity but it 
can exceed when there's
+ * a borrowing request and there's neither free space nor idle object to 
clear.)
+ *
+ * This class guarantees that no caller will get pooled object once the 
object is borrowed and
+ * not yet returned, hence provide thread-safety usage of non-thread-safe 
[[InternalKafkaConsumer]]
+ * unless caller shares the object to multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(
+objectFactory: ObjectFactory,
+poolConfig: PoolConfig) {
+
+  // the class is intended to have only soft capacity
+  assert(poolConfig.getMaxTotal < 0)
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
soft capacity,
+   * pool will try to clear some of idle objects.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
+updateKafkaParamForKey(key, kafkaParams)
+
+if (getTotal == poolConfig.getSoftMaxTotal()) {
+  pool.clearOldest()
+}
+
+pool.borrowObject(key)
+  }
+
+  /** Returns borrowed object to the pool. */
+  def returnObject(consumer: InternalKafkaConsumer): Unit = {
+pool.returnObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates (destroy) borrowed object to the pool. */
+  def invalidateObject(consumer: InternalKafkaConsumer): Unit = {
+pool.invalidateObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates all idle consumers for the key */
+  def invalidateKey(key: CacheKey): Unit = {
+pool.clear(key)
+  }
+
+  /**
+   * Closes the keyed object pool. Once the pool is closed,
+   * borrowObject will fail with [[IllegalStateException]], but 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214806388
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/FetchedPoolSuite.scala
 ---
@@ -0,0 +1,299 @@
+/*
--- End diff --

Shouldn't it be `FetchedDataPoolSuite.scala`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214716234
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/FetchedDataPool.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.{CacheKey, 
UNKNOWN_OFFSET}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Provides object pool for [[FetchedData]] which is grouped by 
[[CacheKey]].
+ *
+ * Along with CacheKey, it receives desired start offset to find cached 
FetchedData which
+ * may be stored from previous batch. If it can't find one to match, it 
will create
+ * a new FetchedData.
+ */
+private[kafka010] class FetchedDataPool {
+  import FetchedDataPool._
+
+  private[kafka010] case class CachedFetchedData(fetchedData: FetchedData) 
{
+var lastReleasedTimestamp: Long = Long.MaxValue
+var lastAcquiredTimestamp: Long = Long.MinValue
+var inUse: Boolean = false
+
+def getObject: FetchedData = fetchedData
+  }
+
+  private object CachedFetchedData {
+def empty(): CachedFetchedData = {
+  val emptyData = FetchedData(
+ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], 
Array[Byte]]],
+UNKNOWN_OFFSET,
+UNKNOWN_OFFSET)
+
+  CachedFetchedData(emptyData)
+}
+  }
+
+  private type CachedFetchedDataList = 
mutable.ListBuffer[CachedFetchedData]
+
+  private val cache: mutable.Map[CacheKey, CachedFetchedDataList] = 
mutable.HashMap.empty
+
+  /** Retrieve internal cache. This method is only for testing. */
+  private[kafka010] def getCache: mutable.Map[CacheKey, 
CachedFetchedDataList] = cache
+
+  private val (minEvictableIdleTimeMillis, 
evictorThreadRunIntervalMillis): (Long, Long) = {
+val conf = SparkEnv.get.conf
+
+val minEvictIdleTime = 
conf.getLong(CONFIG_NAME_MIN_EVICTABLE_IDLE_TIME_MILLIS,
+  DEFAULT_VALUE_MIN_EVICTABLE_IDLE_TIME_MILLIS)
+
+val evictorThreadInterval = conf.getLong(
+  CONFIG_NAME_EVICTOR_THREAD_RUN_INTERVAL_MILLIS,
+  DEFAULT_VALUE_EVICTOR_THREAD_RUN_INTERVAL_MILLIS)
+
+(minEvictIdleTime, evictorThreadInterval)
+  }
+
+  private val executorService = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+"kafka-fetched-data--cache-evictor")
--- End diff --

Nit: `kafka-fetched-data-cache-evictor`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214803892
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -414,17 +468,37 @@ private[kafka010] case class InternalKafkaConsumer(
 }
   }
 
-  /** Create a new consumer and reset cached states */
-  private def resetConsumer(): Unit = {
-consumer.close()
-consumer = createConsumer
-fetchedData.reset()
+  /**
+   * Poll messages from Kafka starting from `offset` and update 
`fetchedData`. `fetchedData` may be
+   * empty if the Kafka consumer fetches some messages but all of them are 
not visible messages
+   * (either transaction messages, or aborted messages when 
`isolation.level` is `read_committed`).
+   *
+   * @throws OffsetOutOfRangeException if `offset` is out of range.
+   * @throws TimeoutException if the consumer position is not changed 
after polling. It means the
+   *  consumer polls nothing before timeout.
+   */
+  private def fetchData(offset: Long, pollTimeoutMs: Long): Unit = {
+val (records, offsetAfterPoll) = consumer.fetch(offset, pollTimeoutMs)
+fetchedData.withNewPoll(records.listIterator, offsetAfterPoll)
+  }
+
+  private def ensureConsumerAvailable(): Unit = {
+if (consumer == null) {
+  consumer = consumerPool.borrowObject(cacheKey, kafkaParams)
+}
+  }
+
+  private def ensureFetchedDataAvailable(offset: Long): Unit = {
+if (fetchedData == null) {
--- End diff --

Why not using option?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214717195
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/FetchedDataPool.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.{CacheKey, 
UNKNOWN_OFFSET}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Provides object pool for [[FetchedData]] which is grouped by 
[[CacheKey]].
+ *
+ * Along with CacheKey, it receives desired start offset to find cached 
FetchedData which
+ * may be stored from previous batch. If it can't find one to match, it 
will create
+ * a new FetchedData.
+ */
+private[kafka010] class FetchedDataPool {
+  import FetchedDataPool._
+
+  private[kafka010] case class CachedFetchedData(fetchedData: FetchedData) 
{
+var lastReleasedTimestamp: Long = Long.MaxValue
+var lastAcquiredTimestamp: Long = Long.MinValue
+var inUse: Boolean = false
+
+def getObject: FetchedData = fetchedData
+  }
+
+  private object CachedFetchedData {
+def empty(): CachedFetchedData = {
+  val emptyData = FetchedData(
+ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], 
Array[Byte]]],
+UNKNOWN_OFFSET,
+UNKNOWN_OFFSET)
+
+  CachedFetchedData(emptyData)
+}
+  }
+
+  private type CachedFetchedDataList = 
mutable.ListBuffer[CachedFetchedData]
+
+  private val cache: mutable.Map[CacheKey, CachedFetchedDataList] = 
mutable.HashMap.empty
+
+  /** Retrieve internal cache. This method is only for testing. */
+  private[kafka010] def getCache: mutable.Map[CacheKey, 
CachedFetchedDataList] = cache
+
+  private val (minEvictableIdleTimeMillis, 
evictorThreadRunIntervalMillis): (Long, Long) = {
+val conf = SparkEnv.get.conf
+
+val minEvictIdleTime = 
conf.getLong(CONFIG_NAME_MIN_EVICTABLE_IDLE_TIME_MILLIS,
+  DEFAULT_VALUE_MIN_EVICTABLE_IDLE_TIME_MILLIS)
+
+val evictorThreadInterval = conf.getLong(
+  CONFIG_NAME_EVICTOR_THREAD_RUN_INTERVAL_MILLIS,
+  DEFAULT_VALUE_EVICTOR_THREAD_RUN_INTERVAL_MILLIS)
+
+(minEvictIdleTime, evictorThreadInterval)
+  }
+
+  private val executorService = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+"kafka-fetched-data--cache-evictor")
+
+  private def startEvictorThread(): Unit = {
+executorService.scheduleAtFixedRate(new Runnable() {
--- End diff --

Nit: `executorService.scheduleAtFixedRate(new Runnable {` is enough.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214803861
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -414,17 +468,37 @@ private[kafka010] case class InternalKafkaConsumer(
 }
   }
 
-  /** Create a new consumer and reset cached states */
-  private def resetConsumer(): Unit = {
-consumer.close()
-consumer = createConsumer
-fetchedData.reset()
+  /**
+   * Poll messages from Kafka starting from `offset` and update 
`fetchedData`. `fetchedData` may be
+   * empty if the Kafka consumer fetches some messages but all of them are 
not visible messages
+   * (either transaction messages, or aborted messages when 
`isolation.level` is `read_committed`).
+   *
+   * @throws OffsetOutOfRangeException if `offset` is out of range.
+   * @throws TimeoutException if the consumer position is not changed 
after polling. It means the
+   *  consumer polls nothing before timeout.
+   */
+  private def fetchData(offset: Long, pollTimeoutMs: Long): Unit = {
+val (records, offsetAfterPoll) = consumer.fetch(offset, pollTimeoutMs)
+fetchedData.withNewPoll(records.listIterator, offsetAfterPoll)
+  }
+
+  private def ensureConsumerAvailable(): Unit = {
+if (consumer == null) {
--- End diff --

Why not using option?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214817471
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/FetchedDataPool.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.{CacheKey, 
UNKNOWN_OFFSET}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Provides object pool for [[FetchedData]] which is grouped by 
[[CacheKey]].
+ *
+ * Along with CacheKey, it receives desired start offset to find cached 
FetchedData which
+ * may be stored from previous batch. If it can't find one to match, it 
will create
+ * a new FetchedData.
+ */
+private[kafka010] class FetchedDataPool {
+  import FetchedDataPool._
+
+  private[kafka010] case class CachedFetchedData(fetchedData: FetchedData) 
{
+var lastReleasedTimestamp: Long = Long.MaxValue
+var lastAcquiredTimestamp: Long = Long.MinValue
+var inUse: Boolean = false
+
+def getObject: FetchedData = fetchedData
+  }
+
+  private object CachedFetchedData {
+def empty(): CachedFetchedData = {
+  val emptyData = FetchedData(
+ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], 
Array[Byte]]],
+UNKNOWN_OFFSET,
+UNKNOWN_OFFSET)
+
+  CachedFetchedData(emptyData)
+}
+  }
+
+  private type CachedFetchedDataList = 
mutable.ListBuffer[CachedFetchedData]
+
+  private val cache: mutable.Map[CacheKey, CachedFetchedDataList] = 
mutable.HashMap.empty
+
+  /** Retrieve internal cache. This method is only for testing. */
+  private[kafka010] def getCache: mutable.Map[CacheKey, 
CachedFetchedDataList] = cache
+
+  private val (minEvictableIdleTimeMillis, 
evictorThreadRunIntervalMillis): (Long, Long) = {
+val conf = SparkEnv.get.conf
+
+val minEvictIdleTime = 
conf.getLong(CONFIG_NAME_MIN_EVICTABLE_IDLE_TIME_MILLIS,
+  DEFAULT_VALUE_MIN_EVICTABLE_IDLE_TIME_MILLIS)
+
+val evictorThreadInterval = conf.getLong(
+  CONFIG_NAME_EVICTOR_THREAD_RUN_INTERVAL_MILLIS,
+  DEFAULT_VALUE_EVICTOR_THREAD_RUN_INTERVAL_MILLIS)
+
+(minEvictIdleTime, evictorThreadInterval)
+  }
+
+  private val executorService = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+"kafka-fetched-data--cache-evictor")
+
+  private def startEvictorThread(): Unit = {
+executorService.scheduleAtFixedRate(new Runnable() {
+  override def run(): Unit = {
+removeIdleFetchedData()
+  }
+}, 0, evictorThreadRunIntervalMillis, TimeUnit.MILLISECONDS)
+  }
+
+  startEvictorThread()
+
+  def acquire(key: CacheKey, desiredStartOffset: Long): FetchedData = 
synchronized {
+val fetchedDataList = cache.getOrElseUpdate(key, new 
CachedFetchedDataList())
+
+val cachedFetchedDataOption = fetchedDataList.find { p =>
+  !p.inUse && p.getObject.nextOffsetInFetchedData == desiredStartOffset
+}
+
+var cachedFetchedData: CachedFetchedData = null
+if (cachedFetchedDataOption.isDefined) {
+  cachedFetchedData = cachedFetchedDataOption.get
+} else {
+  cachedFetchedData = CachedFetchedData.empty()
+  fetchedDataList += cachedFetchedData
+}
+
+cachedFetchedData.lastAcquiredTimestamp = System.currentTimeMillis()
+cachedFetchedData.inUse = true
+
+cachedFetchedData.getObject
+  }
+
+  def invalidate(key: CacheKey): Unit = synchronized {
+cache.remove(key)
+  }
+
+  def release(key: CacheKey, fetchedData: FetchedData): Unit = 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-08-29 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r213871197
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer._
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value, and
+ * the pool will have reasonable default value if the value is not 
provided.
+ *
+ * This class guarantees that no caller will get pooled object once the 
object is borrowed and
+ * not yet returned, hence provide thread-safety usage of non-thread-safe 
[[InternalKafkaConsumer]]
+ * unless caller shares the object to multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(objectFactory: 
ObjectFactory,
+  poolConfig: PoolConfig) {
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
capacity, pool will try
+   * to clear some of idle objects. If it doesn't help getting empty space 
to create new object,
+   * it will throw [[NoSuchElementException]] immediately.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
--- End diff --

That sounds good, but let's wait for voices on committers since CacheKey is 
designed before, not introduced in this patch.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-08-29 Thread attilapiros
Github user attilapiros commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r213869998
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer._
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value, and
+ * the pool will have reasonable default value if the value is not 
provided.
+ *
+ * This class guarantees that no caller will get pooled object once the 
object is borrowed and
+ * not yet returned, hence provide thread-safety usage of non-thread-safe 
[[InternalKafkaConsumer]]
+ * unless caller shares the object to multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(objectFactory: 
ObjectFactory,
+  poolConfig: PoolConfig) {
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
capacity, pool will try
+   * to clear some of idle objects. If it doesn't help getting empty space 
to create new object,
+   * it will throw [[NoSuchElementException]] immediately.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
--- End diff --

Oh I see. What about changing CacheKey to not a case class but to a class 
where kafkaParams is a member but its equals and hashCode methods does not use 
kafkaParams?
As this values are goes together I have the feeling encapsulating them is 
better then keeping their relation in a separate map (keyToKafkaParams). It is 
just an idea.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-08-29 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r213867027
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer._
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value, and
+ * the pool will have reasonable default value if the value is not 
provided.
+ *
+ * This class guarantees that no caller will get pooled object once the 
object is borrowed and
+ * not yet returned, hence provide thread-safety usage of non-thread-safe 
[[InternalKafkaConsumer]]
+ * unless caller shares the object to multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(objectFactory: 
ObjectFactory,
+  poolConfig: PoolConfig) {
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
capacity, pool will try
+   * to clear some of idle objects. If it doesn't help getting empty space 
to create new object,
+   * it will throw [[NoSuchElementException]] immediately.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
+updateKafkaParamForKey(key, kafkaParams)
+pool.borrowObject(key)
+  }
+
+  /** Returns borrowed object to the pool. */
+  def returnObject(intConsumer: InternalKafkaConsumer): Unit = {
+pool.returnObject(extractCacheKey(intConsumer), intConsumer)
+  }
+
+  /** Invalidates (destroy) borrowed object to the pool. */
+  def invalidateObject(intConsumer: InternalKafkaConsumer): Unit = {
+pool.invalidateObject(extractCacheKey(intConsumer), intConsumer)
+  }
+
+  /**
+   * Invalidates current idle and active (borrowed) objects for the key. 
It ensure no invalidated
+   * object will be provided again via borrowObject.
+   *
+   * It doesn't mean the key will not be available: valid objects will be 
available via calling
+   * borrowObject afterwards.
+   */
+  def invalidateKey(key: CacheKey): Unit = {
+// invalidate all idle consumers for the key
+pool.clear(key)
+
+pool.getNumActive()
+// set invalidate timestamp to 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-08-29 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r213866613
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer._
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value, and
+ * the pool will have reasonable default value if the value is not 
provided.
+ *
+ * This class guarantees that no caller will get pooled object once the 
object is borrowed and
+ * not yet returned, hence provide thread-safety usage of non-thread-safe 
[[InternalKafkaConsumer]]
+ * unless caller shares the object to multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(objectFactory: 
ObjectFactory,
+  poolConfig: PoolConfig) {
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
capacity, pool will try
+   * to clear some of idle objects. If it doesn't help getting empty space 
to create new object,
+   * it will throw [[NoSuchElementException]] immediately.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
+updateKafkaParamForKey(key, kafkaParams)
+pool.borrowObject(key)
+  }
+
+  /** Returns borrowed object to the pool. */
+  def returnObject(intConsumer: InternalKafkaConsumer): Unit = {
+pool.returnObject(extractCacheKey(intConsumer), intConsumer)
+  }
+
+  /** Invalidates (destroy) borrowed object to the pool. */
+  def invalidateObject(intConsumer: InternalKafkaConsumer): Unit = {
+pool.invalidateObject(extractCacheKey(intConsumer), intConsumer)
+  }
+
+  /**
+   * Invalidates current idle and active (borrowed) objects for the key. 
It ensure no invalidated
+   * object will be provided again via borrowObject.
+   *
+   * It doesn't mean the key will not be available: valid objects will be 
available via calling
+   * borrowObject afterwards.
+   */
+  def invalidateKey(key: CacheKey): Unit = {
+// invalidate all idle consumers for the key
+pool.clear(key)
+
+pool.getNumActive()
+// set invalidate timestamp to 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-08-29 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r213866495
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer._
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value, and
+ * the pool will have reasonable default value if the value is not 
provided.
+ *
+ * This class guarantees that no caller will get pooled object once the 
object is borrowed and
+ * not yet returned, hence provide thread-safety usage of non-thread-safe 
[[InternalKafkaConsumer]]
+ * unless caller shares the object to multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(objectFactory: 
ObjectFactory,
+  poolConfig: PoolConfig) {
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
capacity, pool will try
+   * to clear some of idle objects. If it doesn't help getting empty space 
to create new object,
+   * it will throw [[NoSuchElementException]] immediately.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
+updateKafkaParamForKey(key, kafkaParams)
+pool.borrowObject(key)
+  }
+
+  /** Returns borrowed object to the pool. */
+  def returnObject(intConsumer: InternalKafkaConsumer): Unit = {
+pool.returnObject(extractCacheKey(intConsumer), intConsumer)
+  }
+
+  /** Invalidates (destroy) borrowed object to the pool. */
+  def invalidateObject(intConsumer: InternalKafkaConsumer): Unit = {
+pool.invalidateObject(extractCacheKey(intConsumer), intConsumer)
+  }
+
+  /**
+   * Invalidates current idle and active (borrowed) objects for the key. 
It ensure no invalidated
+   * object will be provided again via borrowObject.
+   *
+   * It doesn't mean the key will not be available: valid objects will be 
available via calling
+   * borrowObject afterwards.
+   */
+  def invalidateKey(key: CacheKey): Unit = {
+// invalidate all idle consumers for the key
+pool.clear(key)
+
+pool.getNumActive()
--- End diff --

My bad. Will 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-08-29 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r213866399
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer._
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value, and
+ * the pool will have reasonable default value if the value is not 
provided.
+ *
+ * This class guarantees that no caller will get pooled object once the 
object is borrowed and
+ * not yet returned, hence provide thread-safety usage of non-thread-safe 
[[InternalKafkaConsumer]]
+ * unless caller shares the object to multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(objectFactory: 
ObjectFactory,
+  poolConfig: PoolConfig) {
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
capacity, pool will try
+   * to clear some of idle objects. If it doesn't help getting empty space 
to create new object,
+   * it will throw [[NoSuchElementException]] immediately.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
--- End diff --

It is to reduce unnecessary computation along with comparing map while 
accessing with pool. You can see CacheKey keeps as it is, and I guess CacheKey 
was designed for same reason.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-08-29 Thread attilapiros
Github user attilapiros commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r213621710
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer._
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value, and
+ * the pool will have reasonable default value if the value is not 
provided.
+ *
+ * This class guarantees that no caller will get pooled object once the 
object is borrowed and
+ * not yet returned, hence provide thread-safety usage of non-thread-safe 
[[InternalKafkaConsumer]]
+ * unless caller shares the object to multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(objectFactory: 
ObjectFactory,
+  poolConfig: PoolConfig) {
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
capacity, pool will try
+   * to clear some of idle objects. If it doesn't help getting empty space 
to create new object,
+   * it will throw [[NoSuchElementException]] immediately.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
+updateKafkaParamForKey(key, kafkaParams)
+pool.borrowObject(key)
+  }
+
+  /** Returns borrowed object to the pool. */
+  def returnObject(intConsumer: InternalKafkaConsumer): Unit = {
+pool.returnObject(extractCacheKey(intConsumer), intConsumer)
+  }
+
+  /** Invalidates (destroy) borrowed object to the pool. */
+  def invalidateObject(intConsumer: InternalKafkaConsumer): Unit = {
+pool.invalidateObject(extractCacheKey(intConsumer), intConsumer)
+  }
+
+  /**
+   * Invalidates current idle and active (borrowed) objects for the key. 
It ensure no invalidated
+   * object will be provided again via borrowObject.
+   *
+   * It doesn't mean the key will not be available: valid objects will be 
available via calling
+   * borrowObject afterwards.
+   */
+  def invalidateKey(key: CacheKey): Unit = {
+// invalidate all idle consumers for the key
+pool.clear(key)
+
+pool.getNumActive()
+// set invalidate timestamp to 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-08-29 Thread attilapiros
Github user attilapiros commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r213639419
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer._
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value, and
+ * the pool will have reasonable default value if the value is not 
provided.
+ *
+ * This class guarantees that no caller will get pooled object once the 
object is borrowed and
+ * not yet returned, hence provide thread-safety usage of non-thread-safe 
[[InternalKafkaConsumer]]
+ * unless caller shares the object to multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(objectFactory: 
ObjectFactory,
+  poolConfig: PoolConfig) {
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
capacity, pool will try
+   * to clear some of idle objects. If it doesn't help getting empty space 
to create new object,
+   * it will throw [[NoSuchElementException]] immediately.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
+updateKafkaParamForKey(key, kafkaParams)
+pool.borrowObject(key)
+  }
+
+  /** Returns borrowed object to the pool. */
+  def returnObject(intConsumer: InternalKafkaConsumer): Unit = {
+pool.returnObject(extractCacheKey(intConsumer), intConsumer)
+  }
+
+  /** Invalidates (destroy) borrowed object to the pool. */
+  def invalidateObject(intConsumer: InternalKafkaConsumer): Unit = {
+pool.invalidateObject(extractCacheKey(intConsumer), intConsumer)
+  }
+
+  /**
+   * Invalidates current idle and active (borrowed) objects for the key. 
It ensure no invalidated
+   * object will be provided again via borrowObject.
+   *
+   * It doesn't mean the key will not be available: valid objects will be 
available via calling
+   * borrowObject afterwards.
+   */
+  def invalidateKey(key: CacheKey): Unit = {
+// invalidate all idle consumers for the key
+pool.clear(key)
+
+pool.getNumActive()
+// set invalidate timestamp to 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-08-29 Thread attilapiros
Github user attilapiros commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r213615553
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer._
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value, and
+ * the pool will have reasonable default value if the value is not 
provided.
+ *
+ * This class guarantees that no caller will get pooled object once the 
object is borrowed and
+ * not yet returned, hence provide thread-safety usage of non-thread-safe 
[[InternalKafkaConsumer]]
+ * unless caller shares the object to multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(objectFactory: 
ObjectFactory,
+  poolConfig: PoolConfig) {
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
capacity, pool will try
+   * to clear some of idle objects. If it doesn't help getting empty space 
to create new object,
+   * it will throw [[NoSuchElementException]] immediately.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
+updateKafkaParamForKey(key, kafkaParams)
+pool.borrowObject(key)
+  }
+
+  /** Returns borrowed object to the pool. */
+  def returnObject(intConsumer: InternalKafkaConsumer): Unit = {
+pool.returnObject(extractCacheKey(intConsumer), intConsumer)
+  }
+
+  /** Invalidates (destroy) borrowed object to the pool. */
+  def invalidateObject(intConsumer: InternalKafkaConsumer): Unit = {
+pool.invalidateObject(extractCacheKey(intConsumer), intConsumer)
+  }
+
+  /**
+   * Invalidates current idle and active (borrowed) objects for the key. 
It ensure no invalidated
+   * object will be provided again via borrowObject.
+   *
+   * It doesn't mean the key will not be available: valid objects will be 
available via calling
+   * borrowObject afterwards.
+   */
+  def invalidateKey(key: CacheKey): Unit = {
+// invalidate all idle consumers for the key
+pool.clear(key)
+
+pool.getNumActive()
--- End diff --

Is this call 

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-08-29 Thread attilapiros
Github user attilapiros commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r213615086
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer._
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value, and
+ * the pool will have reasonable default value if the value is not 
provided.
+ *
+ * This class guarantees that no caller will get pooled object once the 
object is borrowed and
+ * not yet returned, hence provide thread-safety usage of non-thread-safe 
[[InternalKafkaConsumer]]
+ * unless caller shares the object to multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(objectFactory: 
ObjectFactory,
+  poolConfig: PoolConfig) {
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
capacity, pool will try
+   * to clear some of idle objects. If it doesn't help getting empty space 
to create new object,
+   * it will throw [[NoSuchElementException]] immediately.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
--- End diff --

Why kafkaParams is passed as the second argument? 
As I see CacheKey itself is constructed from kafkaParams so would not be 
better to store kafkaParam in a val within CacheKey? 

Then objectFactory.keyToKafkaParams would be deleted along with 
updateKafkaParamForKey. Is not it?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-08-19 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r29914
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -425,70 +381,36 @@ private[kafka010] object KafkaDataConsumer extends 
Logging {
   def acquire(
   topicPartition: TopicPartition,
   kafkaParams: ju.Map[String, Object],
-  useCache: Boolean): KafkaDataConsumer = synchronized {
-val key = new CacheKey(topicPartition, kafkaParams)
-val existingInternalConsumer = cache.get(key)
+  useCache: Boolean): KafkaDataConsumer = {
 
-lazy val newInternalConsumer = new 
InternalKafkaConsumer(topicPartition, kafkaParams)
+if (!useCache) {
+  return NonCachedKafkaDataConsumer(new 
InternalKafkaConsumer(topicPartition, kafkaParams))
+}
 
-if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
-  // If this is reattempt at running the task, then invalidate cached 
consumer if any and
-  // start with a new one.
-  if (existingInternalConsumer != null) {
-// Consumer exists in cache. If its in use, mark it for closing 
later, or close it now.
-if (existingInternalConsumer.inUse) {
-  existingInternalConsumer.markedForClose = true
-} else {
-  existingInternalConsumer.close()
-}
-  }
-  cache.remove(key)  // Invalidate the cache in any case
-  NonCachedKafkaDataConsumer(newInternalConsumer)
+val key = new CacheKey(topicPartition, kafkaParams)
 
-} else if (!useCache) {
-  // If planner asks to not reuse consumers, then do not use it, 
return a new consumer
-  NonCachedKafkaDataConsumer(newInternalConsumer)
+if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
+  // If this is reattempt at running the task, then invalidate cached 
consumer if any.
 
-} else if (existingInternalConsumer == null) {
-  // If consumer is not already cached, then put a new in the cache 
and return it
-  cache.put(key, newInternalConsumer)
-  newInternalConsumer.inUse = true
-  CachedKafkaDataConsumer(newInternalConsumer)
+  // invalidate all idle consumers for the key
+  pool.invalidateKey(key)
 
-} else if (existingInternalConsumer.inUse) {
-  // If consumer is already cached but is currently in use, then 
return a new consumer
-  NonCachedKafkaDataConsumer(newInternalConsumer)
+  // borrow a consumer from pool even in this case
--- End diff --

This is another behavior change: If this attempt succeeds, we can use 
pooled consumer from next batch, so no reason to discard the consumer.

But I also see the cost of unnecessary pooling if failure occurs 
continuously.

So that looks like kind of decision between possibility of success vs 
possibility of failure (again), and while I decide to cache it, it is pretty 
easy to go back to current behavior, so please let me know if we think current 
behavior makes more sense.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-08-17 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r211053868
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -425,70 +381,36 @@ private[kafka010] object KafkaDataConsumer extends 
Logging {
   def acquire(
   topicPartition: TopicPartition,
   kafkaParams: ju.Map[String, Object],
-  useCache: Boolean): KafkaDataConsumer = synchronized {
-val key = new CacheKey(topicPartition, kafkaParams)
-val existingInternalConsumer = cache.get(key)
+  useCache: Boolean): KafkaDataConsumer = {
 
-lazy val newInternalConsumer = new 
InternalKafkaConsumer(topicPartition, kafkaParams)
+if (!useCache) {
+  return NonCachedKafkaDataConsumer(new 
InternalKafkaConsumer(topicPartition, kafkaParams))
+}
 
-if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
-  // If this is reattempt at running the task, then invalidate cached 
consumer if any and
-  // start with a new one.
-  if (existingInternalConsumer != null) {
-// Consumer exists in cache. If its in use, mark it for closing 
later, or close it now.
-if (existingInternalConsumer.inUse) {
-  existingInternalConsumer.markedForClose = true
-} else {
-  existingInternalConsumer.close()
-}
-  }
-  cache.remove(key)  // Invalidate the cache in any case
-  NonCachedKafkaDataConsumer(newInternalConsumer)
+val key = new CacheKey(topicPartition, kafkaParams)
 
-} else if (!useCache) {
-  // If planner asks to not reuse consumers, then do not use it, 
return a new consumer
-  NonCachedKafkaDataConsumer(newInternalConsumer)
+if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
+  // If this is reattempt at running the task, then invalidate cached 
consumer if any.
 
-} else if (existingInternalConsumer == null) {
-  // If consumer is not already cached, then put a new in the cache 
and return it
-  cache.put(key, newInternalConsumer)
-  newInternalConsumer.inUse = true
-  CachedKafkaDataConsumer(newInternalConsumer)
+  // invalidate all idle consumers for the key
+  pool.invalidateKey(key)
 
-} else if (existingInternalConsumer.inUse) {
-  // If consumer is already cached but is currently in use, then 
return a new consumer
-  NonCachedKafkaDataConsumer(newInternalConsumer)
+  // borrow a consumer from pool even in this case
+}
 
-} else {
-  // If consumer is already cached and is currently not in use, then 
return that consumer
-  existingInternalConsumer.inUse = true
-  CachedKafkaDataConsumer(existingInternalConsumer)
+try {
+  CachedKafkaDataConsumer(pool.borrowObject(key, kafkaParams))
+} catch { case _: NoSuchElementException =>
+  // There's neither idle object to clean up nor available space in 
pool:
+  // fail back to create non-cached consumer
--- End diff --

This approach introduces behavior change: even though `cache` had capacity, 
the `cache` worked like soft capacity and allowed adding item to the cache when 
there's neither idle object nor free space. 

New behavior of the KafkaDataConsumer is creating all the objects to 
non-cached whenever pool is exhausted and there's no idle object to free up.

I think it is not a big deal when we configure 
"spark.sql.kafkaConsumerCache.capacity" properly, and having hard capacity 
feels more convenient to determine what's going on.

However we can still mimic the current behavior with having infinite 
capacity, so we can be back to current behavior if we feel it makes more sense.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-08-17 Thread HeartSaVioR
GitHub user HeartSaVioR opened a pull request:

https://github.com/apache/spark/pull/22138

[SPARK-25151][SS] Apply Apache Commons Pool to KafkaDataConsumer

## What changes were proposed in this pull request?

KafkaDataConsumer contains its own logic for caching InternalKafkaConsumer 
which looks like can be simplified via applying Apache Commons Pool. Benefits 
of applying Apache Commons Pool are following:

* We can get rid of synchronization of KafkaDataConsumer object while 
acquiring and returning InternalKafkaConsumer.
* We can extract the feature of object pool to outside of the class, so 
that the behaviors of the pool can be tested easily.
* We can get various statistics for the object pool, and also be able to 
enable JMX for the pool.

This patch brings additional dependency, Apache Commons Pool 2.6.0 into 
`spark-sql-kafka-0-10` module.

## How was this patch tested?

Existing unit tests as well as new tests for object pool.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HeartSaVioR/spark SPARK-25151

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22138.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22138


commit c82f3064fa8744f91b5c8a92645588dc9d53ba35
Author: Jungtaek Lim 
Date:   2018-08-17T09:56:31Z

[SPARK-25151][SS] Apply Apache Commons Pool to KafkaDataConsumer




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org