vanzin commented on a change in pull request #24305: [SPARK-27294][SS] Add multi-cluster Kafka delegation token
URL: https://github.com/apache/spark/pull/24305#discussion_r279854603
 
 

 ##########
 File path: external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala
 ##########
 @@ -38,24 +37,49 @@ private[spark] class KafkaDelegationTokenProvider
       sparkConf: SparkConf,
       creds: Credentials): Option[Long] = {
     try {
-      logDebug("Attempting to fetch Kafka security token.")
-      val (token, nextRenewalDate) = KafkaTokenUtil.obtainToken(sparkConf)
-      creds.addToken(token.getService, token)
-      return Some(nextRenewalDate)
+      var lowestNextRenewalDate: Option[Long] = None
+      KafkaTokenSparkConf.getAllClusterConfigs(sparkConf).foreach { clusterConf =>
+        if (delegationTokensRequired(clusterConf)) {
+          try {
+            logDebug(
+              s"Attempting to fetch Kafka security token for cluster ${clusterConf.identifier}.")
+            val (token, nextRenewalDate) = KafkaTokenUtil.obtainToken(sparkConf, clusterConf)
+            creds.addToken(token.getService, token)
+            if (lowestNextRenewalDate.isEmpty || nextRenewalDate < lowestNextRenewalDate.get) {
+              lowestNextRenewalDate = Some(nextRenewalDate)
+            }
+          } catch {
+            case NonFatal(e) =>
+              logWarning(s"Failed to get token from service: $serviceName " +
+                s"cluster: ${clusterConf.identifier}", e)
+          }
+        } else {
+          logDebug(
+            s"Cluster ${clusterConf.identifier} does not require delegation token, skipping.")
+        }
+      }
+      lowestNextRenewalDate
     } catch {
       case NonFatal(e) =>
-        logWarning(s"Failed to get token from service $serviceName", e)
+        logWarning(s"Failed to get token cluster configuration", e)
+        None
 
 Review comment:
   This isn't exactly correct, since you may already have obtained a few 
delegation tokens by this point. I guess the question here is what the error 
behavior should be. It seems the previous behavior was to just log the error 
and let the application fail later because of auth errors.
   
   So the correct thing here seems to be to move the error handling above, 
where the code tries to get a token for a specific cluster, and keep going if 
that particular one fails.
   
   That would also avoid the two levels of error catching you currently have.
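
A minimal sketch of the per-cluster error handling suggested above, with a single level of error catching. It is not the code from the PR: `ClusterConf`, `TokenSketch`, `obtainAll`, and the `fetch` function are hypothetical stand-ins for the Spark classes in the diff, and `Console.err` stands in for `logWarning`.

```scala
import scala.util.control.NonFatal

// Hypothetical stand-in for the per-cluster configuration in the diff.
final case class ClusterConf(identifier: String, requiresToken: Boolean)

object TokenSketch {
  // `fetch` is a hypothetical token fetcher: it returns the next renewal
  // date for one cluster, or throws if the token cannot be obtained.
  def obtainAll(clusters: Seq[ClusterConf], fetch: ClusterConf => Long): Option[Long] = {
    val renewalDates = clusters.filter(_.requiresToken).flatMap { conf =>
      try {
        Some(fetch(conf))
      } catch {
        case NonFatal(e) =>
          // Log the failure for this cluster only and keep going,
          // so tokens obtained for other clusters are not lost.
          Console.err.println(
            s"Failed to get token for cluster ${conf.identifier}: ${e.getMessage}")
          None
      }
    }
    // Earliest renewal date among the tokens that were obtained, if any.
    renewalDates.reduceOption(_ min _)
  }
}
```

With this shape, a failure on one cluster does not affect the others, and the result is `None` only when no token was obtained at all.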

