This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f55809  [SPARK-27294][SS] Add multi-cluster Kafka delegation token
2f55809 is described below

commit 2f558094257c38d26650049f2ac93be6d65d6d85
Author: Gabor Somogyi <gabor.g.somo...@gmail.com>
AuthorDate: Tue May 7 11:40:43 2019 -0700

    [SPARK-27294][SS] Add multi-cluster Kafka delegation token
    
    ## What changes were proposed in this pull request?
    
    The current implementation doesn't support multi-cluster Kafka connections
    with delegation tokens. In this PR I've added this functionality.
    
    What this PR contains:
    * New way of configuration (a minimal sketch follows this list)
    * Multiple delegation token obtain/store/use functionality
    * Documentation
    * The change also works with DStreams
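    
    As a rough illustration of the new configuration style (the cluster
    identifiers, broker addresses and regexes below are hypothetical, not part
    of this commit), the per-cluster keys can be set like any other Spark
    property:
    
        import org.apache.spark.SparkConf
    
        // Hypothetical identifiers "cluster1"/"cluster2" and made-up broker addresses,
        // shown only to illustrate the per-cluster key layout introduced here.
        val conf = new SparkConf()
          .set("spark.kafka.clusters.cluster1.auth.bootstrap.servers", "broker.realm1:9093")
          .set("spark.kafka.clusters.cluster1.target.bootstrap.servers.regex", ".*realm1.*")
          .set("spark.kafka.clusters.cluster2.auth.bootstrap.servers", "broker.realm2:9093")
          .set("spark.kafka.clusters.cluster2.target.bootstrap.servers.regex", ".*realm2.*")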
    
    ## How was this patch tested?
    
    Existing + additional unit tests.
    Additionally tested on a cluster.
    
    Test scenario:
    
    * 2 clusters with 4 nodes each
    * The two sets of 4 nodes are in different Kerberos realms
    * Cross-realm trust between the 2 realms
    * Yarn
    * Kafka broker version 2.1.0
    * security.protocol = SASL_SSL
    * sasl.mechanism = SCRAM-SHA-512
    * Artificial exceptions during processing
    * Source reads from realm1, sink writes to realm2 (see the sketch below)
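    
    A minimal sketch of that read-from-realm1 / write-to-realm2 flow, assuming
    hypothetical broker addresses, topic names and checkpoint path (the
    delegation token for each connection is picked by matching the
    bootstrap.servers value against the configured regex):
    
        import org.apache.spark.sql.SparkSession
    
        val spark = SparkSession.builder().appName("cross-realm-copy").getOrCreate()
    
        // Source cluster in realm1; its delegation token is selected automatically.
        val source = spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker.realm1:9093")
          .option("kafka.security.protocol", "SASL_SSL")
          .option("subscribe", "source-topic")
          .load()
          .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
    
        // Sink cluster in realm2, using the other cluster's delegation token.
        source.writeStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker.realm2:9093")
          .option("kafka.security.protocol", "SASL_SSL")
          .option("topic", "sink-topic")
          .option("checkpointLocation", "/tmp/realm-copy-checkpoint")
          .start()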
    
    Kafka broker settings:
    
    * delegation.token.expiry.time.ms=600000 (10 min)
    * delegation.token.max.lifetime.ms=1200000 (20 min)
    * delegation.token.expiry.check.interval.ms=300000 (5 min)
    
    Closes #24305 from gaborgsomogyi/SPARK-27294.
    
    Authored-by: Gabor Somogyi <gabor.g.somo...@gmail.com>
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
---
 .../org/apache/spark/internal/config/Kafka.scala   |  91 ---------------
 docs/structured-streaming-kafka-integration.md     | 103 ++++++++++++++++-
 .../spark/sql/kafka010/CachedKafkaProducer.scala   |  14 +--
 .../spark/sql/kafka010/ConsumerStrategy.scala      |  10 +-
 .../spark/sql/kafka010/KafkaDataConsumer.scala     |   2 +-
 .../apache/spark/kafka010/KafkaConfigUpdater.scala |  21 ++--
 .../kafka010/KafkaDelegationTokenProvider.scala    |  47 ++++++--
 .../spark/kafka010/KafkaTokenSparkConf.scala       |  96 ++++++++++++++++
 .../org/apache/spark/kafka010/KafkaTokenUtil.scala | 102 ++++++++++------
 .../spark/kafka010/KafkaConfigUpdaterSuite.scala   |  45 ++++++--
 .../spark/kafka010/KafkaDelegationTokenTest.scala  |   7 +-
 .../spark/kafka010/KafkaTokenSparkConfSuite.scala  | 128 +++++++++++++++++++++
 .../spark/kafka010/KafkaTokenUtilSuite.scala       | 123 ++++++++++++--------
 external/kafka-0-10/pom.xml                        |   5 +
 .../kafka010/KafkaDataConsumerSuite.scala          |  10 +-
 15 files changed, 573 insertions(+), 231 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala 
b/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala
deleted file mode 100644
index e91ddd3..0000000
--- a/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.internal.config
-
-private[spark] object Kafka {
-
-  val BOOTSTRAP_SERVERS =
-    ConfigBuilder("spark.kafka.bootstrap.servers")
-      .doc("A list of coma separated host/port pairs to use for establishing 
the initial " +
-        "connection to the Kafka cluster. For further details please see kafka 
documentation. " +
-        "Only used to obtain delegation token.")
-      .stringConf
-      .createOptional
-
-  val SECURITY_PROTOCOL =
-    ConfigBuilder("spark.kafka.security.protocol")
-      .doc("Protocol used to communicate with brokers. For further details 
please see kafka " +
-        "documentation. Only used to obtain delegation token.")
-      .stringConf
-      .createWithDefault("SASL_SSL")
-
-  val KERBEROS_SERVICE_NAME =
-    ConfigBuilder("spark.kafka.sasl.kerberos.service.name")
-      .doc("The Kerberos principal name that Kafka runs as. This can be 
defined either in " +
-        "Kafka's JAAS config or in Kafka's config. For further details please 
see kafka " +
-        "documentation. Only used to obtain delegation token.")
-      .stringConf
-      .createWithDefault("kafka")
-
-  val TRUSTSTORE_LOCATION =
-    ConfigBuilder("spark.kafka.ssl.truststore.location")
-      .doc("The location of the trust store file. For further details please 
see kafka " +
-        "documentation. Only used to obtain delegation token.")
-      .stringConf
-      .createOptional
-
-  val TRUSTSTORE_PASSWORD =
-    ConfigBuilder("spark.kafka.ssl.truststore.password")
-      .doc("The store password for the trust store file. This is optional for 
client and only " +
-        "needed if ssl.truststore.location is configured. For further details 
please see kafka " +
-        "documentation. Only used to obtain delegation token.")
-      .stringConf
-      .createOptional
-
-  val KEYSTORE_LOCATION =
-    ConfigBuilder("spark.kafka.ssl.keystore.location")
-      .doc("The location of the key store file. This is optional for client 
and can be used for " +
-        "two-way authentication for client. For further details please see 
kafka documentation. " +
-        "Only used to obtain delegation token.")
-      .stringConf
-      .createOptional
-
-  val KEYSTORE_PASSWORD =
-    ConfigBuilder("spark.kafka.ssl.keystore.password")
-      .doc("The store password for the key store file. This is optional for 
client and only " +
-        "needed if ssl.keystore.location is configured. For further details 
please see kafka " +
-        "documentation. Only used to obtain delegation token.")
-      .stringConf
-      .createOptional
-
-  val KEY_PASSWORD =
-    ConfigBuilder("spark.kafka.ssl.key.password")
-      .doc("The password of the private key in the key store file. This is 
optional for client. " +
-        "For further details please see kafka documentation. Only used to 
obtain delegation token.")
-      .stringConf
-      .createOptional
-
-  val TOKEN_SASL_MECHANISM =
-    ConfigBuilder("spark.kafka.sasl.token.mechanism")
-      .doc("SASL mechanism used for client connections with delegation token. 
Because SCRAM " +
-        "login module used for authentication a compatible mechanism has to be 
set here. " +
-        "For further details please see kafka documentation (sasl.mechanism). 
Only used to " +
-        "authenticate against Kafka broker with delegation token.")
-      .stringConf
-      .createWithDefault("SCRAM-SHA-512")
-}
diff --git a/docs/structured-streaming-kafka-integration.md 
b/docs/structured-streaming-kafka-integration.md
index 9e1bbc0..4a295e0 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -676,7 +676,7 @@ This way the application can be configured via Spark 
parameters and may not need
 configuration (Spark can use Kafka's dynamic JAAS configuration feature). For 
further information
 about delegation tokens, see [Kafka delegation token 
docs](http://kafka.apache.org/documentation/#security_delegation_token).
 
-The process is initiated by Spark's Kafka delegation token provider. When 
`spark.kafka.bootstrap.servers` is set,
+The process is initiated by Spark's Kafka delegation token provider. When 
`spark.kafka.clusters.${cluster}.auth.bootstrap.servers` is set,
 Spark considers the following log in options, in order of preference:
 - **JAAS login configuration**, please see example below.
 - **Keytab file**, such as,
@@ -684,13 +684,13 @@ Spark considers the following log in options, in order of 
preference:
       ./bin/spark-submit \
           --keytab <KEYTAB_FILE> \
           --principal <PRINCIPAL> \
-          --conf spark.kafka.bootstrap.servers=<KAFKA_SERVERS> \
+          --conf 
spark.kafka.clusters.${cluster}.auth.bootstrap.servers=<KAFKA_SERVERS> \
           ...
 
 - **Kerberos credential cache**, such as,
 
       ./bin/spark-submit \
-          --conf spark.kafka.bootstrap.servers=<KAFKA_SERVERS> \
+          --conf 
spark.kafka.clusters.${cluster}.auth.bootstrap.servers=<KAFKA_SERVERS> \
           ...
 
 The Kafka delegation token provider can be turned off by setting 
`spark.security.credentials.kafka.enabled` to `false` (default: `true`).
@@ -703,10 +703,103 @@ Kafka broker configuration):
 
 After obtaining delegation token successfully, Spark distributes it across 
nodes and renews it accordingly.
 Delegation token uses `SCRAM` login module for authentication and because of 
that the appropriate
-`spark.kafka.sasl.token.mechanism` (default: `SCRAM-SHA-512`) has to be 
configured. Also, this parameter
+`spark.kafka.clusters.${cluster}.sasl.token.mechanism` (default: 
`SCRAM-SHA-512`) has to be configured. Also, this parameter
 must match with Kafka broker configuration.
 
-When delegation token is available on an executor it can be overridden with 
JAAS login configuration.
+When delegation token is available on an executor Spark considers the 
following log in options, in order of preference:
+- **JAAS login configuration**, please see example below.
+- **Delegation token**, please see 
<code>spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex</code> 
parameter for further details.
+
+When none of the above applies then an unsecured connection is assumed.
+
+
+#### Configuration
+
+Delegation tokens can be obtained from multiple clusters and 
<code>${cluster}</code> is an arbitrary unique identifier which helps to group 
different configurations.
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+  <tr>
+    
<td><code>spark.kafka.clusters.${cluster}.auth.bootstrap.servers</code></td>
+    <td>None</td>
+    <td>
+      A list of comma-separated host/port pairs to use for establishing the 
initial connection
+      to the Kafka cluster. For further details please see Kafka 
documentation. Only used to obtain delegation token.
+    </td>
+  </tr>
+  <tr>
+    
<td><code>spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex</code></td>
+    <td>.*</td>
+    <td>
+      Regular expression to match against the <code>bootstrap.servers</code> 
config for sources and sinks in the application.
+      If a server address matches this regex, the delegation token obtained 
from the respective bootstrap servers will be used when connecting.
+      If multiple clusters match the address, an exception will be thrown and 
the query won't be started.
+      Kafka's secure and unsecure listeners are bound to different ports. When 
both are used, the secure listener port has to be part of the regular expression.
+    </td>
+  </tr>
+  <tr>
+    <td><code>spark.kafka.clusters.${cluster}.security.protocol</code></td>
+    <td>SASL_SSL</td>
+    <td>
+      Protocol used to communicate with brokers. For further details please 
see Kafka documentation. Only used to obtain delegation token.
+    </td>
+  </tr>
+  <tr>
+    
<td><code>spark.kafka.clusters.${cluster}.sasl.kerberos.service.name</code></td>
+    <td>kafka</td>
+    <td>
+      The Kerberos principal name that Kafka runs as. This can be defined 
either in Kafka's JAAS config or in Kafka's config.
+      For further details please see Kafka documentation. Only used to obtain 
delegation token.
+    </td>
+  </tr>
+  <tr>
+    
<td><code>spark.kafka.clusters.${cluster}.ssl.truststore.location</code></td>
+    <td>None</td>
+    <td>
+      The location of the trust store file. For further details please see 
Kafka documentation. Only used to obtain delegation token.
+    </td>
+  </tr>
+  <tr>
+    
<td><code>spark.kafka.clusters.${cluster}.ssl.truststore.password</code></td>
+    <td>None</td>
+    <td>
+      The store password for the trust store file. This is optional and only 
needed if <code>spark.kafka.clusters.${cluster}.ssl.truststore.location</code> 
is configured.
+      For further details please see Kafka documentation. Only used to obtain 
delegation token.
+    </td>
+  </tr>
+  <tr>
+    <td><code>spark.kafka.clusters.${cluster}.ssl.keystore.location</code></td>
+    <td>None</td>
+    <td>
+      The location of the key store file. This is optional for client and can 
be used for two-way authentication for client.
+      For further details please see Kafka documentation. Only used to obtain 
delegation token.
+    </td>
+  </tr>
+  <tr>
+    <td><code>spark.kafka.clusters.${cluster}.ssl.keystore.password</code></td>
+    <td>None</td>
+    <td>
+      The store password for the key store file. This is optional and only 
needed if <code>spark.kafka.clusters.${cluster}.ssl.keystore.location</code> is 
configured.
+      For further details please see Kafka documentation. Only used to obtain 
delegation token.
+    </td>
+  </tr>
+  <tr>
+    <td><code>spark.kafka.clusters.${cluster}.ssl.key.password</code></td>
+    <td>None</td>
+    <td>
+      The password of the private key in the key store file. This is optional 
for client.
+      For further details please see Kafka documentation. Only used to obtain 
delegation token.
+    </td>
+  </tr>
+  <tr>
+    <td><code>spark.kafka.clusters.${cluster}.sasl.token.mechanism</code></td>
+    <td>SCRAM-SHA-512</td>
+    <td>
+      SASL mechanism used for client connections with delegation token. 
Because the SCRAM login module is used for authentication, a compatible 
mechanism has to be set here.
+      For further details please see Kafka documentation 
(<code>sasl.mechanism</code>). Only used to authenticate against Kafka broker 
with delegation token.
+    </td>
+  </tr>
+</table>
 
 #### Caveats
 
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
index 062ce9a..2bab287 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
@@ -65,12 +65,8 @@ private[kafka010] object CachedKafkaProducer extends Logging 
{
       .build[Seq[(String, Object)], Producer](cacheLoader)
 
   private def createKafkaProducer(producerConfiguration: ju.Map[String, 
Object]): Producer = {
-    val updatedKafkaProducerConfiguration =
-      KafkaConfigUpdater("executor", producerConfiguration.asScala.toMap)
-        .setAuthenticationConfigIfNeeded()
-        .build()
-    val kafkaProducer: Producer = new 
Producer(updatedKafkaProducerConfiguration)
-    logDebug(s"Created a new instance of KafkaProducer for 
$updatedKafkaProducerConfiguration.")
+    val kafkaProducer: Producer = new Producer(producerConfiguration)
+    logDebug(s"Created a new instance of KafkaProducer for 
$producerConfiguration.")
     kafkaProducer
   }
 
@@ -80,7 +76,11 @@ private[kafka010] object CachedKafkaProducer extends Logging 
{
    * one instance per specified kafkaParams.
    */
   private[kafka010] def getOrCreate(kafkaParams: ju.Map[String, Object]): 
Producer = {
-    val paramsSeq: Seq[(String, Object)] = paramsToSeq(kafkaParams)
+    val updatedKafkaProducerConfiguration =
+      KafkaConfigUpdater("executor", kafkaParams.asScala.toMap)
+        .setAuthenticationConfigIfNeeded()
+        .build()
+    val paramsSeq: Seq[(String, Object)] = 
paramsToSeq(updatedKafkaProducerConfiguration)
     try {
       guavaCache.get(paramsSeq)
     } catch {
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
index 2326619..7bb829c 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
@@ -36,7 +36,7 @@ import org.apache.spark.kafka010.KafkaConfigUpdater
  * All three strategies have overloaded constructors that allow you to specify
  * the starting offset for a particular partition.
  */
-sealed trait ConsumerStrategy {
+private[kafka010] sealed trait ConsumerStrategy {
   /** Create a [[KafkaConsumer]] and subscribe to topics according to a 
desired strategy */
   def createConsumer(kafkaParams: ju.Map[String, Object]): 
Consumer[Array[Byte], Array[Byte]]
 
@@ -53,7 +53,8 @@ sealed trait ConsumerStrategy {
 /**
  * Specify a fixed collection of partitions.
  */
-case class AssignStrategy(partitions: Array[TopicPartition]) extends 
ConsumerStrategy {
+private[kafka010] case class AssignStrategy(partitions: Array[TopicPartition])
+    extends ConsumerStrategy {
   override def createConsumer(
       kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] 
= {
     val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams)
@@ -68,7 +69,7 @@ case class AssignStrategy(partitions: Array[TopicPartition]) 
extends ConsumerStr
 /**
  * Subscribe to a fixed collection of topics.
  */
-case class SubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy {
+private[kafka010] case class SubscribeStrategy(topics: Seq[String]) extends 
ConsumerStrategy {
   override def createConsumer(
       kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] 
= {
     val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams)
@@ -83,7 +84,8 @@ case class SubscribeStrategy(topics: Seq[String]) extends 
ConsumerStrategy {
 /**
  * Use a regex to specify topics of interest.
  */
-case class SubscribePatternStrategy(topicPattern: String) extends 
ConsumerStrategy {
+private[kafka010] case class SubscribePatternStrategy(topicPattern: String)
+    extends ConsumerStrategy {
   override def createConsumer(
       kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] 
= {
     val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams)
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
index 83bf4b1..45ea3d2 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
@@ -347,7 +347,7 @@ private[kafka010] case class InternalKafkaConsumer(
    * consumer's `isolation.level` is `read_committed`), it will return a 
`FetchedRecord` with the
    * next offset to fetch.
    *
-   * This method also will try the best to detect data loss. If 
`failOnDataLoss` is true`, it will
+   * This method also will try the best to detect data loss. If 
`failOnDataLoss` is `true`, it will
    * throw an exception when we detect an unavailable offset. If 
`failOnDataLoss` is `false`, this
    * method will return `null` if the next available record is within [offset, 
untilOffset).
    *
diff --git 
a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaConfigUpdater.scala
 
b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaConfigUpdater.scala
index d24eb4a..38f3b98 100644
--- 
a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaConfigUpdater.scala
+++ 
b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaConfigUpdater.scala
@@ -21,11 +21,11 @@ import java.{util => ju}
 
 import scala.collection.JavaConverters._
 
+import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.config.SaslConfigs
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.Kafka
 
 /**
  * Class to conveniently update Kafka config params, while logging the changes
@@ -57,14 +57,17 @@ private[spark] case class KafkaConfigUpdater(module: 
String, kafkaParams: Map[St
     //   configuration.
     if (KafkaTokenUtil.isGlobalJaasConfigurationProvided) {
       logDebug("JVM global security configuration detected, using it for 
login.")
-    } else if (KafkaTokenUtil.isTokenAvailable()) {
-      logDebug("Delegation token detected, using it for login.")
-      val jaasParams = KafkaTokenUtil.getTokenJaasParams(SparkEnv.get.conf)
-      set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
-      val mechanism = SparkEnv.get.conf.get(Kafka.TOKEN_SASL_MECHANISM)
-      require(mechanism.startsWith("SCRAM"),
-        "Delegation token works only with SCRAM mechanism.")
-      set(SaslConfigs.SASL_MECHANISM, mechanism)
+    } else {
+      val clusterConfig = KafkaTokenUtil.findMatchingToken(SparkEnv.get.conf,
+        
map.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG).asInstanceOf[String])
+      clusterConfig.foreach { clusterConf =>
+        logDebug("Delegation token detected, using it for login.")
+        val jaasParams = KafkaTokenUtil.getTokenJaasParams(clusterConf)
+        set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
+        require(clusterConf.tokenMechanism.startsWith("SCRAM"),
+          "Delegation token works only with SCRAM mechanism.")
+        set(SaslConfigs.SASL_MECHANISM, clusterConf.tokenMechanism)
+      }
     }
     this
   }
diff --git 
a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala
 
b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala
index da12e51..69fcf55 100644
--- 
a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala
+++ 
b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala
@@ -25,7 +25,6 @@ import 
org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, S
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.Kafka
 import org.apache.spark.security.HadoopDelegationTokenProvider
 
 private[spark] class KafkaDelegationTokenProvider
@@ -37,25 +36,49 @@ private[spark] class KafkaDelegationTokenProvider
       hadoopConf: Configuration,
       sparkConf: SparkConf,
       creds: Credentials): Option[Long] = {
+    var lowestNextRenewalDate: Option[Long] = None
     try {
-      logDebug("Attempting to fetch Kafka security token.")
-      val (token, nextRenewalDate) = KafkaTokenUtil.obtainToken(sparkConf)
-      creds.addToken(token.getService, token)
-      return Some(nextRenewalDate)
+      KafkaTokenSparkConf.getAllClusterConfigs(sparkConf).foreach { 
clusterConf =>
+        try {
+          if (delegationTokensRequired(clusterConf)) {
+            logDebug(
+              s"Attempting to fetch Kafka security token for cluster 
${clusterConf.identifier}.")
+            val (token, nextRenewalDate) = 
KafkaTokenUtil.obtainToken(sparkConf, clusterConf)
+            creds.addToken(token.getService, token)
+            if (lowestNextRenewalDate.isEmpty || nextRenewalDate < 
lowestNextRenewalDate.get) {
+              lowestNextRenewalDate = Some(nextRenewalDate)
+            }
+          } else {
+            logDebug(
+              s"Cluster ${clusterConf.identifier} does not require delegation 
token, skipping.")
+          }
+        } catch {
+          case NonFatal(e) =>
+            logWarning(s"Failed to get token from service: $serviceName " +
+              s"cluster: ${clusterConf.identifier}", e)
+        }
+      }
     } catch {
       case NonFatal(e) =>
-        logWarning(s"Failed to get token from service $serviceName", e)
+        logWarning(s"Failed to get token cluster configuration", e)
     }
-    None
+    lowestNextRenewalDate
   }
 
   override def delegationTokensRequired(
       sparkConf: SparkConf,
       hadoopConf: Configuration): Boolean = {
-    val protocol = sparkConf.get(Kafka.SECURITY_PROTOCOL)
-    sparkConf.contains(Kafka.BOOTSTRAP_SERVERS) &&
-      (protocol == SASL_SSL.name ||
-        protocol == SSL.name ||
-        protocol == SASL_PLAINTEXT.name)
+    try {
+      
KafkaTokenSparkConf.getAllClusterConfigs(sparkConf).exists(delegationTokensRequired(_))
+    } catch {
+      case NonFatal(e) =>
+        logWarning(s"Failed to get token cluster configuration", e)
+        false
+    }
   }
+
+  private def delegationTokensRequired(clusterConf: KafkaTokenClusterConf): 
Boolean =
+    clusterConf.securityProtocol == SASL_SSL.name ||
+      clusterConf.securityProtocol == SSL.name ||
+      clusterConf.securityProtocol == SASL_PLAINTEXT.name
 }
diff --git 
a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenSparkConf.scala
 
b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenSparkConf.scala
new file mode 100644
index 0000000..84d58d8
--- /dev/null
+++ 
b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenSparkConf.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kafka010
+
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.config.{SaslConfigs, SslConfigs}
+import org.apache.kafka.common.security.auth.SecurityProtocol.SASL_SSL
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+private[spark] case class KafkaTokenClusterConf(
+    identifier: String,
+    authBootstrapServers: String,
+    targetServersRegex: String,
+    securityProtocol: String,
+    kerberosServiceName: String,
+    trustStoreLocation: Option[String],
+    trustStorePassword: Option[String],
+    keyStoreLocation: Option[String],
+    keyStorePassword: Option[String],
+    keyPassword: Option[String],
+    tokenMechanism: String) {
+  override def toString: String = s"KafkaTokenClusterConf{" +
+    s"identifier=$identifier, " +
+    s"authBootstrapServers=$authBootstrapServers, " +
+    s"targetServersRegex=$targetServersRegex, " +
+    s"securityProtocol=$securityProtocol, " +
+    s"kerberosServiceName=$kerberosServiceName, " +
+    s"trustStoreLocation=$trustStoreLocation, " +
+    s"trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
+    s"keyStoreLocation=$keyStoreLocation, " +
+    s"keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " +
+    s"keyPassword=${keyPassword.map(_ => "xxx")}, " +
+    s"tokenMechanism=$tokenMechanism}"
+}
+
+private [kafka010] object KafkaTokenSparkConf extends Logging {
+  val CLUSTERS_CONFIG_PREFIX = "spark.kafka.clusters."
+  val DEFAULT_TARGET_SERVERS_REGEX = ".*"
+  val DEFAULT_SASL_KERBEROS_SERVICE_NAME = "kafka"
+  val DEFAULT_SASL_TOKEN_MECHANISM = "SCRAM-SHA-512"
+
+  def getClusterConfig(sparkConf: SparkConf, identifier: String): 
KafkaTokenClusterConf = {
+    val configPrefix = s"$CLUSTERS_CONFIG_PREFIX$identifier."
+    val sparkClusterConf = sparkConf.getAllWithPrefix(configPrefix).toMap
+    val result = KafkaTokenClusterConf(
+      identifier,
+      sparkClusterConf
+        .getOrElse(s"auth.${CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG}",
+          throw new NoSuchElementException(
+            
s"${configPrefix}auth.${CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG}")),
+      
sparkClusterConf.getOrElse(s"target.${CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG}.regex",
+        KafkaTokenSparkConf.DEFAULT_TARGET_SERVERS_REGEX),
+      sparkClusterConf.getOrElse(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
SASL_SSL.name),
+      sparkClusterConf.getOrElse(SaslConfigs.SASL_KERBEROS_SERVICE_NAME,
+        KafkaTokenSparkConf.DEFAULT_SASL_KERBEROS_SERVICE_NAME),
+      sparkClusterConf.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG),
+      sparkClusterConf.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG),
+      sparkClusterConf.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG),
+      sparkClusterConf.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG),
+      sparkClusterConf.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG),
+      sparkClusterConf.getOrElse("sasl.token.mechanism",
+        KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM)
+    )
+    logDebug(s"getClusterConfig($identifier): $result")
+    result
+  }
+
+  def getAllClusterConfigs(sparkConf: SparkConf): Set[KafkaTokenClusterConf] = 
{
+    
sparkConf.getAllWithPrefix(KafkaTokenSparkConf.CLUSTERS_CONFIG_PREFIX).toMap.keySet
+      .flatMap { k =>
+        val split = k.split('.')
+        if (split.length > 0 && split(0).nonEmpty) {
+          Some(split(0))
+        } else {
+          None
+        }
+      }.map(getClusterConfig(sparkConf, _))
+  }
+}
diff --git 
a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
 
b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
index 3f9a593..7078b4f 100644
--- 
a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
+++ 
b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
@@ -19,7 +19,9 @@ package org.apache.spark.kafka010
 
 import java.{util => ju}
 import java.text.SimpleDateFormat
+import java.util.regex.Pattern
 
+import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.io.Text
@@ -38,20 +40,28 @@ import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.util.Utils
 
 private[spark] object KafkaTokenUtil extends Logging {
   val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
-  val TOKEN_SERVICE = new Text("kafka.server.delegation.token")
+  private val TOKEN_SERVICE_PREFIX = "kafka.server.delegation.token"
+
+  private[kafka010] def getTokenService(identifier: String): Text =
+    new Text(s"$TOKEN_SERVICE_PREFIX.$identifier")
+
+  private def getClusterIdentifier(service: Text): String =
+    service.toString().replace(s"$TOKEN_SERVICE_PREFIX.", "")
 
   private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
     override def getKind: Text = TOKEN_KIND
   }
 
-  private[kafka010] def obtainToken(sparkConf: SparkConf):
-      (Token[KafkaDelegationTokenIdentifier], Long) = {
+  private[kafka010] def obtainToken(
+      sparkConf: SparkConf,
+      clusterConf: KafkaTokenClusterConf): 
(Token[KafkaDelegationTokenIdentifier], Long) = {
     checkProxyUser()
 
-    val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
+    val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf, clusterConf))
     val createDelegationTokenOptions = new CreateDelegationTokenOptions()
     val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
     val token = createResult.delegationToken().get()
@@ -61,7 +71,7 @@ private[spark] object KafkaTokenUtil extends Logging {
       token.tokenInfo.tokenId.getBytes,
       token.hmacAsBase64String.getBytes,
       TOKEN_KIND,
-      TOKEN_SERVICE
+      getTokenService(clusterConf.identifier)
     ), token.tokenInfo.expiryTimestamp)
   }
 
@@ -73,23 +83,23 @@ private[spark] object KafkaTokenUtil extends Logging {
       "user is not yet supported.")
   }
 
-  private[kafka010] def createAdminClientProperties(sparkConf: SparkConf): 
ju.Properties = {
+  private[kafka010] def createAdminClientProperties(
+      sparkConf: SparkConf,
+      clusterConf: KafkaTokenClusterConf): ju.Properties = {
     val adminClientProperties = new ju.Properties
 
-    val bootstrapServers = sparkConf.get(Kafka.BOOTSTRAP_SERVERS)
-    require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
-      "servers not configured.")
-    adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+    adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+      clusterConf.authBootstrapServers)
 
-    val protocol = sparkConf.get(Kafka.SECURITY_PROTOCOL)
-    adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
-    protocol match {
+    adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
+      clusterConf.securityProtocol)
+    clusterConf.securityProtocol match {
       case SASL_SSL.name =>
-        setTrustStoreProperties(sparkConf, adminClientProperties)
+        setTrustStoreProperties(clusterConf, adminClientProperties)
 
       case SSL.name =>
-        setTrustStoreProperties(sparkConf, adminClientProperties)
-        setKeyStoreProperties(sparkConf, adminClientProperties)
+        setTrustStoreProperties(clusterConf, adminClientProperties)
+        setKeyStoreProperties(clusterConf, adminClientProperties)
         logWarning("Obtaining kafka delegation token with SSL protocol. Please 
" +
           "configure 2-way authentication on the broker side.")
 
@@ -114,11 +124,11 @@ private[spark] object KafkaTokenUtil extends Logging {
       adminClientProperties.put(SaslConfigs.SASL_MECHANISM, 
SaslConfigs.GSSAPI_MECHANISM)
       if (sparkConf.contains(KEYTAB)) {
         logDebug("Keytab detected, using it for login.")
-        val jaasParams = getKeytabJaasParams(sparkConf)
+        val jaasParams = getKeytabJaasParams(sparkConf, clusterConf)
         adminClientProperties.put(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
       } else {
         logDebug("Using ticket cache for login.")
-        val jaasParams = getTicketCacheJaasParams(sparkConf)
+        val jaasParams = getTicketCacheJaasParams(clusterConf)
         adminClientProperties.put(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
       }
     }
@@ -135,34 +145,40 @@ private[spark] object KafkaTokenUtil extends Logging {
     }
   }
 
-  private def setTrustStoreProperties(sparkConf: SparkConf, properties: 
ju.Properties): Unit = {
-    sparkConf.get(Kafka.TRUSTSTORE_LOCATION).foreach { truststoreLocation =>
+  private def setTrustStoreProperties(
+      clusterConf: KafkaTokenClusterConf,
+      properties: ju.Properties): Unit = {
+    clusterConf.trustStoreLocation.foreach { truststoreLocation =>
       properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, 
truststoreLocation)
     }
-    sparkConf.get(Kafka.TRUSTSTORE_PASSWORD).foreach { truststorePassword =>
+    clusterConf.trustStorePassword.foreach { truststorePassword =>
       properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, 
truststorePassword)
     }
   }
 
-  private def setKeyStoreProperties(sparkConf: SparkConf, properties: 
ju.Properties): Unit = {
-    sparkConf.get(Kafka.KEYSTORE_LOCATION).foreach { keystoreLocation =>
+  private def setKeyStoreProperties(
+      clusterConf: KafkaTokenClusterConf,
+      properties: ju.Properties): Unit = {
+    clusterConf.keyStoreLocation.foreach { keystoreLocation =>
       properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystoreLocation)
     }
-    sparkConf.get(Kafka.KEYSTORE_PASSWORD).foreach { keystorePassword =>
+    clusterConf.keyStorePassword.foreach { keystorePassword =>
       properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keystorePassword)
     }
-    sparkConf.get(Kafka.KEY_PASSWORD).foreach { keyPassword =>
+    clusterConf.keyPassword.foreach { keyPassword =>
       properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword)
     }
   }
 
-  private def getKeytabJaasParams(sparkConf: SparkConf): String = {
+  private def getKeytabJaasParams(
+      sparkConf: SparkConf,
+      clusterConf: KafkaTokenClusterConf): String = {
     val params =
       s"""
       |${getKrb5LoginModuleName} required
       | debug=${isGlobalKrbDebugEnabled()}
       | useKeyTab=true
-      | serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}"
+      | serviceName="${clusterConf.kerberosServiceName}"
       | keyTab="${sparkConf.get(KEYTAB).get}"
       | principal="${sparkConf.get(PRINCIPAL).get}";
       """.stripMargin.replace("\n", "")
@@ -170,16 +186,13 @@ private[spark] object KafkaTokenUtil extends Logging {
     params
   }
 
-  private def getTicketCacheJaasParams(sparkConf: SparkConf): String = {
-    val serviceName = sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)
-    require(serviceName.nonEmpty, "Kerberos service name must be defined")
-
+  private def getTicketCacheJaasParams(clusterConf: KafkaTokenClusterConf): 
String = {
     val params =
       s"""
       |${getKrb5LoginModuleName} required
       | debug=${isGlobalKrbDebugEnabled()}
       | useTicketCache=true
-      | serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}";
+      | serviceName="${clusterConf.kerberosServiceName}";
       """.stripMargin.replace("\n", "")
     logDebug(s"Krb ticket cache JAAS params: $params")
     params
@@ -223,14 +236,27 @@ private[spark] object KafkaTokenUtil extends Logging {
     }
   }
 
-  def isTokenAvailable(): Boolean = {
-    UserGroupInformation.getCurrentUser().getCredentials.getToken(
-      KafkaTokenUtil.TOKEN_SERVICE) != null
+  def findMatchingToken(
+      sparkConf: SparkConf,
+      bootStrapServers: String): Option[KafkaTokenClusterConf] = {
+    val tokens = 
UserGroupInformation.getCurrentUser().getCredentials.getAllTokens.asScala
+    val clusterConfigs = tokens
+      .filter(_.getService().toString().startsWith(TOKEN_SERVICE_PREFIX))
+      .map { token =>
+        KafkaTokenSparkConf.getClusterConfig(sparkConf, 
getClusterIdentifier(token.getService()))
+      }
+      .filter { clusterConfig =>
+        val pattern = Pattern.compile(clusterConfig.targetServersRegex)
+        
Utils.stringToSeq(bootStrapServers).exists(pattern.matcher(_).matches())
+      }
+    require(clusterConfigs.size <= 1, "More than one delegation token matches 
the following " +
+      s"bootstrap servers: $bootStrapServers.")
+    clusterConfigs.headOption
   }
 
-  def getTokenJaasParams(sparkConf: SparkConf): String = {
+  def getTokenJaasParams(clusterConf: KafkaTokenClusterConf): String = {
     val token = UserGroupInformation.getCurrentUser().getCredentials.getToken(
-      KafkaTokenUtil.TOKEN_SERVICE)
+      getTokenService(clusterConf.identifier))
     val username = new String(token.getIdentifier)
     val password = new String(token.getPassword)
 
@@ -239,7 +265,7 @@ private[spark] object KafkaTokenUtil extends Logging {
       s"""
       |$loginModuleName required
       | tokenauth=true
-      | serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}"
+      | serviceName="${clusterConf.kerberosServiceName}"
       | username="$username"
       | password="$password";
       """.stripMargin.replace("\n", "")
diff --git 
a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaConfigUpdaterSuite.scala
 
b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaConfigUpdaterSuite.scala
index 538486b..8f4cedf 100644
--- 
a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaConfigUpdaterSuite.scala
+++ 
b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaConfigUpdaterSuite.scala
@@ -17,16 +17,19 @@
 
 package org.apache.spark.kafka010
 
+import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.config.SaslConfigs
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.internal.config._
 
 class KafkaConfigUpdaterSuite extends SparkFunSuite with 
KafkaDelegationTokenTest {
+  private val identifier = "cluster1"
+  private val tokenService = KafkaTokenUtil.getTokenService(identifier)
   private val testModule = "testModule"
   private val testKey = "testKey"
   private val testValue = "testValue"
   private val otherTestValue = "otherTestValue"
+  private val bootStrapServers = "127.0.0.1:0"
 
   test("set should always set value") {
     val params = Map.empty[String, String]
@@ -73,24 +76,38 @@ class KafkaConfigUpdaterSuite extends SparkFunSuite with 
KafkaDelegationTokenTes
   }
 
   test("setAuthenticationConfigIfNeeded with token should set values") {
-    val params = Map.empty[String, String]
-    setSparkEnv(Map.empty)
-    addTokenToUGI()
+    val params = Map(
+      CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> bootStrapServers
+    )
+    setSparkEnv(
+      Map(
+        s"spark.kafka.clusters.$identifier.auth.bootstrap.servers" -> 
bootStrapServers
+      )
+    )
+    addTokenToUGI(tokenService)
 
     val updatedParams = KafkaConfigUpdater(testModule, params)
       .setAuthenticationConfigIfNeeded()
       .build()
 
-    assert(updatedParams.size() === 2)
+    assert(updatedParams.size() === 3)
+    assert(updatedParams.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) === 
bootStrapServers)
     assert(updatedParams.containsKey(SaslConfigs.SASL_JAAS_CONFIG))
     assert(updatedParams.get(SaslConfigs.SASL_MECHANISM) ===
-      Kafka.TOKEN_SASL_MECHANISM.defaultValueString)
+      KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM)
   }
 
-  test("setAuthenticationConfigIfNeeded with token and invalid mechanism 
should throw exception") {
-    val params = Map.empty[String, String]
-    setSparkEnv(Map[String, String](Kafka.TOKEN_SASL_MECHANISM.key -> 
"INVALID"))
-    addTokenToUGI()
+  test("setAuthenticationConfigIfNeeded with invalid mechanism should throw 
exception") {
+    val params = Map(
+      CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> bootStrapServers
+    )
+    setSparkEnv(
+      Map(
+        s"spark.kafka.clusters.$identifier.auth.bootstrap.servers" -> 
bootStrapServers,
+        s"spark.kafka.clusters.$identifier.sasl.token.mechanism" -> 
"intentionally_invalid"
+      )
+    )
+    addTokenToUGI(tokenService)
 
     val e = intercept[IllegalArgumentException] {
       KafkaConfigUpdater(testModule, params)
@@ -102,12 +119,16 @@ class KafkaConfigUpdaterSuite extends SparkFunSuite with 
KafkaDelegationTokenTes
   }
 
   test("setAuthenticationConfigIfNeeded without security should not set 
values") {
-    val params = Map.empty[String, String]
+    val params = Map(
+      CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> bootStrapServers
+    )
+    setSparkEnv(Map.empty)
 
     val updatedParams = KafkaConfigUpdater(testModule, params)
       .setAuthenticationConfigIfNeeded()
       .build()
 
-    assert(updatedParams.size() === 0)
+    assert(updatedParams.size() === 1)
+    assert(updatedParams.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) === 
bootStrapServers)
   }
 }
diff --git 
a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala
 
b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala
index bd9b873..ac59c61 100644
--- 
a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala
+++ 
b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala
@@ -20,6 +20,7 @@ package org.apache.spark.kafka010
 import java.{util => ju}
 import javax.security.auth.login.{AppConfigurationEntry, Configuration}
 
+import org.apache.hadoop.io.Text
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.security.token.Token
 import org.mockito.Mockito.mock
@@ -70,15 +71,15 @@ trait KafkaDelegationTokenTest extends BeforeAndAfterEach {
     Configuration.setConfiguration(new KafkaJaasConfiguration)
   }
 
-  protected def addTokenToUGI(): Unit = {
+  protected def addTokenToUGI(tokenService: Text): Unit = {
     val token = new Token[KafkaDelegationTokenIdentifier](
       tokenId.getBytes,
       tokenPassword.getBytes,
       KafkaTokenUtil.TOKEN_KIND,
-      KafkaTokenUtil.TOKEN_SERVICE
+      tokenService
     )
     val creds = new Credentials()
-    creds.addToken(KafkaTokenUtil.TOKEN_SERVICE, token)
+    creds.addToken(token.getService, token)
     UserGroupInformation.getCurrentUser.addCredentials(creds)
   }
 
diff --git 
a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenSparkConfSuite.scala
 
b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenSparkConfSuite.scala
new file mode 100644
index 0000000..60bb8a2
--- /dev/null
+++ 
b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenSparkConfSuite.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kafka010
+
+import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_SSL, SSL}
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+
+class KafkaTokenSparkConfSuite extends SparkFunSuite with BeforeAndAfterEach {
+  private val identifier1 = "cluster1"
+  private val identifier2 = "cluster2"
+  private val authBootStrapServers = "127.0.0.1:0"
+  private val targetServersRegex = "127.0.0.*:0"
+  private val securityProtocol = SSL.name
+  private val kerberosServiceName = "kafka1"
+  private val trustStoreLocation = "/path/to/trustStore"
+  private val trustStorePassword = "trustStoreSecret"
+  private val keyStoreLocation = "/path/to/keyStore"
+  private val keyStorePassword = "keyStoreSecret"
+  private val keyPassword = "keySecret"
+  private val tokenMechanism = "SCRAM-SHA-256"
+
+  private var sparkConf: SparkConf = null
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    sparkConf = new SparkConf()
+  }
+
+  test("getClusterConfig should throw exception when not exists") {
+    val thrown = intercept[NoSuchElementException] {
+      KafkaTokenSparkConf.getClusterConfig(sparkConf, "invalid")
+    }
+    assert(thrown.getMessage contains 
"spark.kafka.clusters.invalid.auth.bootstrap.servers")
+  }
+
+  test("getClusterConfig should return entry with defaults") {
+    sparkConf.set(s"spark.kafka.clusters.$identifier1.auth.bootstrap.servers", 
authBootStrapServers)
+
+    val clusterConfig = KafkaTokenSparkConf.getClusterConfig(sparkConf, 
identifier1)
+    assert(clusterConfig.identifier === identifier1)
+    assert(clusterConfig.authBootstrapServers === authBootStrapServers)
+    assert(clusterConfig.targetServersRegex === 
KafkaTokenSparkConf.DEFAULT_TARGET_SERVERS_REGEX)
+    assert(clusterConfig.securityProtocol === SASL_SSL.name)
+    assert(clusterConfig.kerberosServiceName ===
+      KafkaTokenSparkConf.DEFAULT_SASL_KERBEROS_SERVICE_NAME)
+    assert(clusterConfig.trustStoreLocation === None)
+    assert(clusterConfig.trustStorePassword === None)
+    assert(clusterConfig.keyStoreLocation === None)
+    assert(clusterConfig.keyStorePassword === None)
+    assert(clusterConfig.keyPassword === None)
+    assert(clusterConfig.tokenMechanism === 
KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM)
+  }
+
+  test("getClusterConfig should return entry overwrite defaults") {
+    sparkConf.set(s"spark.kafka.clusters.$identifier1.auth.bootstrap.servers", 
authBootStrapServers)
+    
sparkConf.set(s"spark.kafka.clusters.$identifier1.target.bootstrap.servers.regex",
+      targetServersRegex)
+    sparkConf.set(s"spark.kafka.clusters.$identifier1.security.protocol", 
securityProtocol)
+    
sparkConf.set(s"spark.kafka.clusters.$identifier1.sasl.kerberos.service.name",
+      kerberosServiceName)
+    
sparkConf.set(s"spark.kafka.clusters.$identifier1.ssl.truststore.location", 
trustStoreLocation)
+    
sparkConf.set(s"spark.kafka.clusters.$identifier1.ssl.truststore.password", 
trustStorePassword)
+    sparkConf.set(s"spark.kafka.clusters.$identifier1.ssl.keystore.location", 
keyStoreLocation)
+    sparkConf.set(s"spark.kafka.clusters.$identifier1.ssl.keystore.password", 
keyStorePassword)
+    sparkConf.set(s"spark.kafka.clusters.$identifier1.ssl.key.password", 
keyPassword)
+    sparkConf.set(s"spark.kafka.clusters.$identifier1.sasl.token.mechanism", 
tokenMechanism)
+
+    val clusterConfig = KafkaTokenSparkConf.getClusterConfig(sparkConf, 
identifier1)
+    assert(clusterConfig.identifier === identifier1)
+    assert(clusterConfig.authBootstrapServers === authBootStrapServers)
+    assert(clusterConfig.targetServersRegex === targetServersRegex)
+    assert(clusterConfig.securityProtocol === securityProtocol)
+    assert(clusterConfig.kerberosServiceName === kerberosServiceName)
+    assert(clusterConfig.trustStoreLocation === Some(trustStoreLocation))
+    assert(clusterConfig.trustStorePassword === Some(trustStorePassword))
+    assert(clusterConfig.keyStoreLocation === Some(keyStoreLocation))
+    assert(clusterConfig.keyStorePassword === Some(keyStorePassword))
+    assert(clusterConfig.keyPassword === Some(keyPassword))
+    assert(clusterConfig.tokenMechanism === tokenMechanism)
+  }
+
+  test("getAllClusterConfigs should return empty list when nothing 
configured") {
+    assert(KafkaTokenSparkConf.getAllClusterConfigs(sparkConf).isEmpty)
+  }
+
+  test("getAllClusterConfigs should return empty list with malformed 
configuration") {
+    sparkConf.set(s"spark.kafka.clusters.", authBootStrapServers)
+    assert(KafkaTokenSparkConf.getAllClusterConfigs(sparkConf).isEmpty)
+  }
+
+  test("getAllClusterConfigs should return multiple entries") {
+    sparkConf.set(s"spark.kafka.clusters.$identifier1.auth.bootstrap.servers", 
authBootStrapServers)
+    sparkConf.set(s"spark.kafka.clusters.$identifier2.auth.bootstrap.servers", 
authBootStrapServers)
+
+    val clusterConfigs = KafkaTokenSparkConf.getAllClusterConfigs(sparkConf)
+    assert(clusterConfigs.size === 2)
+    clusterConfigs.foreach { clusterConfig =>
+      assert(clusterConfig.authBootstrapServers === authBootStrapServers)
+      assert(clusterConfig.targetServersRegex === 
KafkaTokenSparkConf.DEFAULT_TARGET_SERVERS_REGEX)
+      assert(clusterConfig.securityProtocol === SASL_SSL.name)
+      assert(clusterConfig.kerberosServiceName ===
+        KafkaTokenSparkConf.DEFAULT_SASL_KERBEROS_SERVICE_NAME)
+      assert(clusterConfig.trustStoreLocation === None)
+      assert(clusterConfig.trustStorePassword === None)
+      assert(clusterConfig.keyStoreLocation === None)
+      assert(clusterConfig.keyStorePassword === None)
+      assert(clusterConfig.keyPassword === None)
+      assert(clusterConfig.tokenMechanism === 
KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM)
+    }
+  }
+}
diff --git 
a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala
 
b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala
index 763f8db..11f954b 100644
--- 
a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala
+++ 
b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.kafka010
 
 import java.security.PrivilegedExceptionAction
 
+import org.apache.hadoop.io.Text
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.config.{SaslConfigs, SslConfigs}
@@ -28,7 +29,13 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.config._
 
 class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
+  private val identifier1 = "cluster1"
+  private val identifier2 = "cluster2"
+  private val tokenService1 = KafkaTokenUtil.getTokenService(identifier1)
+  private val tokenService2 = KafkaTokenUtil.getTokenService(identifier2)
   private val bootStrapServers = "127.0.0.1:0"
+  private val matchingTargetServersRegex = "127.0.0.*:0"
+  private val nonMatchingTargetServersRegex = "127.0.1.*:0"
   private val trustStoreLocation = "/path/to/trustStore"
   private val trustStorePassword = "trustStoreSecret"
   private val keyStoreLocation = "/path/to/keyStore"
@@ -59,25 +66,11 @@ class KafkaTokenUtilSuite extends SparkFunSuite with 
KafkaDelegationTokenTest {
     )
   }
 
-  test("createAdminClientProperties without bootstrap servers should throw 
exception") {
-    val thrown = intercept[IllegalArgumentException] {
-      KafkaTokenUtil.createAdminClientProperties(sparkConf)
-    }
-    assert(thrown.getMessage contains
-      "Tried to obtain kafka delegation token but bootstrap servers not 
configured.")
-  }
-
   test("createAdminClientProperties with SASL_PLAINTEXT protocol should not 
include " +
       "keystore and truststore config") {
-    sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers)
-    sparkConf.set(Kafka.SECURITY_PROTOCOL, SASL_PLAINTEXT.name)
-    sparkConf.set(Kafka.TRUSTSTORE_LOCATION, trustStoreLocation)
-    sparkConf.set(Kafka.TRUSTSTORE_PASSWORD, trustStoreLocation)
-    sparkConf.set(Kafka.KEYSTORE_LOCATION, keyStoreLocation)
-    sparkConf.set(Kafka.KEYSTORE_PASSWORD, keyStorePassword)
-    sparkConf.set(Kafka.KEY_PASSWORD, keyPassword)
+    val clusterConf = createClusterConf(SASL_PLAINTEXT.name)
 
-    val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf)
+    val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf, clusterConf)
 
     
assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
       === bootStrapServers)
@@ -91,15 +84,9 @@ class KafkaTokenUtilSuite extends SparkFunSuite with 
KafkaDelegationTokenTest {
   }
 
   test("createAdminClientProperties with SASL_SSL protocol should include 
truststore config") {
-    sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers)
-    sparkConf.set(Kafka.SECURITY_PROTOCOL, SASL_SSL.name)
-    sparkConf.set(Kafka.TRUSTSTORE_LOCATION, trustStoreLocation)
-    sparkConf.set(Kafka.TRUSTSTORE_PASSWORD, trustStorePassword)
-    sparkConf.set(Kafka.KEYSTORE_LOCATION, keyStoreLocation)
-    sparkConf.set(Kafka.KEYSTORE_PASSWORD, keyStorePassword)
-    sparkConf.set(Kafka.KEY_PASSWORD, keyPassword)
+    val clusterConf = createClusterConf(SASL_SSL.name)
 
-    val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf)
+    val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf, clusterConf)
 
     
assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
       === bootStrapServers)
@@ -116,15 +103,9 @@ class KafkaTokenUtilSuite extends SparkFunSuite with 
KafkaDelegationTokenTest {
 
   test("createAdminClientProperties with SSL protocol should include keystore 
and truststore " +
       "config") {
-    sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers)
-    sparkConf.set(Kafka.SECURITY_PROTOCOL, SSL.name)
-    sparkConf.set(Kafka.TRUSTSTORE_LOCATION, trustStoreLocation)
-    sparkConf.set(Kafka.TRUSTSTORE_PASSWORD, trustStorePassword)
-    sparkConf.set(Kafka.KEYSTORE_LOCATION, keyStoreLocation)
-    sparkConf.set(Kafka.KEYSTORE_PASSWORD, keyStorePassword)
-    sparkConf.set(Kafka.KEY_PASSWORD, keyPassword)
+    val clusterConf = createClusterConf(SSL.name)
 
-    val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf)
+    val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf, clusterConf)

    assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
      === bootStrapServers)
@@ -140,11 +121,10 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
   }
 
   test("createAdminClientProperties with global config should not set dynamic 
jaas config") {
-    sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers)
-    sparkConf.set(Kafka.SECURITY_PROTOCOL, SASL_SSL.name)
+    val clusterConf = createClusterConf(SASL_SSL.name)
     setGlobalKafkaClientConfig()
 
-    val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf)
+    val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf, clusterConf)

    assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
      === bootStrapServers)
@@ -155,12 +135,11 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
   }
 
   test("createAdminClientProperties with keytab should set keytab dynamic jaas 
config") {
-    sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers)
-    sparkConf.set(Kafka.SECURITY_PROTOCOL, SASL_SSL.name)
     sparkConf.set(KEYTAB, keytab)
     sparkConf.set(PRINCIPAL, principal)
+    val clusterConf = createClusterConf(SASL_SSL.name)
 
-    val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf)
+    val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf, clusterConf)

    assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
      === bootStrapServers)
@@ -176,10 +155,9 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
   }
 
   test("createAdminClientProperties without keytab should set ticket cache 
dynamic jaas config") {
-    sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers)
-    sparkConf.set(Kafka.SECURITY_PROTOCOL, SASL_SSL.name)
+    val clusterConf = createClusterConf(SASL_SSL.name)
 
-    val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf)
+    val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf, clusterConf)

    assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
      === bootStrapServers)
@@ -202,24 +180,73 @@ class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
     assert(KafkaTokenUtil.isGlobalJaasConfigurationProvided)
   }
 
-  test("isTokenAvailable without token should return false") {
-    assert(!KafkaTokenUtil.isTokenAvailable())
+  test("findMatchingToken without token should return None") {
+    assert(KafkaTokenUtil.findMatchingToken(sparkConf, bootStrapServers) === None)
   }
 
-  test("isTokenAvailable with token should return true") {
-    addTokenToUGI()
+  test("findMatchingToken with non-matching tokens should return None") {
+    sparkConf.set(s"spark.kafka.clusters.$identifier1.auth.bootstrap.servers", 
bootStrapServers)
+    
sparkConf.set(s"spark.kafka.clusters.$identifier1.target.bootstrap.servers.regex",
+      nonMatchingTargetServersRegex)
+    sparkConf.set(s"spark.kafka.clusters.$identifier2.bootstrap.servers", 
bootStrapServers)
+    
sparkConf.set(s"spark.kafka.clusters.$identifier2.target.bootstrap.servers.regex",
+      matchingTargetServersRegex)
+    addTokenToUGI(tokenService1)
+    addTokenToUGI(new Text("intentionally_garbage"))
+
+    assert(KafkaTokenUtil.findMatchingToken(sparkConf, bootStrapServers) === 
None)
+  }
+
+  test("findMatchingToken with one matching token should return cluster 
configuration") {
+    sparkConf.set(s"spark.kafka.clusters.$identifier1.auth.bootstrap.servers", 
bootStrapServers)
+    
sparkConf.set(s"spark.kafka.clusters.$identifier1.target.bootstrap.servers.regex",
+      matchingTargetServersRegex)
+    addTokenToUGI(tokenService1)
 
-    assert(KafkaTokenUtil.isTokenAvailable())
+    assert(KafkaTokenUtil.findMatchingToken(sparkConf, bootStrapServers) ===
+      Some(KafkaTokenSparkConf.getClusterConfig(sparkConf, identifier1)))
+  }
+
+  test("findMatchingToken with multiple matching tokens should throw 
exception") {
+    sparkConf.set(s"spark.kafka.clusters.$identifier1.auth.bootstrap.servers", 
bootStrapServers)
+    
sparkConf.set(s"spark.kafka.clusters.$identifier1.target.bootstrap.servers.regex",
+      matchingTargetServersRegex)
+    sparkConf.set(s"spark.kafka.clusters.$identifier2.auth.bootstrap.servers", 
bootStrapServers)
+    
sparkConf.set(s"spark.kafka.clusters.$identifier2.target.bootstrap.servers.regex",
+      matchingTargetServersRegex)
+    addTokenToUGI(tokenService1)
+    addTokenToUGI(tokenService2)
+
+    val thrown = intercept[IllegalArgumentException] {
+      KafkaTokenUtil.findMatchingToken(sparkConf, bootStrapServers)
+    }
+    assert(thrown.getMessage.contains("More than one delegation token 
matches"))
   }
 
   test("getTokenJaasParams with token should return scram module") {
-    addTokenToUGI()
+    addTokenToUGI(tokenService1)
+    val clusterConf = createClusterConf(SASL_SSL.name)
 
-    val jaasParams = KafkaTokenUtil.getTokenJaasParams(new SparkConf())
+    val jaasParams = KafkaTokenUtil.getTokenJaasParams(clusterConf)
 
     assert(jaasParams.contains("ScramLoginModule required"))
     assert(jaasParams.contains("tokenauth=true"))
     assert(jaasParams.contains(tokenId))
     assert(jaasParams.contains(tokenPassword))
   }
+
+  private def createClusterConf(securityProtocol: String): KafkaTokenClusterConf = {
+    KafkaTokenClusterConf(
+      identifier1,
+      bootStrapServers,
+      KafkaTokenSparkConf.DEFAULT_TARGET_SERVERS_REGEX,
+      securityProtocol,
+      KafkaTokenSparkConf.DEFAULT_SASL_KERBEROS_SERVICE_NAME,
+      Some(trustStoreLocation),
+      Some(trustStorePassword),
+      Some(keyStoreLocation),
+      Some(keyStorePassword),
+      Some(keyPassword),
+      KafkaTokenSparkConf.DEFAULT_SASL_TOKEN_MECHANISM)
+  }
 }
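
A minimal sketch of the configuration shape exercised by the tests above (not taken
from this commit): one spark.kafka.clusters.<identifier>.* namespace is declared per
secure cluster. The cluster identifiers and broker addresses below are made-up
placeholders; only the two key names come from the tests.

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // Brokers Spark contacts to obtain a delegation token for this cluster.
      .set("spark.kafka.clusters.source.auth.bootstrap.servers", "broker1.realm1.example.com:9093")
      // Regex matched against the connector's bootstrap servers to pick the right token.
      .set("spark.kafka.clusters.source.target.bootstrap.servers.regex", ".*realm1.*")
      .set("spark.kafka.clusters.sink.auth.bootstrap.servers", "broker1.realm2.example.com:9093")
      .set("spark.kafka.clusters.sink.target.bootstrap.servers.regex", ".*realm2.*")
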
diff --git a/external/kafka-0-10/pom.xml b/external/kafka-0-10/pom.xml
index f78bdac..397de87 100644
--- a/external/kafka-0-10/pom.xml
+++ b/external/kafka-0-10/pom.xml
@@ -96,6 +96,11 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-tags_${scala.binary.version}</artifactId>
     </dependency>
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala
index d934c64..d8df549 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala
@@ -25,11 +25,13 @@ import scala.util.Random
 import org.apache.kafka.clients.consumer.ConsumerConfig._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.mockito.Mockito.when
 import org.scalatest.BeforeAndAfterAll
+import org.scalatest.mockito.MockitoSugar
 
 import org.apache.spark._
 
-class KafkaDataConsumerSuite extends SparkFunSuite with BeforeAndAfterAll {
+class KafkaDataConsumerSuite extends SparkFunSuite with MockitoSugar with BeforeAndAfterAll {
   private var testUtils: KafkaTestUtils = _
   private val topic = "topic" + Random.nextInt()
   private val topicPartition = new TopicPartition(topic, 0)
@@ -37,6 +39,11 @@ class KafkaDataConsumerSuite extends SparkFunSuite with BeforeAndAfterAll {
 
   override def beforeAll(): Unit = {
     super.beforeAll()
+    val conf = new SparkConf()
+    val env = mock[SparkEnv]
+    SparkEnv.set(env)
+    when(env.conf).thenReturn(conf)
+
     testUtils = new KafkaTestUtils
     testUtils.setup()
     KafkaDataConsumer.init(16, 64, 0.75f)
@@ -47,6 +54,7 @@ class KafkaDataConsumerSuite extends SparkFunSuite with BeforeAndAfterAll {
       testUtils.teardown()
       testUtils = null
     }
+    SparkEnv.set(null)
     super.afterAll()
   }
 

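A minimal sketch of how the per-cluster token lookup shown in this diff could be
applied to a Kafka client parameter map. This is illustrative only and not code from
this commit: kafkaParams and the broker address are hypothetical, and only
findMatchingToken, getTokenJaasParams and the standard Kafka/Spark APIs already
visible in the diff are assumed.

    import org.apache.kafka.common.config.SaslConfigs
    import org.apache.spark.SparkEnv
    import org.apache.spark.kafka010.KafkaTokenUtil

    val kafkaParams = new java.util.HashMap[String, Object]()
    val bootstrapServers = "broker1.realm1.example.com:9093"  // hypothetical address

    // If a delegation token matching these brokers is present in the current
    // credentials, inject a dynamic JAAS config so the client authenticates with
    // the SCRAM token instead of a keytab or ticket cache.
    KafkaTokenUtil.findMatchingToken(SparkEnv.get.conf, bootstrapServers) match {
      case Some(clusterConf) =>
        kafkaParams.put(SaslConfigs.SASL_JAAS_CONFIG,
          KafkaTokenUtil.getTokenJaasParams(clusterConf))
      case None =>
        // No matching token: keep whatever authentication was configured explicitly.
    }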

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
