This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 76e0b6b  [SPARK-27002][SS] Get Kafka delegation tokens right before consumer/producer creation
76e0b6b is described below

commit 76e0b6bafb66a5cd02edcf924a90fc389fddea8e
Author: Gabor Somogyi <gabor.g.somo...@gmail.com>
AuthorDate: Wed Feb 27 10:07:02 2019 -0800

    [SPARK-27002][SS] Get Kafka delegation tokens right before consumer/producer creation
    
    ## What changes were proposed in this pull request?
    
    Spark does not always pick up the latest Kafka delegation token, even if a new one has been properly obtained.
    In this PR I'm setting the delegation token right before `KafkaConsumer` and `KafkaProducer` creation, to be on the safe side.
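    
    As a minimal sketch of the resulting pattern (the wrapper object below is hypothetical; `KafkaConfigUpdater` and its methods are the real ones touched in the diff):
    
    ```scala
    import java.{util => ju}
    
    import scala.collection.JavaConverters._
    
    import org.apache.kafka.clients.consumer.KafkaConsumer
    
    object TokenFreshConsumer {
      def createConsumer(kafkaParams: ju.Map[String, Object]): KafkaConsumer[Array[Byte], Array[Byte]] = {
        // Resolve the security settings (e.g. the JAAS config carrying the latest
        // delegation token) immediately before the consumer is constructed, instead
        // of baking them into the params once at planning time.
        val updatedKafkaParams = KafkaConfigUpdater("executor", kafkaParams.asScala.toMap)
          .setAuthenticationConfigIfNeeded()
          .build()
        new KafkaConsumer[Array[Byte], Array[Byte]](updatedKafkaParams)
      }
    }
    ```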
    
    ## How was this patch tested?
    
    Long-running Kafka-to-Kafka tests on a 4-node cluster with randomly thrown artificial exceptions.
    
    Test scenario:
    * 4 node cluster
    * Yarn
    * Kafka broker version 2.1.0
    * security.protocol = SASL_SSL
    * sasl.mechanism = SCRAM-SHA-512
    
    Kafka broker settings:
    * delegation.token.expiry.time.ms=600000 (10 min)
    * delegation.token.max.lifetime.ms=1200000 (20 min)
    * delegation.token.expiry.check.interval.ms=300000 (5 min)
    
    Every 7.5 minutes a new delegation token is obtained from the Kafka broker (10 min * 0.75).
    But the old token expires after 10 minutes (Spark obtains a new token and does not renew the old one), and the broker's expiry thread invalidates expired tokens every 5 minutes. When an artificial exception was then thrown inside the Spark application (in which case Spark closes the connection), the latest delegation token was not always picked up.
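    
    For concreteness, the token timeline implied by these settings (plain arithmetic, not Spark code; the 0.75 obtain ratio matches the "10 min * 0.75" note above):
    
    ```scala
    object TokenTimeline extends App {
      val expiryMs = 600000L   // delegation.token.expiry.time.ms (10 min)
      val obtainRatio = 0.75   // fraction of expiry time after which a new token is obtained
    
      val obtainIntervalMs = (expiryMs * obtainRatio).toLong
      println(s"new token obtained every ${obtainIntervalMs / 60000.0} min")  // 7.5 min
      println(s"old token invalid after ${expiryMs / 60000.0} min")           // 10 min
      // The broker-side expiry thread runs every 5 min and invalidates expired
      // tokens, so a consumer/producer recreated after a failure must pick up
      // the newest token to reconnect.
    }
    ```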
    
    Closes #23906 from gaborgsomogyi/SPARK-27002.
    
    Authored-by: Gabor Somogyi <gabor.g.somo...@gmail.com>
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
---
 .../spark/sql/kafka010/CachedKafkaProducer.scala       |  8 ++++++--
 .../apache/spark/sql/kafka010/ConsumerStrategy.scala   | 18 +++++++++++++++---
 .../apache/spark/sql/kafka010/KafkaConfigUpdater.scala |  2 +-
 .../apache/spark/sql/kafka010/KafkaDataConsumer.scala  |  5 ++++-
 .../spark/sql/kafka010/KafkaSourceProvider.scala       |  3 ---
 5 files changed, 26 insertions(+), 10 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
index cd680ad..f24001f 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
@@ -64,8 +64,12 @@ private[kafka010] object CachedKafkaProducer extends Logging {
       .build[Seq[(String, Object)], Producer](cacheLoader)
 
   private def createKafkaProducer(producerConfiguration: ju.Map[String, Object]): Producer = {
-    val kafkaProducer: Producer = new Producer(producerConfiguration)
-    logDebug(s"Created a new instance of KafkaProducer for $producerConfiguration.")
+    val updatedKafkaProducerConfiguration =
+      KafkaConfigUpdater("executor", producerConfiguration.asScala.toMap)
+        .setAuthenticationConfigIfNeeded()
+        .build()
+    val kafkaProducer: Producer = new Producer(updatedKafkaProducerConfiguration)
+    logDebug(s"Created a new instance of KafkaProducer for $updatedKafkaProducerConfiguration.")
     kafkaProducer
   }
 
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
index 66511b3..dfdafce 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
@@ -37,6 +37,15 @@ import org.apache.kafka.common.TopicPartition
 sealed trait ConsumerStrategy {
   /** Create a [[KafkaConsumer]] and subscribe to topics according to a desired strategy */
   def createConsumer(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]
+
+  /**
+   * Updates the parameters with security config if needed.
+   * Added as a function to hide internals and reduce code duplication, because all strategies use it.
+   */
+  protected def setAuthenticationConfigIfNeeded(kafkaParams: ju.Map[String, Object]) =
+    KafkaConfigUpdater("source", kafkaParams.asScala.toMap)
+      .setAuthenticationConfigIfNeeded()
+      .build()
 }
 
 /**
@@ -45,7 +54,8 @@ sealed trait ConsumerStrategy {
 case class AssignStrategy(partitions: Array[TopicPartition]) extends ConsumerStrategy {
   override def createConsumer(
       kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] = {
-    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
+    val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams)
+    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](updatedKafkaParams)
     consumer.assign(ju.Arrays.asList(partitions: _*))
     consumer
   }
@@ -59,7 +69,8 @@ case class AssignStrategy(partitions: Array[TopicPartition]) extends ConsumerStr
 case class SubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy {
   override def createConsumer(
       kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] = {
-    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
+    val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams)
+    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](updatedKafkaParams)
     consumer.subscribe(topics.asJava)
     consumer
   }
@@ -73,7 +84,8 @@ case class SubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy {
 case class SubscribePatternStrategy(topicPattern: String) extends ConsumerStrategy {
   override def createConsumer(
       kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] = {
-    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
+    val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams)
+    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](updatedKafkaParams)
     consumer.subscribe(
       ju.regex.Pattern.compile(topicPattern),
       new NoOpConsumerRebalanceListener())
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala
index 38bf5d7..978dfe6 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala
@@ -31,7 +31,7 @@ import org.apache.spark.kafka010.KafkaTokenUtil
 /**
  * Class to conveniently update Kafka config params, while logging the changes
  */
-private[kafka010] case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, String])
+private[kafka010] case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, Object])
     extends Logging {
   private val map = new ju.HashMap[String, Object](kafkaParams.asJava)
 
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
index 7b1314b..a0255a1 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
@@ -197,7 +197,10 @@ private[kafka010] case class InternalKafkaConsumer(
 
   /** Create a KafkaConsumer to fetch records for `topicPartition` */
   private def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
-    val c = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
+    val updatedKafkaParams = KafkaConfigUpdater("executor", kafkaParams.asScala.toMap)
+      .setAuthenticationConfigIfNeeded()
+      .build()
+    val c = new KafkaConsumer[Array[Byte], Array[Byte]](updatedKafkaParams)
     val tps = new ju.ArrayList[TopicPartition]()
     tps.add(topicPartition)
     c.assign(tps)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 6994517..a139573 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -525,7 +525,6 @@ private[kafka010] object KafkaSourceProvider extends Logging {
       // If buffer config is not set, set it to reasonable value to work around
       // buffer issues (see KAFKA-3135)
       .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
-      .setAuthenticationConfigIfNeeded()
       .build()
 
   def kafkaParamsForExecutors(
@@ -547,7 +546,6 @@ private[kafka010] object KafkaSourceProvider extends Logging {
       // If buffer config is not set, set it to reasonable value to work around
       // buffer issues (see KAFKA-3135)
       .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
-      .setAuthenticationConfigIfNeeded()
       .build()
 
   /**
@@ -582,7 +580,6 @@ private[kafka010] object KafkaSourceProvider extends Logging {
     KafkaConfigUpdater("executor", specifiedKafkaParams)
       .set(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serClassName)
       .set(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serClassName)
-      .setAuthenticationConfigIfNeeded()
       .build()
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
