gaborgsomogyi commented on a change in pull request #24305: [SPARK-27294][SS] Add multi-cluster Kafka delegation token URL: https://github.com/apache/spark/pull/24305#discussion_r275334403
##########
File path: external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenSparkConf.scala
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kafka010
+
+import org.apache.kafka.common.security.auth.SecurityProtocol.SASL_SSL
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Kafka
+
+private[spark] case class KafkaTokenClusterConf(
+    identifier: String,
+    bootstrapServers: String,
+    securityProtocol: String,
+    kerberosServiceName: String,
+    trustStoreLocation: Option[String],
+    trustStorePassword: Option[String],
+    keyStoreLocation: Option[String],
+    keyStorePassword: Option[String],
+    keyPassword: Option[String],
+    tokenMechanism: String) {
+  override def toString: String = s"KafkaTokenClusterConf{" + s"identifier=$identifier, " +
+    s"bootstrapServers=$bootstrapServers, " + s"securityProtocol=$securityProtocol, " +
+    s"kerberosServiceName=$kerberosServiceName, " +
+    s"trustStoreLocation=$trustStoreLocation, " +
+    s"trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
+    s"keyStoreLocation=$keyStoreLocation, " + s"keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " +
+    s"keyPassword=${keyPassword.map(_ => "xxx")}, " +
+    s"tokenMechanism=$tokenMechanism}"
+}
+
+private[spark] class KafkaTokenSparkConf(sparkConf: SparkConf) extends Logging {
+  def getClusterIdentifiers(): Array[String] = {
+    val result = sparkConf.get(Kafka.CLUSTERS) match {

Review comment:

> Can the Kafka libraries figure out the right delegation token when talking to a remote server?

That's a really good idea, and I was excited until I figured out that it's not implemented in Kafka :/

Login modules like `Krb5LoginModule` and `ScramLoginModule` support different authentication control flags such as `required`. When `sufficient` is provided, the LoginModule is not required to succeed, and if it fails, authentication continues down the LoginModule list. My plan was to add `ScramLoginModule` once per available delegation token, each with the `sufficient` flag. I've tested it, but it's not implemented in Kafka ([this](https://github.com/apache/kafka/blob/02221bd907a23041c95ce6446986bff631652b3a/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java#L95) code blows up):

```
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed create new KafkaAdminClient
    at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:370)
    at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:52)
    at com.kafka.delegationtoken.consumer.SecureKafkaConsumer$.main(SecureKafkaConsumer.scala:96)
    at com.kafka.delegationtoken.consumer.SecureKafkaConsumer.main(SecureKafkaConsumer.scala)
Caused by: java.lang.IllegalArgumentException: JAAS config property contains 2 login modules, should be 1 module
    at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:95)
    at org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:84)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:119)
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:65)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:88)
    at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:346)
    ... 3 more
```

I've filed [KAFKA-8234](https://issues.apache.org/jira/browse/KAFKA-8234), but I don't think it's an option for this case (even if it gets implemented, not all Kafka brokers will have it).

What I'm thinking of is to take over the `bootstrapServers` parameter from the token side if a cluster identifier is given. The logic would be something like:

* Use `clusterId` as the parameter name instead of `tokenClusterId`.
* If `bootstrapServers` is provided only on the token side and `clusterId` is defined on the processing side, then take the servers over (only one place of configuration).
* If `bootstrapServers` is provided on both the token side and the data processing side, then give a warning (maybe the user doesn't know it can be taken over, or it is just accidentally misconfigured). This could even be an exception which doesn't let the query start. I'm always afraid of introducing something like this, but personally I think the second option is the right way.
* If `bootstrapServers` is provided only on the data processing side, then work just like before.
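
To make the proposed `bootstrapServers` takeover rules easier to discuss, here is a minimal sketch of the decision logic. Everything in it is hypothetical and exists only for illustration: the `BootstrapServersResolution` object, the `Use` and `Reject` result types, the `resolve` function, and the `failOnDuplicate` switch are not part of Spark or of this PR. It also assumes two things the comment leaves open: when both sides are configured and only a warning is wanted, the data processing side value is kept, and when neither side provides servers the query is rejected.

```scala
// Hypothetical sketch of the proposal; none of these names exist in Spark.
object BootstrapServersResolution {
  sealed trait Resolution
  // Servers to connect to, plus an optional warning to log.
  final case class Use(servers: String, warning: Option[String] = None) extends Resolution
  // Configuration is refused and the query must not start.
  final case class Reject(reason: String) extends Resolution

  /**
   * @param clusterId         cluster identifier given on the data processing side, if any
   * @param tokenSideServers  clusterId -> bootstrapServers configured on the token side
   * @param sourceSideServers bootstrapServers given on the data processing side, if any
   * @param failOnDuplicate   assumption: selects exception vs. warning when both are set
   */
  def resolve(
      clusterId: Option[String],
      tokenSideServers: Map[String, String],
      sourceSideServers: Option[String],
      failOnDuplicate: Boolean): Resolution = {
    val fromToken = clusterId.flatMap(tokenSideServers.get)
    (fromToken, sourceSideServers) match {
      // Only the token side knows the servers: take them over.
      case (Some(token), None) => Use(token)
      // Configured in both places: warn, or refuse to start.
      case (Some(_), Some(source)) =>
        val msg = "bootstrapServers is set on both the token and the data processing side"
        if (failOnDuplicate) Reject(msg) else Use(source, Some(msg))
      // Only the data processing side is configured: behave as before.
      case (None, Some(source)) => Use(source)
      // Assumption: nothing to connect to is treated as an error.
      case (None, None) => Reject("bootstrapServers is not configured anywhere")
    }
  }
}
```

With `failOnDuplicate = true` this corresponds to the stricter variant, where a duplicated setting stops the query from starting, and with `false` it corresponds to the warning-only variant.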
