[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/22598 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r237662539 --- Diff: core/src/main/scala/org/apache/spark/internal/config/Kafka.scala --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.config + +private[spark] object Kafka { + + private[spark] val BOOTSTRAP_SERVERS = --- End diff -- Removed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r237659299 --- Diff: core/src/main/scala/org/apache/spark/internal/config/Kafka.scala --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.config + +private[spark] object Kafka { + + private[spark] val BOOTSTRAP_SERVERS = --- End diff -- modifiers are now redundant since this is an object (and not a package object). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r237656411 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala --- @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.util.UUID + +import org.apache.hadoop.security.{Credentials, UserGroupInformation} +import org.apache.hadoop.security.token.Token +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.security.KafkaTokenUtil +import org.apache.spark.deploy.security.KafkaTokenUtil.KafkaDelegationTokenIdentifier +import org.apache.spark.internal.config.{KAFKA_KERBEROS_SERVICE_NAME} + +class KafkaSecurityHelperSuite extends SparkFunSuite with BeforeAndAfterEach { + private val keytab = "/path/to/keytab" + private val kerberosServiceName = "kafka" + private val principal = "u...@domain.com" + private val tokenId = "tokenId" + UUID.randomUUID().toString + private val tokenPassword = "tokenPassword" + UUID.randomUUID().toString + + private var sparkConf: SparkConf = null + + override def beforeEach(): Unit = { +super.beforeEach() +sparkConf = new SparkConf() + } + + override def afterEach(): Unit = { +try { + resetUGI +} finally { + super.afterEach() +} + } + + private def addTokenToUGI(): Unit = { +val token = new Token[KafkaDelegationTokenIdentifier]( + tokenId.getBytes, + tokenPassword.getBytes, + KafkaTokenUtil.TOKEN_KIND, + KafkaTokenUtil.TOKEN_SERVICE +) +val creds = new Credentials() +creds.addToken(KafkaTokenUtil.TOKEN_SERVICE, token) +UserGroupInformation.getCurrentUser.addCredentials(creds) + } + + private def resetUGI: Unit = { --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r237656366 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala --- @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import java.{ util => ju } +import java.text.SimpleDateFormat + +import scala.util.control.NonFatal + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.JaasContext +import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL} +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[spark] object KafkaTokenUtil extends Logging { + val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND + } + + private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = { +val adminClient = AdminClient.create(createAdminClientProperties(sparkConf)) +val createDelegationTokenOptions = new CreateDelegationTokenOptions() +val createResult = adminClient.createDelegationToken(createDelegationTokenOptions) +val token = createResult.delegationToken().get() +printToken(token) + +(new Token[KafkaDelegationTokenIdentifier]( + token.tokenInfo.tokenId.getBytes, + token.hmacAsBase64String.getBytes, + TOKEN_KIND, + TOKEN_SERVICE +), token.tokenInfo.expiryTimestamp) + } + + private[security] def createAdminClientProperties(sparkConf: SparkConf): ju.Properties = { +val adminClientProperties = new ju.Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) 
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + "servers not configured.") + adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +protocol match { + case SASL_SSL.name => +setTrustStoreProperties(sparkConf, adminClientProperties) + + case SSL.name => +setTrustStoreProperties(sparkConf, adminClientProperties) +setKeyStoreProperties(sparkConf, adminClientProperties) +logWarning("Obtaining kafka delegation token with SSL protocol. Please " + + "configure 2-way authentication on the broker side.") + + case SASL_PLAINTEXT.name => +logWarning("Obtaining kafka delegation token through plain communication channel. Please " + + "consider the security impact.") +} + +// There are multiple possibilities to log in and applied in the following order: --- End diff -- Added. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r237656297 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -688,4 +688,65 @@ package object config { .stringConf .toSequence .createWithDefault(Nil) + + private[spark] val KAFKA_BOOTSTRAP_SERVERS = --- End diff -- Moved. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r237620523 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala --- @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import java.{ util => ju } +import java.text.SimpleDateFormat + +import scala.util.control.NonFatal + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.JaasContext +import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL} +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[spark] object KafkaTokenUtil extends Logging { + val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND + } + + private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = { +val adminClient = AdminClient.create(createAdminClientProperties(sparkConf)) +val createDelegationTokenOptions = new CreateDelegationTokenOptions() +val createResult = adminClient.createDelegationToken(createDelegationTokenOptions) +val token = createResult.delegationToken().get() +printToken(token) + +(new Token[KafkaDelegationTokenIdentifier]( + token.tokenInfo.tokenId.getBytes, + token.hmacAsBase64String.getBytes, + TOKEN_KIND, + TOKEN_SERVICE +), token.tokenInfo.expiryTimestamp) + } + + private[security] def createAdminClientProperties(sparkConf: SparkConf): ju.Properties = { +val adminClientProperties = new ju.Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) 
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + "servers not configured.") + adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +protocol match { + case SASL_SSL.name => +setTrustStoreProperties(sparkConf, adminClientProperties) + + case SSL.name => +setTrustStoreProperties(sparkConf, adminClientProperties) +setKeyStoreProperties(sparkConf, adminClientProperties) +logWarning("Obtaining kafka delegation token with SSL protocol. Please " + + "configure 2-way authentication on the broker side.") + + case SASL_PLAINTEXT.name => +logWarning("Obtaining kafka delegation token through plain communication channel. Please " + + "consider the security impact.") +} + +// There are multiple possibilities to log in and applied in the following order: --- End diff -- Mention the Kafka bug here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r237620333 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -688,4 +688,65 @@ package object config { .stringConf .toSequence .createWithDefault(Nil) + + private[spark] val KAFKA_BOOTSTRAP_SERVERS = --- End diff -- This pattern was added after your PR, but since it's there... it might be better to put these into a new file (e.g. `Kafka.scala`) in this package, instead of adding more stuff to this file (e.g. see `History.scala`). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r237622443 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala --- @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.util.UUID + +import org.apache.hadoop.security.{Credentials, UserGroupInformation} +import org.apache.hadoop.security.token.Token +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.security.KafkaTokenUtil +import org.apache.spark.deploy.security.KafkaTokenUtil.KafkaDelegationTokenIdentifier +import org.apache.spark.internal.config.{KAFKA_KERBEROS_SERVICE_NAME} + +class KafkaSecurityHelperSuite extends SparkFunSuite with BeforeAndAfterEach { + private val keytab = "/path/to/keytab" + private val kerberosServiceName = "kafka" + private val principal = "u...@domain.com" + private val tokenId = "tokenId" + UUID.randomUUID().toString + private val tokenPassword = "tokenPassword" + UUID.randomUUID().toString + + private var sparkConf: SparkConf = null + + override def beforeEach(): Unit = { +super.beforeEach() +sparkConf = new SparkConf() + } + + override def afterEach(): Unit = { +try { + resetUGI +} finally { + super.afterEach() +} + } + + private def addTokenToUGI(): Unit = { +val token = new Token[KafkaDelegationTokenIdentifier]( + tokenId.getBytes, + tokenPassword.getBytes, + KafkaTokenUtil.TOKEN_KIND, + KafkaTokenUtil.TOKEN_SERVICE +) +val creds = new Credentials() +creds.addToken(KafkaTokenUtil.TOKEN_SERVICE, token) +UserGroupInformation.getCurrentUser.addCredentials(creds) + } + + private def resetUGI: Unit = { --- End diff -- add `()` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r237495490 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala --- @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL} +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[spark] object KafkaTokenUtil extends Logging { + val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND + } + + private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = { +val adminClient = AdminClient.create(createAdminClientProperties(sparkConf)) +val createDelegationTokenOptions = new CreateDelegationTokenOptions() +val createResult = adminClient.createDelegationToken(createDelegationTokenOptions) +val token = createResult.delegationToken().get() +printToken(token) + +(new Token[KafkaDelegationTokenIdentifier]( + token.tokenInfo.tokenId.getBytes, + token.hmacAsBase64String.getBytes, + TOKEN_KIND, + TOKEN_SERVICE +), token.tokenInfo.expiryTimestamp) + } + + private[security] def createAdminClientProperties(sparkConf: SparkConf): Properties = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + 
"servers not configured.") + adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +protocol match { + case SASL_SSL.name => +setTrustStoreProperties(sparkConf, adminClientProperties) + + case SSL.name => +setTrustStoreProperties(sparkConf, adminClientProperties) +setKeyStoreProperties(sparkConf, adminClientProperties) +logWarning("Obtaining kafka delegation token with SSL protocol. Please " + + "configure 2-way authentication on the broker side.") + + case SASL_PLAINTEXT.name => +logWarning("Obtaining kafka delegation token through plain communication channel. Please " + + "consider the security impact.") +} + +// There are multiple possibilities to log in: --- End diff -- Now the following order applies: * Global JVM configuration, which is Kafka-specific (Kafka looks for the KafkaClient entry). This can be configured in many ways, not just via `java.security.auth.login.config`, but the mentioned `JaasContext.loadClientContext` handles all of them. * Keytab * Ticket cache I've described this in the code as well to make the intention clear. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r237492537 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala --- @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL} +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[spark] object KafkaTokenUtil extends Logging { + val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND + } + + private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = { +val adminClient = AdminClient.create(createAdminClientProperties(sparkConf)) +val createDelegationTokenOptions = new CreateDelegationTokenOptions() +val createResult = adminClient.createDelegationToken(createDelegationTokenOptions) +val token = createResult.delegationToken().get() +printToken(token) + +(new Token[KafkaDelegationTokenIdentifier]( + token.tokenInfo.tokenId.getBytes, + token.hmacAsBase64String.getBytes, + TOKEN_KIND, + TOKEN_SERVICE +), token.tokenInfo.expiryTimestamp) + } + + private[security] def createAdminClientProperties(sparkConf: SparkConf): Properties = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + 
"servers not configured.") + adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +protocol match { + case SASL_SSL.name => +setTrustStoreProperties(sparkConf, adminClientProperties) + + case SSL.name => +setTrustStoreProperties(sparkConf, adminClientProperties) +setKeyStoreProperties(sparkConf, adminClientProperties) +logWarning("Obtaining kafka delegation token with SSL protocol. Please " + + "configure 2-way authentication on the broker side.") + + case SASL_PLAINTEXT.name => +logWarning("Obtaining kafka delegation token through plain communication channel. Please " + + "consider the security impact.") +} + +// There are multiple possibilities to log in: --- End diff -- Added + tested the fallback mechanism. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r237234310 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala --- @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL} +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[spark] object KafkaTokenUtil extends Logging { + val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND + } + + private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = { +val adminClient = AdminClient.create(createAdminClientProperties(sparkConf)) +val createDelegationTokenOptions = new CreateDelegationTokenOptions() +val createResult = adminClient.createDelegationToken(createDelegationTokenOptions) +val token = createResult.delegationToken().get() +printToken(token) + +(new Token[KafkaDelegationTokenIdentifier]( + token.tokenInfo.tokenId.getBytes, + token.hmacAsBase64String.getBytes, + TOKEN_KIND, + TOKEN_SERVICE +), token.tokenInfo.expiryTimestamp) + } + + private[security] def createAdminClientProperties(sparkConf: SparkConf): Properties = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + 
"servers not configured.") + adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +protocol match { + case SASL_SSL.name => +setTrustStoreProperties(sparkConf, adminClientProperties) + + case SSL.name => +setTrustStoreProperties(sparkConf, adminClientProperties) +setKeyStoreProperties(sparkConf, adminClientProperties) +logWarning("Obtaining kafka delegation token with SSL protocol. Please " + + "configure 2-way authentication on the broker side.") + + case SASL_PLAINTEXT.name => +logWarning("Obtaining kafka delegation token through plain communication channel. Please " + + "consider the security impact.") +} + +// There are multiple possibilities to log in: --- End diff -- Kafka has its own public API to check for a client config: `JaasContext.loadClientContext`. It throws an exception when no config is provided. This part works; testing the others... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r237206264 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala --- @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL} +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[spark] object KafkaTokenUtil extends Logging { + val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND + } + + private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = { +val adminClient = AdminClient.create(createAdminClientProperties(sparkConf)) +val createDelegationTokenOptions = new CreateDelegationTokenOptions() +val createResult = adminClient.createDelegationToken(createDelegationTokenOptions) +val token = createResult.delegationToken().get() +printToken(token) + +(new Token[KafkaDelegationTokenIdentifier]( + token.tokenInfo.tokenId.getBytes, + token.hmacAsBase64String.getBytes, + TOKEN_KIND, + TOKEN_SERVICE +), token.tokenInfo.expiryTimestamp) + } + + private[security] def createAdminClientProperties(sparkConf: SparkConf): Properties = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + 
"servers not configured.") + adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +protocol match { + case SASL_SSL.name => +setTrustStoreProperties(sparkConf, adminClientProperties) + + case SSL.name => +setTrustStoreProperties(sparkConf, adminClientProperties) +setKeyStoreProperties(sparkConf, adminClientProperties) +logWarning("Obtaining kafka delegation token with SSL protocol. Please " + + "configure 2-way authentication on the broker side.") + + case SASL_PLAINTEXT.name => +logWarning("Obtaining kafka delegation token through plain communication channel. Please " + + "consider the security impact.") +} + +// There are multiple possibilities to log in: --- End diff -- + non-normative list of [what I think are the relevant JVM Settings](https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/KDiag.java#L88) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r237203744 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala --- @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL} +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[spark] object KafkaTokenUtil extends Logging { + val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND + } + + private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = { +val adminClient = AdminClient.create(createAdminClientProperties(sparkConf)) +val createDelegationTokenOptions = new CreateDelegationTokenOptions() +val createResult = adminClient.createDelegationToken(createDelegationTokenOptions) +val token = createResult.delegationToken().get() +printToken(token) + +(new Token[KafkaDelegationTokenIdentifier]( + token.tokenInfo.tokenId.getBytes, + token.hmacAsBase64String.getBytes, + TOKEN_KIND, + TOKEN_SERVICE +), token.tokenInfo.expiryTimestamp) + } + + private[security] def createAdminClientProperties(sparkConf: SparkConf): Properties = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + 
"servers not configured.") + adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +protocol match { + case SASL_SSL.name => +setTrustStoreProperties(sparkConf, adminClientProperties) + + case SSL.name => +setTrustStoreProperties(sparkConf, adminClientProperties) +setKeyStoreProperties(sparkConf, adminClientProperties) +logWarning("Obtaining kafka delegation token with SSL protocol. Please " + + "configure 2-way authentication on the broker side.") + + case SASL_PLAINTEXT.name => +logWarning("Obtaining kafka delegation token through plain communication channel. Please " + + "consider the security impact.") +} + +// There are multiple possibilities to log in: --- End diff -- "java.security.auth.login.config" --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r236738629 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala --- @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL} +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[spark] object KafkaTokenUtil extends Logging { + val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND + } + + private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = { +val adminClient = AdminClient.create(createAdminClientProperties(sparkConf)) +val createDelegationTokenOptions = new CreateDelegationTokenOptions() +val createResult = adminClient.createDelegationToken(createDelegationTokenOptions) +val token = createResult.delegationToken().get() +printToken(token) + +(new Token[KafkaDelegationTokenIdentifier]( + token.tokenInfo.tokenId.getBytes, + token.hmacAsBase64String.getBytes, + TOKEN_KIND, + TOKEN_SERVICE +), token.tokenInfo.expiryTimestamp) + } + + private[security] def createAdminClientProperties(sparkConf: SparkConf): Properties = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + 
"servers not configured.") + adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +protocol match { + case SASL_SSL.name => +setTrustStoreProperties(sparkConf, adminClientProperties) + + case SSL.name => +setTrustStoreProperties(sparkConf, adminClientProperties) +setKeyStoreProperties(sparkConf, adminClientProperties) +logWarning("Obtaining kafka delegation token with SSL protocol. Please " + + "configure 2-way authentication on the broker side.") + + case SASL_PLAINTEXT.name => +logWarning("Obtaining kafka delegation token through plain communication channel. Please " + + "consider the security impact.") +} + +// There are multiple possibilities to log in: --- End diff -- If you want to keep the ability to provide a custom JAAS config, then just check for that. Isn't there a system property you need to set? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r236613589 --- Diff: pom.xml --- @@ -128,6 +128,7 @@ 1.2.1.spark2 1.2.1 +2.1.0 --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r236613501 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala --- @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.util.UUID + +import org.apache.hadoop.security.{Credentials, UserGroupInformation} +import org.apache.hadoop.security.token.Token +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.security.KafkaTokenUtil +import org.apache.spark.deploy.security.KafkaTokenUtil.KafkaDelegationTokenIdentifier +import org.apache.spark.internal.config.{KAFKA_KERBEROS_SERVICE_NAME} + +class KafkaSecurityHelperSuite extends SparkFunSuite with BeforeAndAfterEach { + private val keytab = "/path/to/keytab" + private val kerberosServiceName = "kafka" + private val principal = "u...@domain.com" + private val tokenId = "tokenId" + UUID.randomUUID().toString + private val tokenPassword = "tokenPassword" + UUID.randomUUID().toString + + private var sparkConf: SparkConf = null + + override def beforeEach(): Unit = { +super.beforeEach() +sparkConf = new SparkConf() + } + + private def addTokenToUGI: Unit = { +val token = new Token[KafkaDelegationTokenIdentifier]( + tokenId.getBytes, + tokenPassword.getBytes, + KafkaTokenUtil.TOKEN_KIND, + KafkaTokenUtil.TOKEN_SERVICE +) +val creds = new Credentials() +creds.addToken(KafkaTokenUtil.TOKEN_SERVICE, token) +UserGroupInformation.getCurrentUser.addCredentials(creds) + } + + private def resetUGI: Unit = { +UserGroupInformation.setLoginUser(null) + } + + test("getTokenJaasParams without token should return None") { +val jaasParams = KafkaSecurityHelper.getTokenJaasParams(sparkConf) +assert(!jaasParams.isDefined) + } + + test("getTokenJaasParams with token no service should throw exception") { +try { + addTokenToUGI + + val thrown = intercept[IllegalArgumentException] { +KafkaSecurityHelper.getTokenJaasParams(sparkConf) + } + + assert(thrown.getMessage contains "Kerberos service name must be defined") +} finally { --- End diff -- Fixed. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r236613436 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala --- @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.util.UUID + +import org.apache.hadoop.security.{Credentials, UserGroupInformation} +import org.apache.hadoop.security.token.Token +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.security.KafkaTokenUtil +import org.apache.spark.deploy.security.KafkaTokenUtil.KafkaDelegationTokenIdentifier +import org.apache.spark.internal.config.{KAFKA_KERBEROS_SERVICE_NAME} + +class KafkaSecurityHelperSuite extends SparkFunSuite with BeforeAndAfterEach { + private val keytab = "/path/to/keytab" + private val kerberosServiceName = "kafka" + private val principal = "u...@domain.com" + private val tokenId = "tokenId" + UUID.randomUUID().toString + private val tokenPassword = "tokenPassword" + UUID.randomUUID().toString + + private var sparkConf: SparkConf = null + + override def beforeEach(): Unit = { +super.beforeEach() +sparkConf = new SparkConf() + } + + private def addTokenToUGI: Unit = { --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r236613351 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala --- @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL} +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[spark] object KafkaTokenUtil extends Logging { + val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND + } + + private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = { +val adminClient = AdminClient.create(createAdminClientProperties(sparkConf)) +val createDelegationTokenOptions = new CreateDelegationTokenOptions() +val createResult = adminClient.createDelegationToken(createDelegationTokenOptions) +val token = createResult.delegationToken().get() +printToken(token) + +(new Token[KafkaDelegationTokenIdentifier]( + token.tokenInfo.tokenId.getBytes, + token.hmacAsBase64String.getBytes, + TOKEN_KIND, + TOKEN_SERVICE +), token.tokenInfo.expiryTimestamp) + } + + private[security] def createAdminClientProperties(sparkConf: SparkConf): Properties = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + 
"servers not configured.") + adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +protocol match { + case SASL_SSL.name => +setTrustStoreProperties(sparkConf, adminClientProperties) + + case SSL.name => +setTrustStoreProperties(sparkConf, adminClientProperties) +setKeyStoreProperties(sparkConf, adminClientProperties) +logWarning("Obtaining kafka delegation token with SSL protocol. Please " + + "configure 2-way authentication on the broker side.") + + case SASL_PLAINTEXT.name => +logWarning("Obtaining kafka delegation token through plain communication channel. Please " + + "consider the security impact.") +} + +// There are multiple possibilities to log in: --- End diff -- The JIRA is: https://issues.apache.org/jira/browse/KAFKA-7677 The Kafka developers will take a look at it... Regarding `useTgtConfig`: I've considered this, and it would make the user experience much better. On the other hand, it would close off the possibility of using a custom JAAS configuration (I don't know how many people use this). I'm fine with that, but one has to accept this consequence. Should we do this then? --- - To unsubscribe, e-mail:
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r236441225 --- Diff: pom.xml --- @@ -128,6 +128,7 @@ 1.2.1.spark2 1.2.1 +2.1.0 --- End diff -- Since you're adding this here, should probably remove the declaration from the kafka modules' pom files. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r236438713 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala --- @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL} +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[spark] object KafkaTokenUtil extends Logging { + val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND + } + + private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = { +val adminClient = AdminClient.create(createAdminClientProperties(sparkConf)) +val createDelegationTokenOptions = new CreateDelegationTokenOptions() +val createResult = adminClient.createDelegationToken(createDelegationTokenOptions) +val token = createResult.delegationToken().get() +printToken(token) + +(new Token[KafkaDelegationTokenIdentifier]( + token.tokenInfo.tokenId.getBytes, + token.hmacAsBase64String.getBytes, + TOKEN_KIND, + TOKEN_SERVICE +), token.tokenInfo.expiryTimestamp) + } + + private[security] def createAdminClientProperties(sparkConf: SparkConf): Properties = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + 
"servers not configured.") + adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +protocol match { + case SASL_SSL.name => +setTrustStoreProperties(sparkConf, adminClientProperties) + + case SSL.name => +setTrustStoreProperties(sparkConf, adminClientProperties) +setKeyStoreProperties(sparkConf, adminClientProperties) +logWarning("Obtaining kafka delegation token with SSL protocol. Please " + + "configure 2-way authentication on the broker side.") + + case SASL_PLAINTEXT.name => +logWarning("Obtaining kafka delegation token through plain communication channel. Please " + + "consider the security impact.") +} + +// There are multiple possibilities to log in: --- End diff -- So, currently there's no way to use the existing login. Did you file a bug for Kafka to implement that? That bug should be referenced here. Also, what about the existing TGT case? You mentioned you need a different JAAS config for that, but I don't see that handled here? Users shouldn't need to manually provide that. They don't for any other service supported by this framework. So this should be `if (keytab.isDefined) useKeytabConfig else useTgtConfig`. ---
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r236440232 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala --- @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.util.UUID + +import org.apache.hadoop.security.{Credentials, UserGroupInformation} +import org.apache.hadoop.security.token.Token +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.security.KafkaTokenUtil +import org.apache.spark.deploy.security.KafkaTokenUtil.KafkaDelegationTokenIdentifier +import org.apache.spark.internal.config.{KAFKA_KERBEROS_SERVICE_NAME} + +class KafkaSecurityHelperSuite extends SparkFunSuite with BeforeAndAfterEach { + private val keytab = "/path/to/keytab" + private val kerberosServiceName = "kafka" + private val principal = "u...@domain.com" + private val tokenId = "tokenId" + UUID.randomUUID().toString + private val tokenPassword = "tokenPassword" + UUID.randomUUID().toString + + private var sparkConf: SparkConf = null + + override def beforeEach(): Unit = { +super.beforeEach() +sparkConf = new SparkConf() + } + + private def addTokenToUGI: Unit = { +val token = new Token[KafkaDelegationTokenIdentifier]( + tokenId.getBytes, + tokenPassword.getBytes, + KafkaTokenUtil.TOKEN_KIND, + KafkaTokenUtil.TOKEN_SERVICE +) +val creds = new Credentials() +creds.addToken(KafkaTokenUtil.TOKEN_SERVICE, token) +UserGroupInformation.getCurrentUser.addCredentials(creds) + } + + private def resetUGI: Unit = { +UserGroupInformation.setLoginUser(null) + } + + test("getTokenJaasParams without token should return None") { +val jaasParams = KafkaSecurityHelper.getTokenJaasParams(sparkConf) +assert(!jaasParams.isDefined) + } + + test("getTokenJaasParams with token no service should throw exception") { +try { + addTokenToUGI + + val thrown = intercept[IllegalArgumentException] { +KafkaSecurityHelper.getTokenJaasParams(sparkConf) + } + + assert(thrown.getMessage contains "Kerberos service name must be defined") +} finally { --- End diff -- Better to use an `after` block instead of having `try...finally` in every test. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r236439851 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala --- @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.util.UUID + +import org.apache.hadoop.security.{Credentials, UserGroupInformation} +import org.apache.hadoop.security.token.Token +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.security.KafkaTokenUtil +import org.apache.spark.deploy.security.KafkaTokenUtil.KafkaDelegationTokenIdentifier +import org.apache.spark.internal.config.{KAFKA_KERBEROS_SERVICE_NAME} + +class KafkaSecurityHelperSuite extends SparkFunSuite with BeforeAndAfterEach { + private val keytab = "/path/to/keytab" + private val kerberosServiceName = "kafka" + private val principal = "u...@domain.com" + private val tokenId = "tokenId" + UUID.randomUUID().toString + private val tokenPassword = "tokenPassword" + UUID.randomUUID().toString + + private var sparkConf: SparkConf = null + + override def beforeEach(): Unit = { +super.beforeEach() +sparkConf = new SparkConf() + } + + private def addTokenToUGI: Unit = { --- End diff -- We use `()` for methods that do more than just returning some value with minimal processing. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r235779789 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.{Token, TokenIdentifier} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL} +import org.apache.spark.util.Utils + +private[security] class KafkaDelegationTokenProvider + extends HadoopDelegationTokenProvider with Logging { + + override def serviceName: String = "kafka" + + override def obtainDelegationTokens( + hadoopConf: Configuration, + sparkConf: SparkConf, + creds: Credentials): Option[Long] = { +try { + val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader) + val obtainToken = mirror.classLoader. 
+loadClass("org.apache.spark.sql.kafka010.TokenUtil"). +getMethod("obtainToken", classOf[SparkConf]) + + logDebug("Attempting to fetch Kafka security token.") + val token = obtainToken.invoke(null, sparkConf) +.asInstanceOf[Token[_ <: TokenIdentifier]] + creds.addToken(token.getService, token) +} catch { + case NonFatal(e) => +logInfo(s"Failed to get token from service $serviceName", e) +} + +None + } + + override def delegationTokensRequired( + sparkConf: SparkConf, + hadoopConf: Configuration): Boolean = { +sparkConf.get(KAFKA_BOOTSTRAP_SERVERS).isDefined && + sparkConf.get(KAFKA_SECURITY_PROTOCOL).startsWith("SASL") --- End diff -- Yeah, SSL 2-way authentication was not covered, though it's not that hard to add. Using the enum is definitely worthwhile, but I hadn't found it until you linked it here. I've adapted `KafkaDelegationTokenProvider` and `KafkaTokenUtil` to use them. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user soenkeliebau commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r235395152 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.{Token, TokenIdentifier} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL} +import org.apache.spark.util.Utils + +private[security] class KafkaDelegationTokenProvider + extends HadoopDelegationTokenProvider with Logging { + + override def serviceName: String = "kafka" + + override def obtainDelegationTokens( + hadoopConf: Configuration, + sparkConf: SparkConf, + creds: Credentials): Option[Long] = { +try { + val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader) + val obtainToken = mirror.classLoader. 
+loadClass("org.apache.spark.sql.kafka010.TokenUtil"). +getMethod("obtainToken", classOf[SparkConf]) + + logDebug("Attempting to fetch Kafka security token.") + val token = obtainToken.invoke(null, sparkConf) +.asInstanceOf[Token[_ <: TokenIdentifier]] + creds.addToken(token.getService, token) +} catch { + case NonFatal(e) => +logInfo(s"Failed to get token from service $serviceName", e) +} + +None + } + + override def delegationTokensRequired( + sparkConf: SparkConf, + hadoopConf: Configuration): Boolean = { +sparkConf.get(KAFKA_BOOTSTRAP_SERVERS).isDefined && + sparkConf.get(KAFKA_SECURITY_PROTOCOL).startsWith("SASL") --- End diff -- Looking at the possible protocol [values](https://github.com/apache/kafka/blob/068ab9cefae301f3187ea885d645c425955e77d2/clients/src/main/java/org/apache/kafka/common/security/auth/SecurityProtocol.java#L26) this condition misses the value "SSL" unless I am mistaken. If the user presents a client certificate during the SSL handshake for an SSL connection he is allowed to obtain delegation tokens as well. If a client cert was not provided then Kafka will reject the token request, but that should be covered in error handling I guess. Would it maybe be an option to move this check into the KafkaTokenUtil class and compare with the actual values of the SecurityProtocol enum instead of relying on hard-coded Strings here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r234543403 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala --- @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[spark] object KafkaTokenUtil extends Logging { + private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + private[spark] val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND; + } + + private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = { +val adminClient = AdminClient.create(createAdminClientProperties(sparkConf)) +val createDelegationTokenOptions = new CreateDelegationTokenOptions() +val createResult = adminClient.createDelegationToken(createDelegationTokenOptions) +val token = createResult.delegationToken().get() +printToken(token) + +(new Token[KafkaDelegationTokenIdentifier]( + token.tokenInfo.tokenId.getBytes, + token.hmacAsBase64String.getBytes, + TOKEN_KIND, + TOKEN_SERVICE +), token.tokenInfo.expiryTimestamp) + } + + private[security] def createAdminClientProperties(sparkConf: SparkConf): Properties = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + "servers not configured.") + 
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +if (protocol.endsWith("SSL")) { + logInfo("SSL protocol detected.") + sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { truststoreLocation => +adminClientProperties.put("ssl.truststore.location", truststoreLocation) + } + sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { truststorePassword => +adminClientProperties.put("ssl.truststore.password", truststorePassword) + } +} else { + logWarning("Obtaining kafka delegation token through plain communication channel. Please " + +"consider the security impact.") +} + +// There are multiple possibilities to log in: --- End diff -- I've spoken with them and confirmed the same thing which is stated in the official API documentation: https://kafka.apache.org/documentation/#security_sasl_kerberos There is no such possibility. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r234043117 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala --- @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[spark] object KafkaTokenUtil extends Logging { + private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + private[spark] val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND; + } + + private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = { +val adminClient = AdminClient.create(createAdminClientProperties(sparkConf)) +val createDelegationTokenOptions = new CreateDelegationTokenOptions() +val createResult = adminClient.createDelegationToken(createDelegationTokenOptions) +val token = createResult.delegationToken().get() +printToken(token) + +(new Token[KafkaDelegationTokenIdentifier]( + token.tokenInfo.tokenId.getBytes, + token.hmacAsBase64String.getBytes, + TOKEN_KIND, + TOKEN_SERVICE +), token.tokenInfo.expiryTimestamp) + } + + private[security] def createAdminClientProperties(sparkConf: SparkConf): Properties = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + "servers not configured.") + 
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +if (protocol.endsWith("SSL")) { + logInfo("SSL protocol detected.") + sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { truststoreLocation => +adminClientProperties.put("ssl.truststore.location", truststoreLocation) + } + sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { truststorePassword => +adminClientProperties.put("ssl.truststore.password", truststorePassword) + } +} else { + logWarning("Obtaining kafka delegation token through plain communication channel. Please " + +"consider the security impact.") +} + +// There are multiple possibilities to log in: --- End diff -- OK, speaking with them... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r234040048 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala --- @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[spark] object KafkaTokenUtil extends Logging { + private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + private[spark] val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND; + } + + private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = { +val adminClient = AdminClient.create(createAdminClientProperties(sparkConf)) +val createDelegationTokenOptions = new CreateDelegationTokenOptions() +val createResult = adminClient.createDelegationToken(createDelegationTokenOptions) +val token = createResult.delegationToken().get() +printToken(token) + +(new Token[KafkaDelegationTokenIdentifier]( + token.tokenInfo.tokenId.getBytes, + token.hmacAsBase64String.getBytes, + TOKEN_KIND, + TOKEN_SERVICE +), token.tokenInfo.expiryTimestamp) + } + + private[security] def createAdminClientProperties(sparkConf: SparkConf): Properties = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + "servers not configured.") + 
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +if (protocol.endsWith("SSL")) { + logInfo("SSL protocol detected.") + sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { truststoreLocation => +adminClientProperties.put("ssl.truststore.location", truststoreLocation) + } + sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { truststorePassword => +adminClientProperties.put("ssl.truststore.password", truststorePassword) + } +} else { + logWarning("Obtaining kafka delegation token through plain communication channel. Please " + +"consider the security impact.") +} + +// There are multiple possibilities to log in: --- End diff -- Where does that exception come from? The message by itself doesn't help. And that still does not answer my question. The fact it's even talking about JAAS configs tells me that it's still trying to log in. My question is whether there's an API in Kafka that means "just connect to the server with my existing credentials". At this point in the code, you are already logged in. You already have credentials. Kafka should not be forcing you to go through all this again. It may be that the answer is "Kafka doesn't have this API" in which case a bug in Kafka should be filed, and we may have to live with this until that
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r234037129 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala --- @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[spark] object KafkaTokenUtil extends Logging { + private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + private[spark] val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND; + } + + private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = { +val adminClient = AdminClient.create(createAdminClientProperties(sparkConf)) +val createDelegationTokenOptions = new CreateDelegationTokenOptions() +val createResult = adminClient.createDelegationToken(createDelegationTokenOptions) +val token = createResult.delegationToken().get() +printToken(token) + +(new Token[KafkaDelegationTokenIdentifier]( + token.tokenInfo.tokenId.getBytes, + token.hmacAsBase64String.getBytes, + TOKEN_KIND, + TOKEN_SERVICE +), token.tokenInfo.expiryTimestamp) + } + + private[security] def createAdminClientProperties(sparkConf: SparkConf): Properties = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + "servers not configured.") + 
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +if (protocol.endsWith("SSL")) { + logInfo("SSL protocol detected.") + sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { truststoreLocation => +adminClientProperties.put("ssl.truststore.location", truststoreLocation) + } + sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { truststorePassword => +adminClientProperties.put("ssl.truststore.password", truststorePassword) + } +} else { + logWarning("Obtaining kafka delegation token through plain communication channel. Please " + +"consider the security impact.") +} + +// There are multiple possibilities to log in: --- End diff -- I've just double-checked, and in this case Kafka looks for a JAAS entry, does not find it, and throws this exception: ``` Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r233934316 --- Diff: core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.common.config.SaslConfigs +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.internal.config._ + +class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach { + private val bootStrapServers = "127.0.0.1:0" + private val plainSecurityProtocol = "SASL_PLAINTEXT" + private val sslSecurityProtocol = "SASL_SSL" + private val trustStoreLocation = "/path/to/trustStore" + private val trustStorePassword = "secret" + private val keytab = "/path/to/keytab" + private val kerberosServiceName = "kafka" + private val principal = "u...@domain.com" + + private var sparkConf: SparkConf = null + + override def beforeEach(): Unit = { +super.beforeEach() +sparkConf = new SparkConf() + } + + test("createAdminClientProperties without bootstrap servers should throw exception") { +val thrown = intercept[IllegalArgumentException] { + KafkaTokenUtil.createAdminClientProperties(sparkConf) +} +assert(thrown.getMessage contains + "Tried to obtain kafka delegation token but bootstrap servers not configured.") + } + + test("createAdminClientProperties without SSL protocol should not take over truststore config") { +sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers) +sparkConf.set(KAFKA_SECURITY_PROTOCOL, plainSecurityProtocol) +sparkConf.set(KAFKA_TRUSTSTORE_LOCATION, trustStoreLocation) +sparkConf.set(KAFKA_TRUSTSTORE_PASSWORD, trustStoreLocation) + +val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf) + + assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) + === bootStrapServers) + assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG) + === plainSecurityProtocol) +assert(!adminClientProperties.containsKey("ssl.truststore.location")) +assert(!adminClientProperties.containsKey("ssl.truststore.password")) 
+ } + + test("createAdminClientProperties with SSL protocol should take over truststore config") { +sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers) +sparkConf.set(KAFKA_SECURITY_PROTOCOL, sslSecurityProtocol) +sparkConf.set(KAFKA_TRUSTSTORE_LOCATION, trustStoreLocation) +sparkConf.set(KAFKA_TRUSTSTORE_PASSWORD, trustStorePassword) + +val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf) + + assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) + === bootStrapServers) + assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG) + === sslSecurityProtocol) +assert(adminClientProperties.get("ssl.truststore.location") === trustStoreLocation) +assert(adminClientProperties.get("ssl.truststore.password") === trustStorePassword) + } + + test("createAdminClientProperties without keytab should not set dynamic jaas config") { +sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers) +sparkConf.set(KAFKA_SECURITY_PROTOCOL, sslSecurityProtocol) + +val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf) + + assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) + === bootStrapServers) + assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG) + === sslSecurityProtocol) +
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r233931731 --- Diff: core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.common.config.SaslConfigs +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.internal.config._ + +class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach { + private val bootStrapServers = "127.0.0.1:0" + private val plainSecurityProtocol = "SASL_PLAINTEXT" + private val sslSecurityProtocol = "SASL_SSL" + private val trustStoreLocation = "/path/to/trustStore" + private val trustStorePassword = "secret" + private val keytab = "/path/to/keytab" + private val kerberosServiceName = "kafka" + private val principal = "u...@domain.com" + + private var sparkConf: SparkConf = null + + override def beforeEach(): Unit = { +super.beforeEach() +sparkConf = new SparkConf() + } + + test("createAdminClientProperties without bootstrap servers should throw exception") { +val thrown = intercept[IllegalArgumentException] { + KafkaTokenUtil.createAdminClientProperties(sparkConf) +} +assert(thrown.getMessage contains + "Tried to obtain kafka delegation token but bootstrap servers not configured.") + } + + test("createAdminClientProperties without SSL protocol should not take over truststore config") { +sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers) +sparkConf.set(KAFKA_SECURITY_PROTOCOL, plainSecurityProtocol) +sparkConf.set(KAFKA_TRUSTSTORE_LOCATION, trustStoreLocation) +sparkConf.set(KAFKA_TRUSTSTORE_PASSWORD, trustStoreLocation) + +val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf) + + assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) + === bootStrapServers) + assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG) + === plainSecurityProtocol) +assert(!adminClientProperties.containsKey("ssl.truststore.location")) +assert(!adminClientProperties.containsKey("ssl.truststore.password")) 
+ } + + test("createAdminClientProperties with SSL protocol should take over truststore config") { +sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers) +sparkConf.set(KAFKA_SECURITY_PROTOCOL, sslSecurityProtocol) +sparkConf.set(KAFKA_TRUSTSTORE_LOCATION, trustStoreLocation) +sparkConf.set(KAFKA_TRUSTSTORE_PASSWORD, trustStorePassword) + +val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf) + + assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) + === bootStrapServers) + assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG) + === sslSecurityProtocol) +assert(adminClientProperties.get("ssl.truststore.location") === trustStoreLocation) +assert(adminClientProperties.get("ssl.truststore.password") === trustStorePassword) + } + + test("createAdminClientProperties without keytab should not set dynamic jaas config") { +sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers) +sparkConf.set(KAFKA_SECURITY_PROTOCOL, sslSecurityProtocol) + +val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf) + + assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) + === bootStrapServers) + assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG) + === sslSecurityProtocol) +
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r233898001 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala --- @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[spark] object KafkaTokenUtil extends Logging { + private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + private[spark] val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND; + } + + private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = { +val adminClient = AdminClient.create(createAdminClientProperties(sparkConf)) +val createDelegationTokenOptions = new CreateDelegationTokenOptions() +val createResult = adminClient.createDelegationToken(createDelegationTokenOptions) +val token = createResult.delegationToken().get() +printToken(token) + +(new Token[KafkaDelegationTokenIdentifier]( + token.tokenInfo.tokenId.getBytes, + token.hmacAsBase64String.getBytes, + TOKEN_KIND, + TOKEN_SERVICE +), token.tokenInfo.expiryTimestamp) + } + + private[security] def createAdminClientProperties(sparkConf: SparkConf): Properties = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + "servers not configured.") + 
+ adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +if (protocol.endsWith("SSL")) { + logInfo("SSL protocol detected.") + sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { truststoreLocation => +adminClientProperties.put("ssl.truststore.location", truststoreLocation) + } + sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { truststorePassword => +adminClientProperties.put("ssl.truststore.password", truststorePassword) + } +} else { + logWarning("Obtaining kafka delegation token through plain communication channel. Please " + +"consider the security impact.") +} + +// There are multiple possibilities to log in: --- End diff -- This code doesn't try to log in but provides dynamic JAAS configuration to Kafka. In the original implementation only a file-based JAAS configuration was possible; now I've extended it to make things easier for the user. > why is that necessary? This eliminates the need to provide a file-based JAAS configuration. Old way: https://github.com/gaborgsomogyi/spark-structured-secure-kafka-app New way: https://github.com/gaborgsomogyi/spark-structured-secure-dt-kafka-app > won't this fail if the user only has a TGT (i.e. login via kinit)? It works. TGT can be provided to kafka by providing a file based jaas file with
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r233888412 --- Diff: core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.common.config.SaslConfigs +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.internal.config._ + +class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach { + private val bootStrapServers = "127.0.0.1:0" + private val plainSecurityProtocol = "SASL_PLAINTEXT" + private val sslSecurityProtocol = "SASL_SSL" + private val trustStoreLocation = "/path/to/trustStore" + private val trustStorePassword = "secret" + private val keytab = "/path/to/keytab" + private val kerberosServiceName = "kafka" + private val principal = "u...@domain.com" + + private var sparkConf: SparkConf = null + + override def beforeEach(): Unit = { +super.beforeEach() +sparkConf = new SparkConf() + } + + test("createAdminClientProperties without bootstrap servers should throw exception") { +val thrown = intercept[IllegalArgumentException] { + KafkaTokenUtil.createAdminClientProperties(sparkConf) +} +assert(thrown.getMessage contains + "Tried to obtain kafka delegation token but bootstrap servers not configured.") + } + + test("createAdminClientProperties without SSL protocol should not take over truststore config") { +sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers) +sparkConf.set(KAFKA_SECURITY_PROTOCOL, plainSecurityProtocol) +sparkConf.set(KAFKA_TRUSTSTORE_LOCATION, trustStoreLocation) +sparkConf.set(KAFKA_TRUSTSTORE_PASSWORD, trustStoreLocation) + +val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf) + + assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) + === bootStrapServers) + assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG) + === plainSecurityProtocol) +assert(!adminClientProperties.containsKey("ssl.truststore.location")) +assert(!adminClientProperties.containsKey("ssl.truststore.password")) 
+ } + + test("createAdminClientProperties with SSL protocol should take over truststore config") { +sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers) +sparkConf.set(KAFKA_SECURITY_PROTOCOL, sslSecurityProtocol) +sparkConf.set(KAFKA_TRUSTSTORE_LOCATION, trustStoreLocation) +sparkConf.set(KAFKA_TRUSTSTORE_PASSWORD, trustStorePassword) + +val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf) + + assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) + === bootStrapServers) + assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG) + === sslSecurityProtocol) +assert(adminClientProperties.get("ssl.truststore.location") === trustStoreLocation) +assert(adminClientProperties.get("ssl.truststore.password") === trustStorePassword) + } + + test("createAdminClientProperties without keytab should not set dynamic jaas config") { +sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers) +sparkConf.set(KAFKA_SECURITY_PROTOCOL, sslSecurityProtocol) + +val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf) + + assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) + === bootStrapServers) + assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG) + === sslSecurityProtocol) +
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r233871529 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala --- @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[spark] object KafkaTokenUtil extends Logging { + private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + private[spark] val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND; + } + + private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = { +val adminClient = AdminClient.create(createAdminClientProperties(sparkConf)) +val createDelegationTokenOptions = new CreateDelegationTokenOptions() +val createResult = adminClient.createDelegationToken(createDelegationTokenOptions) +val token = createResult.delegationToken().get() +printToken(token) + +(new Token[KafkaDelegationTokenIdentifier]( + token.tokenInfo.tokenId.getBytes, + token.hmacAsBase64String.getBytes, + TOKEN_KIND, + TOKEN_SERVICE +), token.tokenInfo.expiryTimestamp) + } + + private[security] def createAdminClientProperties(sparkConf: SparkConf): Properties = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + "servers not configured.") + 
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +if (protocol.endsWith("SSL")) { + logInfo("SSL protocol detected.") + sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { truststoreLocation => +adminClientProperties.put("ssl.truststore.location", truststoreLocation) + } + sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { truststorePassword => +adminClientProperties.put("ssl.truststore.password", truststorePassword) + } +} else { + logWarning("Obtaining kafka delegation token through plain communication channel. Please " + +"consider the security impact.") +} + +// There are multiple possibilities to log in: --- End diff -- you are right: the user shouldn't need to be logging in again, as normal end-user use is to be kinited in, rather than giving a keytab --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r233870838 --- Diff: core/pom.xml --- @@ -408,6 +408,19 @@ provided +
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r233870430 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.{Token, TokenIdentifier} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL} +import org.apache.spark.util.Utils + +private[security] class KafkaDelegationTokenProvider + extends HadoopDelegationTokenProvider with Logging { + + override def serviceName: String = "kafka" + + override def obtainDelegationTokens( + hadoopConf: Configuration, + sparkConf: SparkConf, + creds: Credentials): Option[Long] = { +try { + val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader) + val obtainToken = mirror.classLoader. 
+loadClass("org.apache.spark.sql.kafka010.TokenUtil"). +getMethod("obtainToken", classOf[SparkConf]) + + logDebug("Attempting to fetch Kafka security token.") + val token = obtainToken.invoke(null, sparkConf) +.asInstanceOf[Token[_ <: TokenIdentifier]] + creds.addToken(token.getService, token) +} catch { + case NonFatal(e) => +logInfo(s"Failed to get token from service $serviceName", e) +} + +None + } + + override def delegationTokensRequired( --- End diff -- Although in theory we could fix up MR, distcp, Spark, etc. to say "always ask for DTs", it may just encourage people to run with Kerberos off, which is never something they should be doing. I don't want to do that & am not actively playing with this approach. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r233858832 --- Diff: core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.common.config.SaslConfigs +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.internal.config._ + +class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach { + private val bootStrapServers = "127.0.0.1:0" + private val plainSecurityProtocol = "SASL_PLAINTEXT" + private val sslSecurityProtocol = "SASL_SSL" + private val trustStoreLocation = "/path/to/trustStore" + private val trustStorePassword = "secret" + private val keytab = "/path/to/keytab" + private val kerberosServiceName = "kafka" + private val principal = "u...@domain.com" + + private var sparkConf: SparkConf = null + + override def beforeEach(): Unit = { +super.beforeEach() +sparkConf = new SparkConf() + } + + test("createAdminClientProperties without bootstrap servers should throw exception") { +val thrown = intercept[IllegalArgumentException] { + KafkaTokenUtil.createAdminClientProperties(sparkConf) +} +assert(thrown.getMessage contains + "Tried to obtain kafka delegation token but bootstrap servers not configured.") + } + + test("createAdminClientProperties without SSL protocol should not take over truststore config") { +sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers) +sparkConf.set(KAFKA_SECURITY_PROTOCOL, plainSecurityProtocol) +sparkConf.set(KAFKA_TRUSTSTORE_LOCATION, trustStoreLocation) +sparkConf.set(KAFKA_TRUSTSTORE_PASSWORD, trustStoreLocation) + +val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf) + + assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) + === bootStrapServers) + assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG) + === plainSecurityProtocol) +assert(!adminClientProperties.containsKey("ssl.truststore.location")) +assert(!adminClientProperties.containsKey("ssl.truststore.password")) 
+ } + + test("createAdminClientProperties with SSL protocol should take over truststore config") { --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r233858781 --- Diff: core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.common.config.SaslConfigs +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.internal.config._ + +class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach { + private val bootStrapServers = "127.0.0.1:0" + private val plainSecurityProtocol = "SASL_PLAINTEXT" + private val sslSecurityProtocol = "SASL_SSL" + private val trustStoreLocation = "/path/to/trustStore" + private val trustStorePassword = "secret" + private val keytab = "/path/to/keytab" + private val kerberosServiceName = "kafka" + private val principal = "u...@domain.com" + + private var sparkConf: SparkConf = null + + override def beforeEach(): Unit = { +super.beforeEach() +sparkConf = new SparkConf() + } + + test("createAdminClientProperties without bootstrap servers should throw exception") { +val thrown = intercept[IllegalArgumentException] { + KafkaTokenUtil.createAdminClientProperties(sparkConf) +} +assert(thrown.getMessage contains + "Tried to obtain kafka delegation token but bootstrap servers not configured.") + } + + test("createAdminClientProperties without SSL protocol should not take over truststore config") { --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r233858643 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala --- @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[spark] object KafkaTokenUtil extends Logging { + private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") --- End diff -- Fixed. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r233858735 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala --- @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[spark] object KafkaTokenUtil extends Logging { + private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + private[spark] val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND; + } + + private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = { +val adminClient = AdminClient.create(createAdminClientProperties(sparkConf)) +val createDelegationTokenOptions = new CreateDelegationTokenOptions() +val createResult = adminClient.createDelegationToken(createDelegationTokenOptions) +val token = createResult.delegationToken().get() +printToken(token) + +(new Token[KafkaDelegationTokenIdentifier]( + token.tokenInfo.tokenId.getBytes, + token.hmacAsBase64String.getBytes, + TOKEN_KIND, + TOKEN_SERVICE +), token.tokenInfo.expiryTimestamp) + } + + private[security] def createAdminClientProperties(sparkConf: SparkConf): Properties = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + "servers not configured.") + 
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +if (protocol.endsWith("SSL")) { + logInfo("SSL protocol detected.") --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r233858687 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala --- @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[spark] object KafkaTokenUtil extends Logging { + private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + private[spark] val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND; --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r233858574 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala --- @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import scala.language.existentials +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL} + +private[security] class KafkaDelegationTokenProvider + extends HadoopDelegationTokenProvider with Logging { + + override def serviceName: String = "kafka" + + override def obtainDelegationTokens( + hadoopConf: Configuration, + sparkConf: SparkConf, + creds: Credentials): Option[Long] = { +try { + logDebug("Attempting to fetch Kafka security token.") + val (token, nextRenewalDate) = KafkaTokenUtil.obtainToken(sparkConf) + creds.addToken(token.getService, token) + return Some(nextRenewalDate) +} catch { + case NonFatal(e) => +logInfo(s"Failed to get token from service $serviceName", e) +} +None + } + + override def delegationTokensRequired( + sparkConf: SparkConf, + hadoopConf: Configuration): Boolean = { +sparkConf.get(KAFKA_BOOTSTRAP_SERVERS).isDefined && --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r233857377 --- Diff: core/pom.xml --- @@ -408,6 +408,19 @@ provided +
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r233630560 --- Diff: core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.common.config.SaslConfigs +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.internal.config._ + +class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach { + private val bootStrapServers = "127.0.0.1:0" + private val plainSecurityProtocol = "SASL_PLAINTEXT" + private val sslSecurityProtocol = "SASL_SSL" + private val trustStoreLocation = "/path/to/trustStore" + private val trustStorePassword = "secret" + private val keytab = "/path/to/keytab" + private val kerberosServiceName = "kafka" + private val principal = "u...@domain.com" + + private var sparkConf: SparkConf = null + + override def beforeEach(): Unit = { +super.beforeEach() +sparkConf = new SparkConf() + } + + test("createAdminClientProperties without bootstrap servers should throw exception") { +val thrown = intercept[IllegalArgumentException] { + KafkaTokenUtil.createAdminClientProperties(sparkConf) +} +assert(thrown.getMessage contains + "Tried to obtain kafka delegation token but bootstrap servers not configured.") + } + + test("createAdminClientProperties without SSL protocol should not take over truststore config") { +sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers) +sparkConf.set(KAFKA_SECURITY_PROTOCOL, plainSecurityProtocol) +sparkConf.set(KAFKA_TRUSTSTORE_LOCATION, trustStoreLocation) +sparkConf.set(KAFKA_TRUSTSTORE_PASSWORD, trustStoreLocation) + +val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf) + + assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) + === bootStrapServers) + assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG) + === plainSecurityProtocol) +assert(!adminClientProperties.containsKey("ssl.truststore.location")) +assert(!adminClientProperties.containsKey("ssl.truststore.password")) 
+ } + + test("createAdminClientProperties with SSL protocol should take over truststore config") { +sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers) +sparkConf.set(KAFKA_SECURITY_PROTOCOL, sslSecurityProtocol) +sparkConf.set(KAFKA_TRUSTSTORE_LOCATION, trustStoreLocation) +sparkConf.set(KAFKA_TRUSTSTORE_PASSWORD, trustStorePassword) + +val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf) + + assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) + === bootStrapServers) + assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG) + === sslSecurityProtocol) +assert(adminClientProperties.get("ssl.truststore.location") === trustStoreLocation) +assert(adminClientProperties.get("ssl.truststore.password") === trustStorePassword) + } + + test("createAdminClientProperties without keytab should not set dynamic jaas config") { +sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers) +sparkConf.set(KAFKA_SECURITY_PROTOCOL, sslSecurityProtocol) + +val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf) + + assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) + === bootStrapServers) + assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG) + === sslSecurityProtocol) +
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r233629662 --- Diff: core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.common.config.SaslConfigs +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.internal.config._ + +class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach { + private val bootStrapServers = "127.0.0.1:0" + private val plainSecurityProtocol = "SASL_PLAINTEXT" + private val sslSecurityProtocol = "SASL_SSL" + private val trustStoreLocation = "/path/to/trustStore" + private val trustStorePassword = "secret" + private val keytab = "/path/to/keytab" + private val kerberosServiceName = "kafka" + private val principal = "u...@domain.com" + + private var sparkConf: SparkConf = null + + override def beforeEach(): Unit = { +super.beforeEach() +sparkConf = new SparkConf() + } + + test("createAdminClientProperties without bootstrap servers should throw exception") { +val thrown = intercept[IllegalArgumentException] { + KafkaTokenUtil.createAdminClientProperties(sparkConf) +} +assert(thrown.getMessage contains + "Tried to obtain kafka delegation token but bootstrap servers not configured.") + } + + test("createAdminClientProperties without SSL protocol should not take over truststore config") { --- End diff -- s/take over/include --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r233628422 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala --- @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[spark] object KafkaTokenUtil extends Logging { + private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + private[spark] val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND; + } + + private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = { +val adminClient = AdminClient.create(createAdminClientProperties(sparkConf)) +val createDelegationTokenOptions = new CreateDelegationTokenOptions() +val createResult = adminClient.createDelegationToken(createDelegationTokenOptions) +val token = createResult.delegationToken().get() +printToken(token) + +(new Token[KafkaDelegationTokenIdentifier]( + token.tokenInfo.tokenId.getBytes, + token.hmacAsBase64String.getBytes, + TOKEN_KIND, + TOKEN_SERVICE +), token.tokenInfo.expiryTimestamp) + } + + private[security] def createAdminClientProperties(sparkConf: SparkConf): Properties = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + "servers not configured.") + 
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +if (protocol.endsWith("SSL")) { + logInfo("SSL protocol detected.") --- End diff -- debug? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r233629692 --- Diff: core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.common.config.SaslConfigs +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.internal.config._ + +class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach { + private val bootStrapServers = "127.0.0.1:0" + private val plainSecurityProtocol = "SASL_PLAINTEXT" + private val sslSecurityProtocol = "SASL_SSL" + private val trustStoreLocation = "/path/to/trustStore" + private val trustStorePassword = "secret" + private val keytab = "/path/to/keytab" + private val kerberosServiceName = "kafka" + private val principal = "u...@domain.com" + + private var sparkConf: SparkConf = null + + override def beforeEach(): Unit = { +super.beforeEach() +sparkConf = new SparkConf() + } + + test("createAdminClientProperties without bootstrap servers should throw exception") { +val thrown = intercept[IllegalArgumentException] { + KafkaTokenUtil.createAdminClientProperties(sparkConf) +} +assert(thrown.getMessage contains + "Tried to obtain kafka delegation token but bootstrap servers not configured.") + } + + test("createAdminClientProperties without SSL protocol should not take over truststore config") { +sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers) +sparkConf.set(KAFKA_SECURITY_PROTOCOL, plainSecurityProtocol) +sparkConf.set(KAFKA_TRUSTSTORE_LOCATION, trustStoreLocation) +sparkConf.set(KAFKA_TRUSTSTORE_PASSWORD, trustStoreLocation) + +val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf) + + assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) + === bootStrapServers) + assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG) + === plainSecurityProtocol) +assert(!adminClientProperties.containsKey("ssl.truststore.location")) +assert(!adminClientProperties.containsKey("ssl.truststore.password")) 
+ } + + test("createAdminClientProperties with SSL protocol should take over truststore config") { --- End diff -- s/take over/include --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r233627918 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala --- @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[spark] object KafkaTokenUtil extends Logging { + private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") --- End diff -- `private[spark]` is redundant when the class is already tagged that way. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r233627534 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala --- @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import scala.language.existentials +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL} + +private[security] class KafkaDelegationTokenProvider + extends HadoopDelegationTokenProvider with Logging { + + override def serviceName: String = "kafka" + + override def obtainDelegationTokens( + hadoopConf: Configuration, + sparkConf: SparkConf, + creds: Credentials): Option[Long] = { +try { + logDebug("Attempting to fetch Kafka security token.") + val (token, nextRenewalDate) = KafkaTokenUtil.obtainToken(sparkConf) + creds.addToken(token.getService, token) + return Some(nextRenewalDate) +} catch { + case NonFatal(e) => +logInfo(s"Failed to get token from service $serviceName", e) +} +None + } + + override def delegationTokensRequired( + sparkConf: SparkConf, + hadoopConf: Configuration): Boolean = { +sparkConf.get(KAFKA_BOOTSTRAP_SERVERS).isDefined && --- End diff -- `sparkConf.contains(KAFKA_BOOTSTRAP_SERVERS)` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r233637694 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala --- @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[spark] object KafkaTokenUtil extends Logging { + private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + private[spark] val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND; + } + + private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = { +val adminClient = AdminClient.create(createAdminClientProperties(sparkConf)) +val createDelegationTokenOptions = new CreateDelegationTokenOptions() +val createResult = adminClient.createDelegationToken(createDelegationTokenOptions) +val token = createResult.delegationToken().get() +printToken(token) + +(new Token[KafkaDelegationTokenIdentifier]( + token.tokenInfo.tokenId.getBytes, + token.hmacAsBase64String.getBytes, + TOKEN_KIND, + TOKEN_SERVICE +), token.tokenInfo.expiryTimestamp) + } + + private[security] def createAdminClientProperties(sparkConf: SparkConf): Properties = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + "servers not configured.") + 
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +if (protocol.endsWith("SSL")) { + logInfo("SSL protocol detected.") + sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { truststoreLocation => +adminClientProperties.put("ssl.truststore.location", truststoreLocation) + } + sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { truststorePassword => +adminClientProperties.put("ssl.truststore.password", truststorePassword) + } +} else { + logWarning("Obtaining kafka delegation token through plain communication channel. Please " + +"consider the security impact.") +} + +// There are multiple possibilities to log in: --- End diff -- If I understand this correctly, this code is trying to log in again, right? Two questions: - why is that necessary? - won't this fail if the user only has a TGT (i.e. login via kinit)? I was expecting that you could reuse the already available login information to talk to the Kafka broker here and get delegation tokens. For example, you can do this to change the JVM's security context to the current user, if it's not already set: ``` org.apache.hadoop.security.UserGroupInformation.getCurrentUser().doAs( new java.security.PrivilegedExceptionAction[Unit] { override def run(): Unit = {
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r233626891 --- Diff: core/pom.xml --- @@ -408,6 +408,19 @@ provided +
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r233628054 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala --- @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[spark] object KafkaTokenUtil extends Logging { + private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + private[spark] val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND; --- End diff -- no semi-colons --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r232033107 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.{Token, TokenIdentifier} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL} +import org.apache.spark.util.Utils + +private[security] class KafkaDelegationTokenProvider --- End diff -- Using full-reflection would make the code hell complicated but your other suggestion about provided scope is really good. This way the copy-paste can be avoided. Changed accordingly. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r231591033 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.{Token, TokenIdentifier} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL} +import org.apache.spark.util.Utils + +private[security] class KafkaDelegationTokenProvider --- End diff -- > add another package from where the TokenUtil can be loaded. Seems like what you're saying is that to support dstreams you'd have two pretty much identical TokenUtil classes in different modules? That sounds sub-optimal. 
At that point, it might be better to move the logic in that class to core, and just go with full reflection, or even add kafka as a `provided` dependency and mimic the Hive provider. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r231458881 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.{Token, TokenIdentifier} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL} +import org.apache.spark.util.Utils + +private[security] class KafkaDelegationTokenProvider --- End diff -- In general if we're not moving this class into kafka area the only change which has to be done is to add another package from where the `TokenUtil` can be loaded. Otherwise `KafkaDelegationTokenProvider` has to be copied from `spark-sql-kafka...` to `spark-streaming-kafka...`. I think leaving as it is makes it more simple. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r231252803 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.{Token, TokenIdentifier} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL} +import org.apache.spark.util.Utils + +private[security] class KafkaDelegationTokenProvider --- End diff -- > Just moves the reflection into HadoopDelegationTokenManager You can't escape reflection unless you use `java.util.ServiceLoader` here, and it was decided not to use that when this code was first written. My argument is that it is a lot less reflection - it's just `Class.forName` instead of this whole class here. 
> Has to be copied when delegation token will be introduced in DStreams And how is that different from what you have here? The code you have won't work for dstreams since it's not even available when you pull in just the dstream connector. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r231126695 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.{Token, TokenIdentifier} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL} +import org.apache.spark.util.Utils + +private[security] class KafkaDelegationTokenProvider --- End diff -- That would make `KafkaDelegationTokenProvider` cleaner, but it has a couple of drawbacks: * Just moves the reflection into `HadoopDelegationTokenManager` * Has to be copied when delegation token will be introduced in DStreams --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r231042582 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.{Token, TokenIdentifier} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL} +import org.apache.spark.util.Utils + +private[security] class KafkaDelegationTokenProvider + extends HadoopDelegationTokenProvider with Logging { + + override def serviceName: String = "kafka" + + override def obtainDelegationTokens( + hadoopConf: Configuration, + sparkConf: SparkConf, + creds: Credentials): Option[Long] = { +try { + val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader) + val obtainToken = mirror.classLoader. 
+loadClass("org.apache.spark.sql.kafka010.TokenUtil"). +getMethod("obtainToken", classOf[SparkConf]) + + logDebug("Attempting to fetch Kafka security token.") + val token = obtainToken.invoke(null, sparkConf) +.asInstanceOf[Token[_ <: TokenIdentifier]] + creds.addToken(token.getService, token) +} catch { + case NonFatal(e) => +logInfo(s"Failed to get token from service $serviceName", e) +} + +None + } + + override def delegationTokensRequired( --- End diff -- Is there any news on this? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r231041173 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.{Token, TokenIdentifier} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL} +import org.apache.spark.util.Utils + +private[security] class KafkaDelegationTokenProvider --- End diff -- Checking it... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r230683713 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala --- @@ -66,7 +66,8 @@ private[spark] class HadoopDelegationTokenManager( private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = { val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++ safeCreateProvider(new HiveDelegationTokenProvider) ++ - safeCreateProvider(new HBaseDelegationTokenProvider) + safeCreateProvider(new HBaseDelegationTokenProvider) ++ + safeCreateProvider(new KafkaDelegationTokenProvider) --- End diff -- We considered turning it off by default and came to the same conclusion Marcelo described, which I think still stands. The PR says documentation is not covered because the design may change. My plan is to add it in a separate PR once the feature is merged. Regarding what to document, I think the Kafka integration guide should cover everything. It already states there that the jar should be on the path. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r230680318 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.{Token, TokenIdentifier} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL} +import org.apache.spark.util.Utils + +private[security] class KafkaDelegationTokenProvider + extends HadoopDelegationTokenProvider with Logging { + + override def serviceName: String = "kafka" + + override def obtainDelegationTokens( + hadoopConf: Configuration, + sparkConf: SparkConf, + creds: Credentials): Option[Long] = { +try { + val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader) + val obtainToken = mirror.classLoader. 
+loadClass("org.apache.spark.sql.kafka010.TokenUtil"). +getMethod("obtainToken", classOf[SparkConf]) + + logDebug("Attempting to fetch Kafka security token.") + val token = obtainToken.invoke(null, sparkConf) +.asInstanceOf[Token[_ <: TokenIdentifier]] + creds.addToken(token.getService, token) +} catch { + case NonFatal(e) => +logInfo(s"Failed to get token from service $serviceName", e) +} + +None + } + + override def delegationTokensRequired( + sparkConf: SparkConf, + hadoopConf: Configuration): Boolean = { +sparkConf.get(KAFKA_BOOTSTRAP_SERVERS).isDefined && + sparkConf.get(KAFKA_SECURITY_PROTOCOL).startsWith("SASL") --- End diff -- This condition means `if (bootstrap servers configured and (protocol == SASL_PLAINTEXT or protocol == SASL_SSL) )` Why do you think SSL is not covered? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r230679147 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.{Token, TokenIdentifier} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL} +import org.apache.spark.util.Utils + +private[security] class KafkaDelegationTokenProvider + extends HadoopDelegationTokenProvider with Logging { + + override def serviceName: String = "kafka" + + override def obtainDelegationTokens( + hadoopConf: Configuration, + sparkConf: SparkConf, + creds: Credentials): Option[Long] = { +try { + val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader) + val obtainToken = mirror.classLoader. 
+loadClass("org.apache.spark.sql.kafka010.TokenUtil"). +getMethod("obtainToken", classOf[SparkConf]) + + logDebug("Attempting to fetch Kafka security token.") + val token = obtainToken.invoke(null, sparkConf) +.asInstanceOf[Token[_ <: TokenIdentifier]] + creds.addToken(token.getService, token) +} catch { + case NonFatal(e) => +logInfo(s"Failed to get token from service $serviceName", e) +} + +None --- End diff -- That's a nice catch, fixing it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user soenkeliebau commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r229237022 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.{Token, TokenIdentifier} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL} +import org.apache.spark.util.Utils + +private[security] class KafkaDelegationTokenProvider + extends HadoopDelegationTokenProvider with Logging { + + override def serviceName: String = "kafka" + + override def obtainDelegationTokens( + hadoopConf: Configuration, + sparkConf: SparkConf, + creds: Credentials): Option[Long] = { +try { + val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader) + val obtainToken = mirror.classLoader. 
+loadClass("org.apache.spark.sql.kafka010.TokenUtil"). +getMethod("obtainToken", classOf[SparkConf]) --- End diff -- You are right, missed that. Sorry --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r228681974 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala --- @@ -66,7 +66,8 @@ private[spark] class HadoopDelegationTokenManager( private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = { val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++ safeCreateProvider(new HiveDelegationTokenProvider) ++ - safeCreateProvider(new HBaseDelegationTokenProvider) + safeCreateProvider(new HBaseDelegationTokenProvider) ++ + safeCreateProvider(new KafkaDelegationTokenProvider) --- End diff -- Yes, I think the best we can do is to document the configs and throw some useful error messages to make the user aware of the "bootstrap servers" config (in case they accidentally left it set) when the spark-sql-kafka libraries are not in the classpath. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r228674142 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala --- @@ -66,7 +66,8 @@ private[spark] class HadoopDelegationTokenManager( private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = { val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++ safeCreateProvider(new HiveDelegationTokenProvider) ++ - safeCreateProvider(new HBaseDelegationTokenProvider) + safeCreateProvider(new HBaseDelegationTokenProvider) ++ + safeCreateProvider(new KafkaDelegationTokenProvider) --- End diff -- > if this config is set the spark-sql-kafka libraries needs to be in the class path as well That's actually an argument against disabling it by default. If you set that config and don't have the libs in the classpath, you should either: - get nothing (e.g. current HBase behavior) - get an error because the libraries are not present But disabling it by default just means you'd have 3 different things to do to enable this, instead of two. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r228671263 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala --- @@ -66,7 +66,8 @@ private[spark] class HadoopDelegationTokenManager( private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = { val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++ safeCreateProvider(new HiveDelegationTokenProvider) ++ - safeCreateProvider(new HBaseDelegationTokenProvider) + safeCreateProvider(new HBaseDelegationTokenProvider) ++ + safeCreateProvider(new KafkaDelegationTokenProvider) --- End diff -- Why I thought disabling it by default might make sense: the token fetch would be attempted if only "spark.kafka.bootstrap.servers" is defined. And if this config is set the spark-sql-kafka libraries needs to be in the class path as well. It would be better to mention these in the docs. We could also consider prefixing all the configs with spark.security.credentials.kafka instead of spark.kafka (like spark.security.credentials.kafka.bootstrap.servers) to make it explicit that these are security-related settings required for fetching Kafka delegation tokens. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r228667186 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.{Token, TokenIdentifier} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL} +import org.apache.spark.util.Utils + +private[security] class KafkaDelegationTokenProvider --- End diff -- Another alternative that may be cleaner than this is to actually have this class in the spark-sql-kafka module, and instantiate it in `HadoopDelegationTokenManager` using `Utils.classForName(...).newInstance()`. Then this particular code does not need to use reflection at all, and you may be able to clean some code up on the `TokenUtils` side. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r228666433 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.{Token, TokenIdentifier} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL} +import org.apache.spark.util.Utils + +private[security] class KafkaDelegationTokenProvider + extends HadoopDelegationTokenProvider with Logging { + + override def serviceName: String = "kafka" + + override def obtainDelegationTokens( + hadoopConf: Configuration, + sparkConf: SparkConf, + creds: Credentials): Option[Long] = { +try { + val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader) + val obtainToken = mirror.classLoader. 
+loadClass("org.apache.spark.sql.kafka010.TokenUtil"). +getMethod("obtainToken", classOf[SparkConf]) + + logDebug("Attempting to fetch Kafka security token.") + val token = obtainToken.invoke(null, sparkConf) +.asInstanceOf[Token[_ <: TokenIdentifier]] + creds.addToken(token.getService, token) +} catch { + case NonFatal(e) => +logInfo(s"Failed to get token from service $serviceName", e) +} + +None --- End diff -- If Kafka tokens expose that information it should be returned here. (HBase tokens, at least, don't...) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r228665965 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala --- @@ -66,7 +66,8 @@ private[spark] class HadoopDelegationTokenManager( private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = { val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++ safeCreateProvider(new HiveDelegationTokenProvider) ++ - safeCreateProvider(new HBaseDelegationTokenProvider) + safeCreateProvider(new HBaseDelegationTokenProvider) ++ + safeCreateProvider(new KafkaDelegationTokenProvider) --- End diff -- It shouldn't be disabled by default. I think it's better to just remove the list of providers from the scaladoc. That's not really helpful in any case. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r228583252 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.{Token, TokenIdentifier} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL} +import org.apache.spark.util.Utils + +private[security] class KafkaDelegationTokenProvider + extends HadoopDelegationTokenProvider with Logging { + + override def serviceName: String = "kafka" + + override def obtainDelegationTokens( + hadoopConf: Configuration, + sparkConf: SparkConf, + creds: Credentials): Option[Long] = { +try { + val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader) + val obtainToken = mirror.classLoader. 
+loadClass("org.apache.spark.sql.kafka010.TokenUtil"). +getMethod("obtainToken", classOf[SparkConf]) --- End diff -- spark-core does not have a dependency on spark-sql-kafka so this is needed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user soenkeliebau commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r228576259 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.{Token, TokenIdentifier} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL} +import org.apache.spark.util.Utils + +private[security] class KafkaDelegationTokenProvider + extends HadoopDelegationTokenProvider with Logging { + + override def serviceName: String = "kafka" + + override def obtainDelegationTokens( + hadoopConf: Configuration, + sparkConf: SparkConf, + creds: Credentials): Option[Long] = { +try { + val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader) + val obtainToken = mirror.classLoader. 
+loadClass("org.apache.spark.sql.kafka010.TokenUtil"). +getMethod("obtainToken", classOf[SparkConf]) --- End diff -- Is there a specific reason to dynamically load this class? This code looks quite similar to [HBaseDelegationTokenProvider](https://github.com/apache/spark/blob/5264164a67df498b73facae207eda12ee133be7d/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala#L41), but I think there it was done to avoid creating a hard dependency on the HBase libs of which TokenUtil is a part. In this case the (Kafka)TokenUtil is within the Spark project, so we could just use it directly, could we not? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r228339774 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.{Token, TokenIdentifier} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL} +import org.apache.spark.util.Utils + +private[security] class KafkaDelegationTokenProvider + extends HadoopDelegationTokenProvider with Logging { + + override def serviceName: String = "kafka" + + override def obtainDelegationTokens( + hadoopConf: Configuration, + sparkConf: SparkConf, + creds: Credentials): Option[Long] = { +try { + val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader) + val obtainToken = mirror.classLoader. 
+loadClass("org.apache.spark.sql.kafka010.TokenUtil"). +getMethod("obtainToken", classOf[SparkConf]) + + logDebug("Attempting to fetch Kafka security token.") + val token = obtainToken.invoke(null, sparkConf) +.asInstanceOf[Token[_ <: TokenIdentifier]] + creds.addToken(token.getService, token) +} catch { + case NonFatal(e) => +logInfo(s"Failed to get token from service $serviceName", e) +} + +None --- End diff -- Shouldn't this return the time of the next renewal? Otherwise, how does the token manager know when it should be renewed or recreated? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r228321793 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala --- @@ -66,7 +66,8 @@ private[spark] class HadoopDelegationTokenManager( private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = { val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++ safeCreateProvider(new HiveDelegationTokenProvider) ++ - safeCreateProvider(new HBaseDelegationTokenProvider) + safeCreateProvider(new HBaseDelegationTokenProvider) ++ + safeCreateProvider(new KafkaDelegationTokenProvider) --- End diff -- Update the class docs to list which providers are loaded by default or, better, set the default for `spark.security.credentials.kafka.enabled` to false. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r228320944 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -647,4 +647,42 @@ package object config { .stringConf .toSequence .createWithDefault(Nil) + + private[spark] val KAFKA_DELEGATION_TOKEN_ENABLED = +ConfigBuilder("spark.kafka.delegation.token.enabled") + .doc("Set to 'true' for obtaining delegation token from kafka.") + .booleanConf + .createWithDefault(false) + + private[spark] val KAFKA_BOOTSTRAP_SERVERS = +ConfigBuilder("spark.kafka.bootstrap.servers") --- End diff -- Given the defaults, the token fetch would be attempted if only `spark.kafka.bootstrap.servers` is defined, right? And the spark-sql-kafka library needs to be on the class path as well? It would be better to mention these in the docs. And make `spark.security.credentials.kafka.enabled` default to false if it makes sense. Also consider prefixing all the configs with `spark.security.credentials.kafka` instead of `spark.kafka` (like `spark.security.credentials.kafka.bootstrap.servers`) to make it explicit that these are security-related settings required for fetching kafka delegation tokens. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user soenkeliebau commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r228131806 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.{Token, TokenIdentifier} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL} +import org.apache.spark.util.Utils + +private[security] class KafkaDelegationTokenProvider + extends HadoopDelegationTokenProvider with Logging { + + override def serviceName: String = "kafka" + + override def obtainDelegationTokens( + hadoopConf: Configuration, + sparkConf: SparkConf, + creds: Credentials): Option[Long] = { +try { + val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader) + val obtainToken = mirror.classLoader. 
+loadClass("org.apache.spark.sql.kafka010.TokenUtil"). +getMethod("obtainToken", classOf[SparkConf]) + + logDebug("Attempting to fetch Kafka security token.") + val token = obtainToken.invoke(null, sparkConf) +.asInstanceOf[Token[_ <: TokenIdentifier]] + creds.addToken(token.getService, token) +} catch { + case NonFatal(e) => +logInfo(s"Failed to get token from service $serviceName", e) +} + +None + } + + override def delegationTokensRequired( + sparkConf: SparkConf, + hadoopConf: Configuration): Boolean = { +sparkConf.get(KAFKA_BOOTSTRAP_SERVERS).isDefined && + sparkConf.get(KAFKA_SECURITY_PROTOCOL).startsWith("SASL") --- End diff -- Was this a conscious decision to not use delegation tokens when using ssl authentication? That might also be useful to avoid spreading keys all over the cluster, even if load on the kdc is not an issue in that case. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r226659261 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[kafka010] object TokenUtil extends Logging { + private[kafka010] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + private[kafka010] val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[kafka010] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND; + } + + private def printToken(token: DelegationToken): Unit = { +if (log.isDebugEnabled) { + val dateFormat = new SimpleDateFormat("-MM-dd'T'HH:mm") + logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format( +"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE")) + val tokenInfo = token.tokenInfo + logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format( +tokenInfo.tokenId, +tokenInfo.owner, +tokenInfo.renewersAsString, +dateFormat.format(tokenInfo.issueTimestamp), +dateFormat.format(tokenInfo.expiryTimestamp), +dateFormat.format(tokenInfo.maxTimestamp))) +} + } + + private[kafka010] def createAdminClientProperties(sparkConf: SparkConf): Properties = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + "servers not configured.") + 
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +if (protocol.endsWith("SSL")) { + logInfo("SSL protocol detected.") + sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { truststoreLocation => +adminClientProperties.put("ssl.truststore.location", truststoreLocation) + } + sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { truststorePassword => +adminClientProperties.put("ssl.truststore.password", truststorePassword) + } +} else { + logWarning("Obtaining kafka delegation token through plain communication channel. Please " + +"consider the security impact.") +} + +// There are multiple possibilities to log in: +// - Keytab is provided -> try to log in with kerberos module using kafka's dynamic JAAS +// configuration. +// - Keytab not provided -> try to log in with JVM global security configuration +// which can be configured for example with 'java.security.auth.login.config'. +// For this no additional parameter needed. +KafkaSecurityHelper.getKeytabJaasParams(sparkConf).foreach { jaasParams => + logInfo("Keytab detected, using it for login.") + adminClientProperties.put(SaslConfigs.SASL_MECHANISM, SaslConfigs.GSSAPI_MECHANISM)
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r226568611 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.kafka010 + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[kafka010] object TokenUtil extends Logging { + private[kafka010] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + private[kafka010] val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[kafka010] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND; + } + + private def printToken(token: DelegationToken): Unit = { +if (log.isDebugEnabled) { + val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm") + logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format( +"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE")) + val tokenInfo = token.tokenInfo + logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format( +tokenInfo.tokenId, +tokenInfo.owner, +tokenInfo.renewersAsString, +dateFormat.format(tokenInfo.issueTimestamp), +dateFormat.format(tokenInfo.expiryTimestamp), +dateFormat.format(tokenInfo.maxTimestamp))) --- End diff -- The possibility is there, but it doesn't throw an exception. Input: `dateFormat.format(-1)` Output: `1970-01-01T00:59`. It would indicate an invalid token, which can be spotted in the log output. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r226235800 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala --- @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.kafka010 + +import java.util.UUID + +import org.apache.hadoop.security.{Credentials, UserGroupInformation} +import org.apache.hadoop.security.token.Token +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.internal.config.{KAFKA_KERBEROS_SERVICE_NAME, KEYTAB, PRINCIPAL} +import org.apache.spark.sql.kafka010.TokenUtil.KafkaDelegationTokenIdentifier + +class KafkaSecurityHelperSuite extends SparkFunSuite with BeforeAndAfterEach { + private val keytab = "/path/to/keytab" + private val kerberosServiceName = "kafka" + private val principal = "u...@domain.com" + private val tokenId = "tokenId" + UUID.randomUUID().toString + private val tokenPassword = "tokenPassword" + UUID.randomUUID().toString + + private var sparkConf: SparkConf = null + + override def beforeEach(): Unit = { +super.beforeEach() +sparkConf = new SparkConf() --- End diff -- If you are playing with UGI in tests, it's usually safest to call `UserGroupInformation.reset()` during setup and teardown; this empties out all existing creds and avoids problems when more than one test runs in the same JVM. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r226235157 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[kafka010] object TokenUtil extends Logging { + private[kafka010] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + private[kafka010] val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[kafka010] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND; + } + + private def printToken(token: DelegationToken): Unit = { +if (log.isDebugEnabled) { + val dateFormat = new SimpleDateFormat("-MM-dd'T'HH:mm") + logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format( +"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE")) + val tokenInfo = token.tokenInfo + logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format( +tokenInfo.tokenId, +tokenInfo.owner, +tokenInfo.renewersAsString, +dateFormat.format(tokenInfo.issueTimestamp), +dateFormat.format(tokenInfo.expiryTimestamp), +dateFormat.format(tokenInfo.maxTimestamp))) --- End diff -- are these always going to be valid? I.e. > 0? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r226234330 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.{Token, TokenIdentifier} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL} +import org.apache.spark.util.Utils + +private[security] class KafkaDelegationTokenProvider + extends HadoopDelegationTokenProvider with Logging { + + override def serviceName: String = "kafka" + + override def obtainDelegationTokens( + hadoopConf: Configuration, + sparkConf: SparkConf, + creds: Credentials): Option[Long] = { +try { + val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader) + val obtainToken = mirror.classLoader. 
+loadClass("org.apache.spark.sql.kafka010.TokenUtil"). +getMethod("obtainToken", classOf[SparkConf]) + + logDebug("Attempting to fetch Kafka security token.") + val token = obtainToken.invoke(null, sparkConf) +.asInstanceOf[Token[_ <: TokenIdentifier]] + creds.addToken(token.getService, token) +} catch { + case NonFatal(e) => +logInfo(s"Failed to get token from service $serviceName", e) +} + +None + } + + override def delegationTokensRequired( --- End diff -- OK: so this asks for DTs even if UGI says the cluster is insecure? Nothing wrong with that... I've been wondering what would happen if `HadoopFSDelegationTokenProvider` did the same thing: asked filesystems for their tokens even in an insecure cluster, as it would let DT support in object stores (HADOOP-14556...) work without Kerberos. I'd test to make sure that everything gets through OK. AFAIK YARN is happy to pass credentials around in an insecure cluster (it gets the AM/RM token to the AM this way); it's more a matter of making sure the launcher chain is all ready for it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r225752604 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[kafka010] object TokenUtil extends Logging { + private[kafka010] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + private[kafka010] val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[kafka010] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND; + } + + private def printToken(token: DelegationToken): Unit = { +if (log.isDebugEnabled) { + val dateFormat = new SimpleDateFormat("-MM-dd'T'HH:mm") + logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format( +"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE")) + val tokenInfo = token.tokenInfo + logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format( +tokenInfo.tokenId, +tokenInfo.owner, +tokenInfo.renewersAsString, +dateFormat.format(tokenInfo.issueTimestamp), +dateFormat.format(tokenInfo.expiryTimestamp), +dateFormat.format(tokenInfo.maxTimestamp))) +} + } + + private[kafka010] def createAdminClientProperties(sparkConf: SparkConf): Properties = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + "servers not configured.") + 
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +if (protocol.endsWith("SSL")) { + logInfo("SSL protocol detected.") + sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { truststoreLocation => +adminClientProperties.put("ssl.truststore.location", truststoreLocation) + } + sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { truststorePassword => +adminClientProperties.put("ssl.truststore.password", truststorePassword) + } +} else { + logWarning("Obtaining kafka delegation token through plain communication channel. Please " + +"consider the security impact.") +} + +// There are multiple possibilities to log in: +// - Keytab is provided -> try to log in with kerberos module using kafka's dynamic JAAS +// configuration. +// - Keytab not provided -> try to log in with JVM global security configuration +// which can be configured for example with 'java.security.auth.login.config'. +// For this no additional parameter needed. +KafkaSecurityHelper.getKeytabJaasParams(sparkConf).foreach { jaasParams => + logInfo("Keytab detected, using it for login.") + adminClientProperties.put(SaslConfigs.SASL_MECHANISM, SaslConfigs.GSSAPI_MECHANISM)
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r225111947 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[kafka010] object TokenUtil extends Logging { + private[kafka010] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + private[kafka010] val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[kafka010] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND; + } + + private def printToken(token: DelegationToken): Unit = { +if (log.isDebugEnabled) { + val dateFormat = new SimpleDateFormat("-MM-dd'T'HH:mm") + logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format( +"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE")) + val tokenInfo = token.tokenInfo + logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format( +tokenInfo.tokenId, +tokenInfo.owner, +tokenInfo.renewersAsString, +dateFormat.format(tokenInfo.issueTimestamp), +dateFormat.format(tokenInfo.expiryTimestamp), +dateFormat.format(tokenInfo.maxTimestamp))) +} + } + + private[kafka010] def createAdminClientProperties(sparkConf: SparkConf): Properties = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + "servers not configured.") + 
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +if (protocol.endsWith("SSL")) { + logInfo("SSL protocol detected.") + sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { truststoreLocation => +adminClientProperties.put("ssl.truststore.location", truststoreLocation) + } + sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { truststorePassword => +adminClientProperties.put("ssl.truststore.password", truststorePassword) + } +} else { + logWarning("Obtaining kafka delegation token through plain communication channel. Please " + +"consider the security impact.") +} + +// There are multiple possibilities to log in: +// - Keytab is provided -> try to log in with kerberos module using kafka's dynamic JAAS +// configuration. +// - Keytab not provided -> try to log in with JVM global security configuration +// which can be configured for example with 'java.security.auth.login.config'. +// For this no additional parameter needed. +KafkaSecurityHelper.getKeytabJaasParams(sparkConf).foreach { jaasParams => + logInfo("Keytab detected, using it for login.") + adminClientProperties.put(SaslConfigs.SASL_MECHANISM, SaslConfigs.GSSAPI_MECHANISM)
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r225111758 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[kafka010] object TokenUtil extends Logging { + private[kafka010] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + private[kafka010] val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[kafka010] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND; + } + + private def printToken(token: DelegationToken): Unit = { +if (log.isDebugEnabled) { + val dateFormat = new SimpleDateFormat("-MM-dd'T'HH:mm") + logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format( +"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE")) + val tokenInfo = token.tokenInfo + logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format( +tokenInfo.tokenId, +tokenInfo.owner, +tokenInfo.renewersAsString, +dateFormat.format(tokenInfo.issueTimestamp), +dateFormat.format(tokenInfo.expiryTimestamp), +dateFormat.format(tokenInfo.maxTimestamp))) +} + } + + private[kafka010] def createAdminClientProperties(sparkConf: SparkConf): Properties = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + "servers not configured.") + 
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +if (protocol.endsWith("SSL")) { + logInfo("SSL protocol detected.") + sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { truststoreLocation => +adminClientProperties.put("ssl.truststore.location", truststoreLocation) + } + sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { truststorePassword => +adminClientProperties.put("ssl.truststore.password", truststorePassword) + } +} else { + logWarning("Obtaining kafka delegation token through plain communication channel. Please " + +"consider the security impact.") +} + +// There are multiple possibilities to log in: +// - Keytab is provided -> try to log in with kerberos module using kafka's dynamic JAAS +// configuration. +// - Keytab not provided -> try to log in with JVM global security configuration +// which can be configured for example with 'java.security.auth.login.config'. +// For this no additional parameter needed. +KafkaSecurityHelper.getKeytabJaasParams(sparkConf).foreach { jaasParams => + logInfo("Keytab detected, using it for login.") + adminClientProperties.put(SaslConfigs.SASL_MECHANISM, SaslConfigs.GSSAPI_MECHANISM)
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r225109193 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -647,4 +647,42 @@ package object config { .stringConf .toSequence .createWithDefault(Nil) + + private[spark] val KAFKA_DELEGATION_TOKEN_ENABLED = +ConfigBuilder("spark.kafka.delegation.token.enabled") + .doc("Set to 'true' for obtaining delegation token from kafka.") + .booleanConf + .createWithDefault(false) + + private[spark] val KAFKA_BOOTSTRAP_SERVERS = +ConfigBuilder("spark.kafka.bootstrap.servers") --- End diff -- It exists and is described in the SPIP: `spark.security.credentials.kafka.enabled` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r224320793 --- Diff: core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala --- @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.{Token, TokenIdentifier} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KAFKA_DELEGATION_TOKEN_ENABLED, KAFKA_SECURITY_PROTOCOL} +import org.apache.spark.util.Utils + +private[security] class KafkaDelegationTokenProvider + extends HadoopDelegationTokenProvider with Logging { + + override def serviceName: String = "kafka" + + override def obtainDelegationTokens( --- End diff -- Having an utility trait or utility singleton object could reduce the overkill, but personally I'd be OK on allowing 5~10 lines of duplication. 
If we are likely to keep leveraging Scala reflection outside of Catalyst (HBaseDelegationTokenProvider already does it twice), we could have a utility class for that. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r224323537 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -647,4 +647,42 @@ package object config { .stringConf .toSequence .createWithDefault(Nil) + + private[spark] val KAFKA_DELEGATION_TOKEN_ENABLED = +ConfigBuilder("spark.kafka.delegation.token.enabled") + .doc("Set to 'true' for obtaining delegation token from kafka.") + .booleanConf + .createWithDefault(false) + + private[spark] val KAFKA_BOOTSTRAP_SERVERS = +ConfigBuilder("spark.kafka.bootstrap.servers") --- End diff -- I would rather say it should be a flag to enable/disable delegation tokens. Not all end users with a secured Kafka cluster want to use delegation tokens. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r224334764 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[kafka010] object TokenUtil extends Logging { + private[kafka010] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + private[kafka010] val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[kafka010] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND; + } + + private def printToken(token: DelegationToken): Unit = { +if (log.isDebugEnabled) { + val dateFormat = new SimpleDateFormat("-MM-dd'T'HH:mm") + logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format( +"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE")) + val tokenInfo = token.tokenInfo + logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format( +tokenInfo.tokenId, +tokenInfo.owner, +tokenInfo.renewersAsString, +dateFormat.format(tokenInfo.issueTimestamp), +dateFormat.format(tokenInfo.expiryTimestamp), +dateFormat.format(tokenInfo.maxTimestamp))) +} + } + + private[kafka010] def createAdminClientProperties(sparkConf: SparkConf): Properties = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + "servers not configured.") + 
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +if (protocol.endsWith("SSL")) { + logInfo("SSL protocol detected.") + sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { truststoreLocation => +adminClientProperties.put("ssl.truststore.location", truststoreLocation) + } + sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { truststorePassword => +adminClientProperties.put("ssl.truststore.password", truststorePassword) + } +} else { + logWarning("Obtaining kafka delegation token through plain communication channel. Please " + +"consider the security impact.") +} + +// There are multiple possibilities to log in: +// - Keytab is provided -> try to log in with kerberos module using kafka's dynamic JAAS +// configuration. +// - Keytab not provided -> try to log in with JVM global security configuration +// which can be configured for example with 'java.security.auth.login.config'. +// For this no additional parameter needed. +KafkaSecurityHelper.getKeytabJaasParams(sparkConf).foreach { jaasParams => + logInfo("Keytab detected, using it for login.") + adminClientProperties.put(SaslConfigs.SASL_MECHANISM, SaslConfigs.GSSAPI_MECHANISM)
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r224322849 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -647,4 +647,42 @@ package object config { .stringConf .toSequence .createWithDefault(Nil) + + private[spark] val KAFKA_DELEGATION_TOKEN_ENABLED = +ConfigBuilder("spark.kafka.delegation.token.enabled") + .doc("Set to 'true' for obtaining delegation token from kafka.") + .booleanConf + .createWithDefault(false) + + private[spark] val KAFKA_BOOTSTRAP_SERVERS = +ConfigBuilder("spark.kafka.bootstrap.servers") --- End diff -- While it is not possible to pass the relevant configuration to the source/sink, pre-defining Kafka-related configurations one by one here feels too tightly coupled with Kafka to me. It might also cause confusion about where to put configuration for the Kafka source/sink: this configuration must only be used for delegation tokens, but neither the configuration name nor its doc indicates that. My 2 cents: just reserve a prefix such as `spark.kafka.token`, leave a comment, and don't define anything here. I would like to hear what committers think about adding external configurations to the Spark conf. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r224338353 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[kafka010] object TokenUtil extends Logging { + private[kafka010] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + private[kafka010] val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[kafka010] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND; + } + + private def printToken(token: DelegationToken): Unit = { +if (log.isDebugEnabled) { + val dateFormat = new SimpleDateFormat("-MM-dd'T'HH:mm") + logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format( +"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE")) + val tokenInfo = token.tokenInfo + logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format( +tokenInfo.tokenId, +tokenInfo.owner, +tokenInfo.renewersAsString, +dateFormat.format(tokenInfo.issueTimestamp), +dateFormat.format(tokenInfo.expiryTimestamp), +dateFormat.format(tokenInfo.maxTimestamp))) +} + } + + private[kafka010] def createAdminClientProperties(sparkConf: SparkConf): Properties = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + "servers not configured.") + 
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +if (protocol.endsWith("SSL")) { + logInfo("SSL protocol detected.") + sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { truststoreLocation => +adminClientProperties.put("ssl.truststore.location", truststoreLocation) + } + sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { truststorePassword => +adminClientProperties.put("ssl.truststore.password", truststorePassword) + } +} else { + logWarning("Obtaining kafka delegation token through plain communication channel. Please " + +"consider the security impact.") +} + +// There are multiple possibilities to log in: +// - Keytab is provided -> try to log in with kerberos module using kafka's dynamic JAAS +// configuration. +// - Keytab not provided -> try to log in with JVM global security configuration +// which can be configured for example with 'java.security.auth.login.config'. +// For this no additional parameter needed. +KafkaSecurityHelper.getKeytabJaasParams(sparkConf).foreach { jaasParams => + logInfo("Keytab detected, using it for login.") + adminClientProperties.put(SaslConfigs.SASL_MECHANISM, SaslConfigs.GSSAPI_MECHANISM)
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r223987644 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/TokenUtilSuite.scala --- @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.common.config.SaslConfigs +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.internal.config._ + +class TokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach { + private val bootStrapServers = "127.0.0.1:0" + private val plainSecurityProtocol = "SASL_PLAINTEXT" + private val sslSecurityProtocol = "SASL_SSL" + private val trustStoreLocation = "/path/to/trustStore" + private val trustStorePassword = "secret" + private val keytab = "/path/to/keytab" + private val kerberosServiceName = "kafka" + private val principal = "u...@domain.com" + + private var sparkConf: SparkConf = null + + override def beforeEach(): Unit = { +super.beforeEach() +sparkConf = new SparkConf() + } + + test("createAdminClientProperties without bootstrap servers should throw exception") { +val thrown = intercept[IllegalArgumentException] { + TokenUtil.createAdminClientProperties(sparkConf) +} +assert(thrown.getMessage contains + "Tried to obtain kafka delegation token but bootstrap servers not configured.") + } + + test("createAdminClientProperties without SSL protocol should not take over truststore config") { +sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers) +sparkConf.set(KAFKA_SECURITY_PROTOCOL, plainSecurityProtocol) +sparkConf.set(KAFKA_TRUSTSTORE_LOCATION, trustStoreLocation) +sparkConf.set(KAFKA_TRUSTSTORE_PASSWORD, trustStoreLocation) + +val adminClientProperties = TokenUtil.createAdminClientProperties(sparkConf) + + assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) + .equals(bootStrapServers)) --- End diff -- No particular reason, changed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r223987456 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.kafka.common.security.scram.ScramLoginModule + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[kafka010] object KafkaSecurityHelper extends Logging { + def getKeytabJaasParams(sparkConf: SparkConf): Option[String] = { +val keytab = sparkConf.get(KEYTAB) +if (keytab.isDefined) { + val serviceName = sparkConf.get(KAFKA_KERBEROS_SERVICE_NAME) + require(serviceName.nonEmpty, "Kerberos service name must be defined") + val principal = sparkConf.get(PRINCIPAL) + require(principal.nonEmpty, "Principal must be defined") + + val params = +s""" +|${getKrb5LoginModuleName} required +| useKeyTab=true +| serviceName="${serviceName.get}" +| keyTab="${keytab.get}" +| principal="${principal.get}"; +""".stripMargin.replace("\n", "") + logDebug(s"Krb JAAS params: $params") + Some(params) +} else { + None +} + } + + private def getKrb5LoginModuleName(): String = { --- End diff -- Added. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r223625059 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/TokenUtilSuite.scala --- @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.common.config.SaslConfigs +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.internal.config._ + +class TokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach { + private val bootStrapServers = "127.0.0.1:0" + private val plainSecurityProtocol = "SASL_PLAINTEXT" + private val sslSecurityProtocol = "SASL_SSL" + private val trustStoreLocation = "/path/to/trustStore" + private val trustStorePassword = "secret" + private val keytab = "/path/to/keytab" + private val kerberosServiceName = "kafka" + private val principal = "u...@domain.com" + + private var sparkConf: SparkConf = null + + override def beforeEach(): Unit = { +super.beforeEach() +sparkConf = new SparkConf() + } + + test("createAdminClientProperties without bootstrap servers should throw exception") { +val thrown = intercept[IllegalArgumentException] { + TokenUtil.createAdminClientProperties(sparkConf) +} +assert(thrown.getMessage contains + "Tried to obtain kafka delegation token but bootstrap servers not configured.") + } + + test("createAdminClientProperties without SSL protocol should not take over truststore config") { +sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers) +sparkConf.set(KAFKA_SECURITY_PROTOCOL, plainSecurityProtocol) +sparkConf.set(KAFKA_TRUSTSTORE_LOCATION, trustStoreLocation) +sparkConf.set(KAFKA_TRUSTSTORE_PASSWORD, trustStoreLocation) + +val adminClientProperties = TokenUtil.createAdminClientProperties(sparkConf) + + assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) + .equals(bootStrapServers)) --- End diff -- why use of .equals() over scalatest's `===` operator? That one includes the values in the assertion raised --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r223623927 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.kafka.common.security.scram.ScramLoginModule + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[kafka010] object KafkaSecurityHelper extends Logging { + def getKeytabJaasParams(sparkConf: SparkConf): Option[String] = { +val keytab = sparkConf.get(KEYTAB) +if (keytab.isDefined) { + val serviceName = sparkConf.get(KAFKA_KERBEROS_SERVICE_NAME) + require(serviceName.nonEmpty, "Kerberos service name must be defined") + val principal = sparkConf.get(PRINCIPAL) + require(principal.nonEmpty, "Principal must be defined") + + val params = +s""" +|${getKrb5LoginModuleName} required +| useKeyTab=true +| serviceName="${serviceName.get}" +| keyTab="${keytab.get}" +| principal="${principal.get}"; +""".stripMargin.replace("\n", "") + logDebug(s"Krb JAAS params: $params") + Some(params) +} else { + None +} + } + + private def getKrb5LoginModuleName(): String = { --- End diff -- + add a comment pointing at hadoop UserGroupInformation so that if someone ever needs to maintain it, they'll know where to look --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r223354399 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala --- @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[kafka010] object TokenUtil extends Logging { + val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND; + } + + private def printToken(token: DelegationToken): Unit = { +val dateFormat = new SimpleDateFormat("-MM-dd'T'HH:mm") +log.info("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format( + "TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE")) +val tokenInfo = token.tokenInfo +log.info("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format( + tokenInfo.tokenId, + tokenInfo.owner, + tokenInfo.renewersAsString, + dateFormat.format(tokenInfo.issueTimestamp), + dateFormat.format(tokenInfo.expiryTimestamp), + dateFormat.format(tokenInfo.maxTimestamp))) + } + + def obtainToken(sparkConf: SparkConf): Token[_ <: TokenIdentifier] = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + "servers not configured.") + adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = 
sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +if (protocol.endsWith("SSL")) { + logInfo("SSL protocol detected.") + + val truststoreLocation = sparkConf.get(KAFKA_TRUSTSTORE_LOCATION) + if (truststoreLocation.nonEmpty) { +adminClientProperties.put("ssl.truststore.location", truststoreLocation.get) + } else { +logInfo("No truststore location set for SSL.") + } + + val truststorePassword = sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD) + if (truststorePassword.nonEmpty) { --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r223354478 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala --- @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[kafka010] object TokenUtil extends Logging { + val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND; + } + + private def printToken(token: DelegationToken): Unit = { +val dateFormat = new SimpleDateFormat("-MM-dd'T'HH:mm") +log.info("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format( + "TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE")) +val tokenInfo = token.tokenInfo +log.info("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format( + tokenInfo.tokenId, + tokenInfo.owner, + tokenInfo.renewersAsString, + dateFormat.format(tokenInfo.issueTimestamp), + dateFormat.format(tokenInfo.expiryTimestamp), + dateFormat.format(tokenInfo.maxTimestamp))) + } + + def obtainToken(sparkConf: SparkConf): Token[_ <: TokenIdentifier] = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + "servers not configured.") + adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = 
sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +if (protocol.endsWith("SSL")) { + logInfo("SSL protocol detected.") + + val truststoreLocation = sparkConf.get(KAFKA_TRUSTSTORE_LOCATION) + if (truststoreLocation.nonEmpty) { +adminClientProperties.put("ssl.truststore.location", truststoreLocation.get) + } else { +logInfo("No truststore location set for SSL.") + } + + val truststorePassword = sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD) + if (truststorePassword.nonEmpty) { +adminClientProperties.put("ssl.truststore.password", truststorePassword.get) + } else { +logInfo("No truststore password set for SSL.") + } +} else { + logWarning("Obtaining kafka delegation token through plain communication channel. Please " + +"consider it's security impact.") +} + +// There are multiple possibilities to log in: +// - Keytab is provided -> try to log in with kerberos module using kafka's dynamic JAAS +// configuration. +// - Keytab not provided -> try to log in with JVM global security configuration +// which can be configured for example with 'java.security.auth.login.config'. +// For this no additional parameter needed. +KafkaSecurityHelper.getKeytabJaasParams(sparkConf) match { --- End diff -- Fixed. ---
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r223354365 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala --- @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[kafka010] object TokenUtil extends Logging { + val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND; + } + + private def printToken(token: DelegationToken): Unit = { +val dateFormat = new SimpleDateFormat("-MM-dd'T'HH:mm") +log.info("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format( + "TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE")) +val tokenInfo = token.tokenInfo +log.info("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format( + tokenInfo.tokenId, + tokenInfo.owner, + tokenInfo.renewersAsString, + dateFormat.format(tokenInfo.issueTimestamp), + dateFormat.format(tokenInfo.expiryTimestamp), + dateFormat.format(tokenInfo.maxTimestamp))) + } + + def obtainToken(sparkConf: SparkConf): Token[_ <: TokenIdentifier] = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + "servers not configured.") + adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = 
sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +if (protocol.endsWith("SSL")) { + logInfo("SSL protocol detected.") + + val truststoreLocation = sparkConf.get(KAFKA_TRUSTSTORE_LOCATION) + if (truststoreLocation.nonEmpty) { --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r223354435 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala --- @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[kafka010] object TokenUtil extends Logging { + val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND; + } + + private def printToken(token: DelegationToken): Unit = { +val dateFormat = new SimpleDateFormat("-MM-dd'T'HH:mm") +log.info("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format( + "TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE")) +val tokenInfo = token.tokenInfo +log.info("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format( + tokenInfo.tokenId, + tokenInfo.owner, + tokenInfo.renewersAsString, + dateFormat.format(tokenInfo.issueTimestamp), + dateFormat.format(tokenInfo.expiryTimestamp), + dateFormat.format(tokenInfo.maxTimestamp))) + } + + def obtainToken(sparkConf: SparkConf): Token[_ <: TokenIdentifier] = { +val adminClientProperties = new Properties + +val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS) +require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + "servers not configured.") + adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + +val protocol = 
sparkConf.get(KAFKA_SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) +if (protocol.endsWith("SSL")) { + logInfo("SSL protocol detected.") + + val truststoreLocation = sparkConf.get(KAFKA_TRUSTSTORE_LOCATION) + if (truststoreLocation.nonEmpty) { +adminClientProperties.put("ssl.truststore.location", truststoreLocation.get) + } else { +logInfo("No truststore location set for SSL.") + } + + val truststorePassword = sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD) + if (truststorePassword.nonEmpty) { +adminClientProperties.put("ssl.truststore.password", truststorePassword.get) + } else { +logInfo("No truststore password set for SSL.") + } +} else { + logWarning("Obtaining kafka delegation token through plain communication channel. Please " + +"consider it's security impact.") --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r223354231 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala --- @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.text.SimpleDateFormat +import java.util.Properties + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[kafka010] object TokenUtil extends Logging { + val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { +override def getKind: Text = TOKEN_KIND; + } + + private def printToken(token: DelegationToken): Unit = { +val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm") +log.info("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format( --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r223354199 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala --- @@ -41,10 +41,10 @@ case object KafkaWriterCommitMessage extends WriterCommitMessage * @param schema The schema of the input data. */ class KafkaStreamingWriteSupport( -topic: Option[String], producerParams: Map[String, String], schema: StructType) +topic: Option[String], producerParams: ju.Map[String, Object], schema: StructType) --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/22598#discussion_r223354097 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala --- @@ -556,29 +549,61 @@ private[kafka010] object KafkaSourceProvider extends Logging { this } +def setTokenJaasConfigIfNeeded(): ConfigUpdater = { + // There are multiple possibilities to log in: + // - Token is provided -> try to log in with scram module using kafka's dynamic JAAS + // configuration. + // - Token not provided -> try to log in with JVM global security configuration + // which can be configured for example with 'java.security.auth.login.config'. + // For this no additional parameter needed. + KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf) match { +case Some(jaasParams) => + logInfo("Delegation token detected, using it for login.") + val mechanism = kafkaParams +.getOrElse(SaslConfigs.SASL_MECHANISM, SaslConfigs.DEFAULT_SASL_MECHANISM) + require(mechanism.startsWith("SCRAM"), +"Delegation token works only with SCRAM mechanism.") + set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams) +case None => // No params required + logInfo("Delegation token not found.") --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org