This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a8aad0  [SPARK-27343][KAFKA][SS] Avoid hardcoded configs for spark-sql-kafka-0-10
5a8aad0 is described below

commit 5a8aad01c2aaf0ceef8e9a3cfabbd2e88c8d9f0d
Author: hehuiyuan <hehuiyuan@ZBMAC-C02WD3K5H.local>
AuthorDate: Sun May 12 10:46:18 2019 -0500

    [SPARK-27343][KAFKA][SS] Avoid hardcoded configs for spark-sql-kafka-0-10
    
    ## What changes were proposed in this pull request?
    
    
    [SPARK-27343](https://issues.apache.org/jira/projects/SPARK/issues/SPARK-27343)
    
    Based on the previous PR: https://github.com/apache/spark/pull/24270
    
    Extract the hardcoded parameter names and build ConfigEntry objects for them.
    
    For example, for the parameter "spark.kafka.producer.cache.timeout", we build:
    ```
    private[kafka010] val PRODUCER_CACHE_TIMEOUT =
        ConfigBuilder("spark.kafka.producer.cache.timeout")
          .doc("The expire time to remove the unused producers.")
          .timeConf(TimeUnit.MILLISECONDS)
          .createWithDefaultString("10m")
    ```
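    
    Call sites can then read the typed value instead of parsing a string themselves. A minimal sketch, mirroring the CachedKafkaProducer change in this patch (PRODUCER_CACHE_TIMEOUT and defaultCacheExpireTimeout come from that file):
    
    ```
    private lazy val cacheExpireTimeout: Long = Option(SparkEnv.get)
      .map(_.conf.get(PRODUCER_CACHE_TIMEOUT))
      // Falls back to the 10-minute default when SparkEnv is not available.
      .getOrElse(defaultCacheExpireTimeout)
    ```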
    
    Closes #24574 from hehuiyuan/hehuiyuan-patch-9.
    
    Authored-by: hehuiyuan <hehuiyuan@ZBMAC-C02WD3K5H.local>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../spark/sql/kafka010/CachedKafkaProducer.scala   |  7 +++---
 .../spark/sql/kafka010/KafkaContinuousStream.scala |  3 ++-
 .../spark/sql/kafka010/KafkaDataConsumer.scala     |  2 +-
 .../spark/sql/kafka010/KafkaMicroBatchStream.scala |  5 ++--
 .../sql/kafka010/KafkaOffsetRangeCalculator.scala  |  3 ++-
 .../spark/sql/kafka010/KafkaOffsetReader.scala     |  4 ++--
 .../apache/spark/sql/kafka010/KafkaRelation.scala  |  2 +-
 .../apache/spark/sql/kafka010/KafkaSource.scala    |  4 ++--
 .../spark/sql/kafka010/KafkaSourceProvider.scala   | 27 ++++++++++++++--------
 .../org/apache/spark/sql/kafka010/package.scala    | 16 +++++++++++++
 10 files changed, 49 insertions(+), 24 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
index 2bab287..ce22e3f 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
@@ -36,10 +36,9 @@ private[kafka010] object CachedKafkaProducer extends Logging {
 
   private val defaultCacheExpireTimeout = TimeUnit.MINUTES.toMillis(10)
 
-  private lazy val cacheExpireTimeout: Long =
-    Option(SparkEnv.get).map(_.conf.getTimeAsMs(
-      "spark.kafka.producer.cache.timeout",
-      s"${defaultCacheExpireTimeout}ms")).getOrElse(defaultCacheExpireTimeout)
+  private lazy val cacheExpireTimeout: Long = Option(SparkEnv.get)
+    .map(_.conf.get(PRODUCER_CACHE_TIMEOUT))
+    .getOrElse(defaultCacheExpireTimeout)
 
   private val cacheLoader = new CacheLoader[Seq[(String, Object)], Producer] {
     override def load(config: Seq[(String, Object)]): Producer = {
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
index 92686d2..03f82a5 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
@@ -53,7 +53,8 @@ class KafkaContinuousStream(
     failOnDataLoss: Boolean)
   extends ContinuousStream with Logging {
 
-  private val pollTimeoutMs = sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong
+  private val pollTimeoutMs =
+    sourceOptions.getOrElse(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, "512").toLong
 
   // Initialized when creating reader factories. If this diverges from the partitions at the latest
   // offsets, we need to reconfigure.
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
index 45ea3d2..cbb99fd 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
@@ -523,7 +523,7 @@ private[kafka010] object KafkaDataConsumer extends Logging {
   //   tasks simultaneously using consumers than the capacity.
   private lazy val cache = {
     val conf = SparkEnv.get.conf
-    val capacity = conf.getInt("spark.sql.kafkaConsumerCache.capacity", 64)
+    val capacity = conf.get(CONSUMER_CACHE_CAPACITY)
     new ju.LinkedHashMap[CacheKey, InternalKafkaConsumer](capacity, 0.75f, true) {
       override def removeEldestEntry(
         entry: ju.Map.Entry[CacheKey, InternalKafkaConsumer]): Boolean = {
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
index 76c7b5d..32d5f92 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
@@ -64,10 +64,11 @@ private[kafka010] class KafkaMicroBatchStream(
     failOnDataLoss: Boolean) extends RateControlMicroBatchStream with Logging {
 
   private val pollTimeoutMs = options.getLong(
-    "kafkaConsumer.pollTimeoutMs",
+    KafkaSourceProvider.CONSUMER_POLL_TIMEOUT,
     SparkEnv.get.conf.get(NETWORK_TIMEOUT) * 1000L)
 
-  private val maxOffsetsPerTrigger = Option(options.get("maxOffsetsPerTrigger")).map(_.toLong)
+  private val maxOffsetsPerTrigger = Option(options.get(KafkaSourceProvider.MAX_OFFSET_PER_TRIGGER))
+    .map(_.toLong)
 
   private val rangeCalculator = KafkaOffsetRangeCalculator(options)
 
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
index 1af8404..c188b4c 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
@@ -92,7 +92,8 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
 private[kafka010] object KafkaOffsetRangeCalculator {
 
   def apply(options: CaseInsensitiveStringMap): KafkaOffsetRangeCalculator = {
-    val optionalValue = Option(options.get("minPartitions")).map(_.toInt)
+    val optionalValue = Option(options.get(KafkaSourceProvider.MIN_PARTITIONS_OPTION_KEY))
+      .map(_.toInt)
     new KafkaOffsetRangeCalculator(optionalValue)
   }
 }
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
index c64b070..429cbeb 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
@@ -89,10 +89,10 @@ private[kafka010] class KafkaOffsetReader(
   }
 
   private val maxOffsetFetchAttempts =
-    readerOptions.getOrElse("fetchOffset.numRetries", "3").toInt
+    readerOptions.getOrElse(KafkaSourceProvider.FETCH_OFFSET_NUM_RETRY, "3").toInt
 
   private val offsetFetchAttemptIntervalMs =
-    readerOptions.getOrElse("fetchOffset.retryIntervalMs", "1000").toLong
+    readerOptions.getOrElse(KafkaSourceProvider.FETCH_OFFSET_RETRY_INTERVAL_MS, "1000").toLong
 
   private def nextGroupId(): String = {
     groupId = driverGroupIdPrefix + "-" + nextId
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
index 48cc089..d1ff96a 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
@@ -47,7 +47,7 @@ private[kafka010] class KafkaRelation(
     "Ending offset not allowed to be set to earliest offsets.")
 
   private val pollTimeoutMs = sourceOptions.getOrElse(
-    "kafkaConsumer.pollTimeoutMs",
+    KafkaSourceProvider.CONSUMER_POLL_TIMEOUT,
     (sqlContext.sparkContext.conf.get(NETWORK_TIMEOUT) * 1000L).toString
   ).toLong
 
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index fa93e8f..037c01b 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -85,12 +85,12 @@ private[kafka010] class KafkaSource(
   private val sc = sqlContext.sparkContext
 
   private val pollTimeoutMs = sourceOptions.getOrElse(
-    "kafkaConsumer.pollTimeoutMs",
+    KafkaSourceProvider.CONSUMER_POLL_TIMEOUT,
     (sc.conf.get(NETWORK_TIMEOUT) * 1000L).toString
   ).toLong
 
   private val maxOffsetsPerTrigger =
-    sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
+    sourceOptions.get(KafkaSourceProvider.MAX_OFFSET_PER_TRIGGER).map(_.toLong)
 
   /**
    * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index c27382d..dbd3310 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -185,11 +185,11 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
 
   private def strategy(caseInsensitiveParams: Map[String, String]) =
      caseInsensitiveParams.find(x => STRATEGY_OPTION_KEYS.contains(x._1)).get match {
-    case ("assign", value) =>
+    case (ASSIGN, value) =>
       AssignStrategy(JsonUtils.partitions(value))
-    case ("subscribe", value) =>
+    case (SUBSCRIBE, value) =>
       SubscribeStrategy(value.split(",").map(_.trim()).filter(_.nonEmpty))
-    case ("subscribepattern", value) =>
+    case (SUBSCRIBE_PATTERN, value) =>
       SubscribePatternStrategy(value.trim())
     case _ =>
       // Should never reach here as we are already matching on
@@ -217,22 +217,22 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     }
 
     val strategy = caseInsensitiveParams.find(x => STRATEGY_OPTION_KEYS.contains(x._1)).get match {
-      case ("assign", value) =>
+      case (ASSIGN, value) =>
         if (!value.trim.startsWith("{")) {
           throw new IllegalArgumentException(
             "No topicpartitions to assign as specified value for option " +
               s"'assign' is '$value'")
         }
 
-      case ("subscribe", value) =>
+      case (SUBSCRIBE, value) =>
         val topics = value.split(",").map(_.trim).filter(_.nonEmpty)
         if (topics.isEmpty) {
           throw new IllegalArgumentException(
             "No topics to subscribe to as specified value for option " +
               s"'subscribe' is '$value'")
         }
-      case ("subscribepattern", value) =>
-        val pattern = caseInsensitiveParams("subscribepattern").trim()
+      case (SUBSCRIBE_PATTERN, value) =>
+        val pattern = caseInsensitiveParams(SUBSCRIBE_PATTERN).trim()
         if (pattern.isEmpty) {
           throw new IllegalArgumentException(
             "Pattern to subscribe is empty as specified value for option " +
@@ -348,7 +348,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     validateGeneralOptions(caseInsensitiveParams)
 
     // Don't want to throw an error, but at least log a warning.
-    if (caseInsensitiveParams.get("maxoffsetspertrigger").isDefined) {
+    if (caseInsensitiveParams.get(MAX_OFFSET_PER_TRIGGER.toLowerCase(Locale.ROOT)).isDefined) {
       logWarning("maxOffsetsPerTrigger option ignored in batch queries")
     }
   }
@@ -458,11 +458,18 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
 }
 
 private[kafka010] object KafkaSourceProvider extends Logging {
-  private val STRATEGY_OPTION_KEYS = Set("subscribe", "subscribepattern", "assign")
+  private val ASSIGN = "assign"
+  private val SUBSCRIBE_PATTERN = "subscribepattern"
+  private val SUBSCRIBE = "subscribe"
+  private val STRATEGY_OPTION_KEYS = Set(SUBSCRIBE, SUBSCRIBE_PATTERN, ASSIGN)
   private[kafka010] val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
   private[kafka010] val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
   private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
-  private val MIN_PARTITIONS_OPTION_KEY = "minpartitions"
+  private[kafka010] val MIN_PARTITIONS_OPTION_KEY = "minpartitions"
+  private[kafka010] val MAX_OFFSET_PER_TRIGGER = "maxOffsetsPerTrigger"
+  private[kafka010] val FETCH_OFFSET_NUM_RETRY = "fetchOffset.numRetries"
+  private[kafka010] val FETCH_OFFSET_RETRY_INTERVAL_MS = "fetchOffset.retryIntervalMs"
+  private[kafka010] val CONSUMER_POLL_TIMEOUT = "kafkaConsumer.pollTimeoutMs"
   private val GROUP_ID_PREFIX = "groupidprefix"
 
   val TOPIC_OPTION_KEY = "topic"
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala
index 43acd6a..115ec44 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/package.scala
@@ -16,9 +16,25 @@
  */
 package org.apache.spark.sql
 
+import java.util.concurrent.TimeUnit
+
 import org.apache.kafka.common.TopicPartition
 
+import org.apache.spark.internal.config.ConfigBuilder
+
 package object kafka010 {   // scalastyle:ignore
   // ^^ scalastyle:ignore is for ignoring warnings about digits in package name
   type PartitionOffsetMap = Map[TopicPartition, Long]
+
+  private[kafka010] val PRODUCER_CACHE_TIMEOUT =
+    ConfigBuilder("spark.kafka.producer.cache.timeout")
+      .doc("The expire time to remove the unused producers.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("10m")
+
+  private[kafka010] val CONSUMER_CACHE_CAPACITY =
+    ConfigBuilder("spark.sql.kafkaConsumerCache.capacity")
+      .doc("The size of consumers cached.")
+      .intConf
+      .createWithDefault(64)
 }

