gaborgsomogyi commented on a change in pull request #24305: [SPARK-27294][SS] 
Add multi-cluster Kafka delegation token
URL: https://github.com/apache/spark/pull/24305#discussion_r275334403
 
 

 ##########
 File path: 
external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenSparkConf.scala
 ##########
 @@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kafka010
+
+import org.apache.kafka.common.security.auth.SecurityProtocol.SASL_SSL
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Kafka
+
+private[spark] case class KafkaTokenClusterConf(
+  identifier: String,
+  bootstrapServers: String,
+  securityProtocol: String,
+  kerberosServiceName: String,
+  trustStoreLocation: Option[String],
+  trustStorePassword: Option[String],
+  keyStoreLocation: Option[String],
+  keyStorePassword: Option[String],
+  keyPassword: Option[String],
+  tokenMechanism: String) {
+  override def toString: String = s"KafkaTokenClusterConf{" +
+    s"identifier=$identifier, " +
+    s"bootstrapServers=$bootstrapServers, " +
+    s"securityProtocol=$securityProtocol, " +
+    s"kerberosServiceName=$kerberosServiceName, " +
+    s"trustStoreLocation=$trustStoreLocation, " +
+    s"trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
+    s"keyStoreLocation=$keyStoreLocation, " +
+    s"keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " +
+    s"keyPassword=${keyPassword.map(_ => "xxx")}, " +
+    s"tokenMechanism=$tokenMechanism}"
+}
+
+private[spark] class KafkaTokenSparkConf(sparkConf: SparkConf) extends Logging {
+  def getClusterIdentifiers(): Array[String] = {
+    val result = sparkConf.get(Kafka.CLUSTERS) match {
 
 Review comment:
   > Can the Kafka libraries figure out the right delegation token when talking to a remote server?
   
   That's a really good idea, and I was excited until I figured out that it's not implemented in Kafka :/
   Login modules like `Krb5LoginModule` and `ScramLoginModule` support different authentication control flags like `required`, etc. When `sufficient` is provided, the LoginModule is not required to succeed, and if it fails, authentication continues down the LoginModule list. My plan was to add `ScramLoginModule` with the `sufficient` flag as many times as there are available delegation tokens.
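   The attempted JAAS configuration would have looked roughly like the following sketch (the token ids/HMACs are illustrative placeholders; `tokenauth=true` tells `ScramLoginModule` to authenticate with a delegation token):
   ```
   KafkaClient {
     org.apache.kafka.common.security.scram.ScramLoginModule sufficient
       username="token-id-for-cluster-1"
       password="token-hmac-for-cluster-1"
       tokenauth=true;
     org.apache.kafka.common.security.scram.ScramLoginModule sufficient
       username="token-id-for-cluster-2"
       password="token-hmac-for-cluster-2"
       tokenauth=true;
   };
   ```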
   
   I've tested it, but it's not implemented in Kafka 
([this](https://github.com/apache/kafka/blob/02221bd907a23041c95ce6446986bff631652b3a/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java#L95)
 code blows up):
   ```
   Exception in thread "main" org.apache.kafka.common.KafkaException: Failed 
create new KafkaAdminClient
        at 
org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:370)
        at 
org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:52)
        at 
com.kafka.delegationtoken.consumer.SecureKafkaConsumer$.main(SecureKafkaConsumer.scala:96)
        at 
com.kafka.delegationtoken.consumer.SecureKafkaConsumer.main(SecureKafkaConsumer.scala)
   Caused by: java.lang.IllegalArgumentException: JAAS config property contains 
2 login modules, should be 1 module
        at 
org.apache.kafka.common.security.JaasContext.load(JaasContext.java:95)
        at 
org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:84)
        at 
org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:119)
        at 
org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)
        at 
org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88)
        at 
org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:346)
        ... 3 more
   ```
   I've filed [KAFKA-8234](https://issues.apache.org/jira/browse/KAFKA-8234), 
but I think it's not an option for this case (even if it gets implemented, not 
all Kafka brokers will have it).
   
   What I'm thinking of is to take over the `bootstrapServers` parameter from 
the token side if a cluster identifier is given. The logic would be something like:
   * Use the `clusterId` parameter name instead of `tokenClusterId`.
   * If `bootstrapServers` is provided only on the token side and `clusterId` is 
defined on the processing side, then take the servers over (only one place of configuration).
   * If `bootstrapServers` is provided on both the token and the data processing 
side, then give a warning (maybe the user doesn't know it can be taken over, or 
it's just accidentally misconfigured). This could even be an exception which 
doesn't let the query start. I'm always afraid of introducing something like 
this, but I personally think the exception is the right way.
   * If `bootstrapServers` is provided only on the data processing side, then 
work just like before.
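   The takeover logic above could be sketched like this (a hypothetical standalone sketch; `BootstrapServersResolver` and its parameter names are illustrative, not the actual Spark configuration keys, and the both-sides case throws rather than warns):
   ```scala
   // Sketch of resolving bootstrap servers between the token side and the
   // data processing side, following the rules listed above.
   object BootstrapServersResolver {
     def resolve(
         tokenSideServers: Option[String],
         processingSideServers: Option[String],
         clusterId: Option[String]): Option[String] = {
       (processingSideServers, tokenSideServers, clusterId) match {
         // Provided only on the token side and a cluster id is defined:
         // take the servers over (single place of configuration).
         case (None, Some(tokenSide), Some(_)) => Some(tokenSide)
         // Provided on both sides: fail fast so the query doesn't start
         // with an ambiguous configuration (could be relaxed to a warning).
         case (Some(_), Some(_), _) =>
           throw new IllegalArgumentException(
             "bootstrapServers configured on both token and data processing side")
         // Provided only on the data processing side: work just like before.
         case (Some(processingSide), _, _) => Some(processingSide)
         case _ => None
       }
     }
   }
   ```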
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]

