Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22598#discussion_r236438713
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
    @@ -0,0 +1,168 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.security
    +
    +import java.text.SimpleDateFormat
    +import java.util.Properties
    +
    +import org.apache.hadoop.io.Text
    +import org.apache.hadoop.security.token.{Token, TokenIdentifier}
    +import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
    +import org.apache.kafka.clients.CommonClientConfigs
    +import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
    +import org.apache.kafka.common.config.SaslConfigs
    +import 
org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, 
SASL_SSL, SSL}
    +import org.apache.kafka.common.security.token.delegation.DelegationToken
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.internal.config._
    +
    +private[spark] object KafkaTokenUtil extends Logging {
    +  val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
    +  val TOKEN_SERVICE = new Text("kafka.server.delegation.token")
    +
    +  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
    +    override def getKind: Text = TOKEN_KIND
    +  }
    +
    +  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: 
TokenIdentifier], Long) = {
    +    val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
    +    val createDelegationTokenOptions = new CreateDelegationTokenOptions()
    +    val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
    +    val token = createResult.delegationToken().get()
    +    printToken(token)
    +
    +    (new Token[KafkaDelegationTokenIdentifier](
    +      token.tokenInfo.tokenId.getBytes,
    +      token.hmacAsBase64String.getBytes,
    +      TOKEN_KIND,
    +      TOKEN_SERVICE
    +    ), token.tokenInfo.expiryTimestamp)
    +  }
    +
    +  private[security] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
    +    val adminClientProperties = new Properties
    +
    +    val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
    +    require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
    +      "servers not configured.")
    +    
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
    +
    +    val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
    +    
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
    +    protocol match {
    +      case SASL_SSL.name =>
    +        setTrustStoreProperties(sparkConf, adminClientProperties)
    +
    +      case SSL.name =>
    +        setTrustStoreProperties(sparkConf, adminClientProperties)
    +        setKeyStoreProperties(sparkConf, adminClientProperties)
    +        logWarning("Obtaining kafka delegation token with SSL protocol. 
Please " +
    +          "configure 2-way authentication on the broker side.")
    +
    +      case SASL_PLAINTEXT.name =>
    +        logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
    +          "consider the security impact.")
    +    }
    +
    +    // There are multiple possibilities to log in:
    --- End diff --
    
    So, currently there's no way to use the existing login. Did you file a bug 
for Kafka to implement that? That bug should be referenced here.
    
    Also, what about the existing TGT case? You mentioned you need a different 
JAAS config for that, but I don't see that handled here. Users shouldn't need 
to manually provide that. They don't for any other service supported by this 
framework.
    
    So this should be `if (keytab.isDefined) useKeytabConfig else useTgtConfig`.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to