[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-29 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22598


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-29 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r237662539
  
--- Diff: core/src/main/scala/org/apache/spark/internal/config/Kafka.scala 
---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+private[spark] object Kafka {
+
+  private[spark] val BOOTSTRAP_SERVERS =
--- End diff --

Removed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-29 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r237659299
  
--- Diff: core/src/main/scala/org/apache/spark/internal/config/Kafka.scala 
---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+private[spark] object Kafka {
+
+  private[spark] val BOOTSTRAP_SERVERS =
--- End diff --

modifiers are now redundant since this is an object (and not a package 
object).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-29 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r237656411
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.UUID
+
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.security.token.Token
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.security.KafkaTokenUtil
+import 
org.apache.spark.deploy.security.KafkaTokenUtil.KafkaDelegationTokenIdentifier
+import org.apache.spark.internal.config.{KAFKA_KERBEROS_SERVICE_NAME}
+
+class KafkaSecurityHelperSuite extends SparkFunSuite with 
BeforeAndAfterEach {
+  private val keytab = "/path/to/keytab"
+  private val kerberosServiceName = "kafka"
+  private val principal = "u...@domain.com"
+  private val tokenId = "tokenId" + UUID.randomUUID().toString
+  private val tokenPassword = "tokenPassword" + UUID.randomUUID().toString
+
+  private var sparkConf: SparkConf = null
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+sparkConf = new SparkConf()
+  }
+
+  override def afterEach(): Unit = {
+try {
+  resetUGI
+} finally {
+  super.afterEach()
+}
+  }
+
+  private def addTokenToUGI(): Unit = {
+val token = new Token[KafkaDelegationTokenIdentifier](
+  tokenId.getBytes,
+  tokenPassword.getBytes,
+  KafkaTokenUtil.TOKEN_KIND,
+  KafkaTokenUtil.TOKEN_SERVICE
+)
+val creds = new Credentials()
+creds.addToken(KafkaTokenUtil.TOKEN_SERVICE, token)
+UserGroupInformation.getCurrentUser.addCredentials(creds)
+  }
+
+  private def resetUGI: Unit = {
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-29 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r237656366
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.{ util => ju }
+import java.text.SimpleDateFormat
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.JaasContext
+import 
org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, 
SASL_SSL, SSL}
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  val TOKEN_SERVICE = new Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND
+  }
+
+  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: 
TokenIdentifier], Long) = {
+val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
+val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
+val token = createResult.delegationToken().get()
+printToken(token)
+
+(new Token[KafkaDelegationTokenIdentifier](
+  token.tokenInfo.tokenId.getBytes,
+  token.hmacAsBase64String.getBytes,
+  TOKEN_KIND,
+  TOKEN_SERVICE
+), token.tokenInfo.expiryTimestamp)
+  }
+
+  private[security] def createAdminClientProperties(sparkConf: SparkConf): 
ju.Properties = {
+val adminClientProperties = new ju.Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+protocol match {
+  case SASL_SSL.name =>
+setTrustStoreProperties(sparkConf, adminClientProperties)
+
+  case SSL.name =>
+setTrustStoreProperties(sparkConf, adminClientProperties)
+setKeyStoreProperties(sparkConf, adminClientProperties)
+logWarning("Obtaining kafka delegation token with SSL protocol. 
Please " +
+  "configure 2-way authentication on the broker side.")
+
+  case SASL_PLAINTEXT.name =>
+logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+  "consider the security impact.")
+}
+
+// There are multiple possibilities to log in and applied in the 
following order:
--- End diff --

Added.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-29 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r237656297
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -688,4 +688,65 @@ package object config {
   .stringConf
   .toSequence
   .createWithDefault(Nil)
+
+  private[spark] val KAFKA_BOOTSTRAP_SERVERS =
--- End diff --

Moved.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-29 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r237620523
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.{ util => ju }
+import java.text.SimpleDateFormat
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.JaasContext
+import 
org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, 
SASL_SSL, SSL}
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  val TOKEN_SERVICE = new Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND
+  }
+
+  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: 
TokenIdentifier], Long) = {
+val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
+val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
+val token = createResult.delegationToken().get()
+printToken(token)
+
+(new Token[KafkaDelegationTokenIdentifier](
+  token.tokenInfo.tokenId.getBytes,
+  token.hmacAsBase64String.getBytes,
+  TOKEN_KIND,
+  TOKEN_SERVICE
+), token.tokenInfo.expiryTimestamp)
+  }
+
+  private[security] def createAdminClientProperties(sparkConf: SparkConf): 
ju.Properties = {
+val adminClientProperties = new ju.Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+protocol match {
+  case SASL_SSL.name =>
+setTrustStoreProperties(sparkConf, adminClientProperties)
+
+  case SSL.name =>
+setTrustStoreProperties(sparkConf, adminClientProperties)
+setKeyStoreProperties(sparkConf, adminClientProperties)
+logWarning("Obtaining kafka delegation token with SSL protocol. 
Please " +
+  "configure 2-way authentication on the broker side.")
+
+  case SASL_PLAINTEXT.name =>
+logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+  "consider the security impact.")
+}
+
+// There are multiple possibilities to log in and applied in the 
following order:
--- End diff --

Mention the Kafka bug here?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-29 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r237620333
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -688,4 +688,65 @@ package object config {
   .stringConf
   .toSequence
   .createWithDefault(Nil)
+
+  private[spark] val KAFKA_BOOTSTRAP_SERVERS =
--- End diff --

This pattern was added after you PR, but since it's there... might be 
better to put these into a new file (e.g. `Kafka.scala`) in this package, 
instead of adding more stuff to this file. (e.g. see `History.scala`)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-29 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r237622443
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.UUID
+
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.security.token.Token
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.security.KafkaTokenUtil
+import 
org.apache.spark.deploy.security.KafkaTokenUtil.KafkaDelegationTokenIdentifier
+import org.apache.spark.internal.config.{KAFKA_KERBEROS_SERVICE_NAME}
+
+class KafkaSecurityHelperSuite extends SparkFunSuite with 
BeforeAndAfterEach {
+  private val keytab = "/path/to/keytab"
+  private val kerberosServiceName = "kafka"
+  private val principal = "u...@domain.com"
+  private val tokenId = "tokenId" + UUID.randomUUID().toString
+  private val tokenPassword = "tokenPassword" + UUID.randomUUID().toString
+
+  private var sparkConf: SparkConf = null
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+sparkConf = new SparkConf()
+  }
+
+  override def afterEach(): Unit = {
+try {
+  resetUGI
+} finally {
+  super.afterEach()
+}
+  }
+
+  private def addTokenToUGI(): Unit = {
+val token = new Token[KafkaDelegationTokenIdentifier](
+  tokenId.getBytes,
+  tokenPassword.getBytes,
+  KafkaTokenUtil.TOKEN_KIND,
+  KafkaTokenUtil.TOKEN_SERVICE
+)
+val creds = new Credentials()
+creds.addToken(KafkaTokenUtil.TOKEN_SERVICE, token)
+UserGroupInformation.getCurrentUser.addCredentials(creds)
+  }
+
+  private def resetUGI: Unit = {
--- End diff --

add `()`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-29 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r237495490
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import 
org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, 
SASL_SSL, SSL}
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  val TOKEN_SERVICE = new Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND
+  }
+
+  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: 
TokenIdentifier], Long) = {
+val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
+val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
+val token = createResult.delegationToken().get()
+printToken(token)
+
+(new Token[KafkaDelegationTokenIdentifier](
+  token.tokenInfo.tokenId.getBytes,
+  token.hmacAsBase64String.getBytes,
+  TOKEN_KIND,
+  TOKEN_SERVICE
+), token.tokenInfo.expiryTimestamp)
+  }
+
+  private[security] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+protocol match {
+  case SASL_SSL.name =>
+setTrustStoreProperties(sparkConf, adminClientProperties)
+
+  case SSL.name =>
+setTrustStoreProperties(sparkConf, adminClientProperties)
+setKeyStoreProperties(sparkConf, adminClientProperties)
+logWarning("Obtaining kafka delegation token with SSL protocol. 
Please " +
+  "configure 2-way authentication on the broker side.")
+
+  case SASL_PLAINTEXT.name =>
+logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+  "consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
--- End diff --

Now the following order applies:
* Global JVM configuration which is kafka specific (kafka looks for 
KafkaClient entry)
  This can be configured many ways not just 
`java.security.auth.login.config` but the mentioned 
`JaasContext.loadClientContext` handles them.
* Keytab
* Ticket cache
I've described this in the code as well to make the intention clear.



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional 

[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-29 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r237492537
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import 
org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, 
SASL_SSL, SSL}
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  val TOKEN_SERVICE = new Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND
+  }
+
+  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: 
TokenIdentifier], Long) = {
+val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
+val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
+val token = createResult.delegationToken().get()
+printToken(token)
+
+(new Token[KafkaDelegationTokenIdentifier](
+  token.tokenInfo.tokenId.getBytes,
+  token.hmacAsBase64String.getBytes,
+  TOKEN_KIND,
+  TOKEN_SERVICE
+), token.tokenInfo.expiryTimestamp)
+  }
+
+  private[security] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+protocol match {
+  case SASL_SSL.name =>
+setTrustStoreProperties(sparkConf, adminClientProperties)
+
+  case SSL.name =>
+setTrustStoreProperties(sparkConf, adminClientProperties)
+setKeyStoreProperties(sparkConf, adminClientProperties)
+logWarning("Obtaining kafka delegation token with SSL protocol. 
Please " +
+  "configure 2-way authentication on the broker side.")
+
+  case SASL_PLAINTEXT.name =>
+logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+  "consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
--- End diff --

Added + tested the fallback mechanism.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-28 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r237234310
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import 
org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, 
SASL_SSL, SSL}
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  val TOKEN_SERVICE = new Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND
+  }
+
+  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: 
TokenIdentifier], Long) = {
+val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
+val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
+val token = createResult.delegationToken().get()
+printToken(token)
+
+(new Token[KafkaDelegationTokenIdentifier](
+  token.tokenInfo.tokenId.getBytes,
+  token.hmacAsBase64String.getBytes,
+  TOKEN_KIND,
+  TOKEN_SERVICE
+), token.tokenInfo.expiryTimestamp)
+  }
+
+  private[security] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+protocol match {
+  case SASL_SSL.name =>
+setTrustStoreProperties(sparkConf, adminClientProperties)
+
+  case SSL.name =>
+setTrustStoreProperties(sparkConf, adminClientProperties)
+setKeyStoreProperties(sparkConf, adminClientProperties)
+logWarning("Obtaining kafka delegation token with SSL protocol. 
Please " +
+  "configure 2-way authentication on the broker side.")
+
+  case SASL_PLAINTEXT.name =>
+logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+  "consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
--- End diff --

Kafka has it's public stuff to check for client config: 
`JaasContext.loadClientContext`. It throws exception when no config provided. 
This part works, testing the others...


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-28 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r237206264
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import 
org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, 
SASL_SSL, SSL}
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  val TOKEN_SERVICE = new Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND
+  }
+
+  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: 
TokenIdentifier], Long) = {
+val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
+val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
+val token = createResult.delegationToken().get()
+printToken(token)
+
+(new Token[KafkaDelegationTokenIdentifier](
+  token.tokenInfo.tokenId.getBytes,
+  token.hmacAsBase64String.getBytes,
+  TOKEN_KIND,
+  TOKEN_SERVICE
+), token.tokenInfo.expiryTimestamp)
+  }
+
+  private[security] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+protocol match {
+  case SASL_SSL.name =>
+setTrustStoreProperties(sparkConf, adminClientProperties)
+
+  case SSL.name =>
+setTrustStoreProperties(sparkConf, adminClientProperties)
+setKeyStoreProperties(sparkConf, adminClientProperties)
+logWarning("Obtaining kafka delegation token with SSL protocol. 
Please " +
+  "configure 2-way authentication on the broker side.")
+
+  case SASL_PLAINTEXT.name =>
+logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+  "consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
--- End diff --

+ non-normative list of [what I think are the relevant JVM 
Settings](https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/KDiag.java#L88)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-28 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r237203744
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import 
org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, 
SASL_SSL, SSL}
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  val TOKEN_SERVICE = new Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND
+  }
+
+  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: 
TokenIdentifier], Long) = {
+val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
+val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
+val token = createResult.delegationToken().get()
+printToken(token)
+
+(new Token[KafkaDelegationTokenIdentifier](
+  token.tokenInfo.tokenId.getBytes,
+  token.hmacAsBase64String.getBytes,
+  TOKEN_KIND,
+  TOKEN_SERVICE
+), token.tokenInfo.expiryTimestamp)
+  }
+
+  private[security] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+protocol match {
+  case SASL_SSL.name =>
+setTrustStoreProperties(sparkConf, adminClientProperties)
+
+  case SSL.name =>
+setTrustStoreProperties(sparkConf, adminClientProperties)
+setKeyStoreProperties(sparkConf, adminClientProperties)
+logWarning("Obtaining kafka delegation token with SSL protocol. 
Please " +
+  "configure 2-way authentication on the broker side.")
+
+  case SASL_PLAINTEXT.name =>
+logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+  "consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
--- End diff --

"java.security.auth.login.config"


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-27 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r236738629
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import 
org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, 
SASL_SSL, SSL}
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  val TOKEN_SERVICE = new Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND
+  }
+
+  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: 
TokenIdentifier], Long) = {
+val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
+val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
+val token = createResult.delegationToken().get()
+printToken(token)
+
+(new Token[KafkaDelegationTokenIdentifier](
+  token.tokenInfo.tokenId.getBytes,
+  token.hmacAsBase64String.getBytes,
+  TOKEN_KIND,
+  TOKEN_SERVICE
+), token.tokenInfo.expiryTimestamp)
+  }
+
+  private[security] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+protocol match {
+  case SASL_SSL.name =>
+setTrustStoreProperties(sparkConf, adminClientProperties)
+
+  case SSL.name =>
+setTrustStoreProperties(sparkConf, adminClientProperties)
+setKeyStoreProperties(sparkConf, adminClientProperties)
+logWarning("Obtaining kafka delegation token with SSL protocol. 
Please " +
+  "configure 2-way authentication on the broker side.")
+
+  case SASL_PLAINTEXT.name =>
+logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+  "consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
--- End diff --

If you want to keep the ability to provide a custom JAAS config, then just 
check for that. Isn't there a system property you need to set?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-27 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r236613589
  
--- Diff: pom.xml ---
@@ -128,6 +128,7 @@
 1.2.1.spark2
 
 1.2.1
+2.1.0
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-27 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r236613501
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
 ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.UUID
+
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.security.token.Token
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.security.KafkaTokenUtil
+import 
org.apache.spark.deploy.security.KafkaTokenUtil.KafkaDelegationTokenIdentifier
+import org.apache.spark.internal.config.{KAFKA_KERBEROS_SERVICE_NAME}
+
+class KafkaSecurityHelperSuite extends SparkFunSuite with 
BeforeAndAfterEach {
+  private val keytab = "/path/to/keytab"
+  private val kerberosServiceName = "kafka"
+  private val principal = "u...@domain.com"
+  private val tokenId = "tokenId" + UUID.randomUUID().toString
+  private val tokenPassword = "tokenPassword" + UUID.randomUUID().toString
+
+  private var sparkConf: SparkConf = null
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+sparkConf = new SparkConf()
+  }
+
+  private def addTokenToUGI: Unit = {
+val token = new Token[KafkaDelegationTokenIdentifier](
+  tokenId.getBytes,
+  tokenPassword.getBytes,
+  KafkaTokenUtil.TOKEN_KIND,
+  KafkaTokenUtil.TOKEN_SERVICE
+)
+val creds = new Credentials()
+creds.addToken(KafkaTokenUtil.TOKEN_SERVICE, token)
+UserGroupInformation.getCurrentUser.addCredentials(creds)
+  }
+
+  private def resetUGI: Unit = {
+UserGroupInformation.setLoginUser(null)
+  }
+
+  test("getTokenJaasParams without token should return None") {
+val jaasParams = KafkaSecurityHelper.getTokenJaasParams(sparkConf)
+assert(!jaasParams.isDefined)
+  }
+
+  test("getTokenJaasParams with token no service should throw exception") {
+try {
+  addTokenToUGI
+
+  val thrown = intercept[IllegalArgumentException] {
+KafkaSecurityHelper.getTokenJaasParams(sparkConf)
+  }
+
+  assert(thrown.getMessage contains "Kerberos service name must be 
defined")
+} finally {
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-27 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r236613436
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
 ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.UUID
+
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.security.token.Token
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.security.KafkaTokenUtil
+import 
org.apache.spark.deploy.security.KafkaTokenUtil.KafkaDelegationTokenIdentifier
+import org.apache.spark.internal.config.{KAFKA_KERBEROS_SERVICE_NAME}
+
+class KafkaSecurityHelperSuite extends SparkFunSuite with 
BeforeAndAfterEach {
+  private val keytab = "/path/to/keytab"
+  private val kerberosServiceName = "kafka"
+  private val principal = "u...@domain.com"
+  private val tokenId = "tokenId" + UUID.randomUUID().toString
+  private val tokenPassword = "tokenPassword" + UUID.randomUUID().toString
+
+  private var sparkConf: SparkConf = null
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+sparkConf = new SparkConf()
+  }
+
+  private def addTokenToUGI: Unit = {
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-27 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r236613351
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import 
org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, 
SASL_SSL, SSL}
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  val TOKEN_SERVICE = new Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND
+  }
+
+  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: 
TokenIdentifier], Long) = {
+val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
+val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
+val token = createResult.delegationToken().get()
+printToken(token)
+
+(new Token[KafkaDelegationTokenIdentifier](
+  token.tokenInfo.tokenId.getBytes,
+  token.hmacAsBase64String.getBytes,
+  TOKEN_KIND,
+  TOKEN_SERVICE
+), token.tokenInfo.expiryTimestamp)
+  }
+
+  private[security] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+protocol match {
+  case SASL_SSL.name =>
+setTrustStoreProperties(sparkConf, adminClientProperties)
+
+  case SSL.name =>
+setTrustStoreProperties(sparkConf, adminClientProperties)
+setKeyStoreProperties(sparkConf, adminClientProperties)
+logWarning("Obtaining kafka delegation token with SSL protocol. 
Please " +
+  "configure 2-way authentication on the broker side.")
+
+  case SASL_PLAINTEXT.name =>
+logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+  "consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
--- End diff --

The jira is: https://issues.apache.org/jira/browse/KAFKA-7677
The guys will take a look at it...

Related `useTgtConfig` I've considered this and would make the user 
experience way much better. On the other hand this would close the possibility 
to use custom JAAS configuration (don't know how many people use this). I'm 
fine with that but one has to count with this consequence. Should we do this 
then?


---

-
To unsubscribe, e-mail: 

[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r236441225
  
--- Diff: pom.xml ---
@@ -128,6 +128,7 @@
 1.2.1.spark2
 
 1.2.1
+2.1.0
--- End diff --

Since you're adding this here, should probably remove the declaration from 
the kafka modules' pom files.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r236438713
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import 
org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, 
SASL_SSL, SSL}
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  val TOKEN_SERVICE = new Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND
+  }
+
+  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: 
TokenIdentifier], Long) = {
+val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
+val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
+val token = createResult.delegationToken().get()
+printToken(token)
+
+(new Token[KafkaDelegationTokenIdentifier](
+  token.tokenInfo.tokenId.getBytes,
+  token.hmacAsBase64String.getBytes,
+  TOKEN_KIND,
+  TOKEN_SERVICE
+), token.tokenInfo.expiryTimestamp)
+  }
+
+  private[security] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+protocol match {
+  case SASL_SSL.name =>
+setTrustStoreProperties(sparkConf, adminClientProperties)
+
+  case SSL.name =>
+setTrustStoreProperties(sparkConf, adminClientProperties)
+setKeyStoreProperties(sparkConf, adminClientProperties)
+logWarning("Obtaining kafka delegation token with SSL protocol. 
Please " +
+  "configure 2-way authentication on the broker side.")
+
+  case SASL_PLAINTEXT.name =>
+logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+  "consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
--- End diff --

So, currently there's no way to use the existing login. Did you file a bug 
for Kafka to implement that? That bug should be referenced here.

Also, what about the existing TGT case? You mentioned you need a different 
JAAS config for that, but I don't see that handled here? Users shouldn't need 
to manually provide that. They don't for any other service supported by this 
framework.

So this should be `if (keytab.isDefined) useKeytabConfig else useTgtConfig`.


---


[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r236440232
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
 ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.UUID
+
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.security.token.Token
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.security.KafkaTokenUtil
+import 
org.apache.spark.deploy.security.KafkaTokenUtil.KafkaDelegationTokenIdentifier
+import org.apache.spark.internal.config.{KAFKA_KERBEROS_SERVICE_NAME}
+
+class KafkaSecurityHelperSuite extends SparkFunSuite with 
BeforeAndAfterEach {
+  private val keytab = "/path/to/keytab"
+  private val kerberosServiceName = "kafka"
+  private val principal = "u...@domain.com"
+  private val tokenId = "tokenId" + UUID.randomUUID().toString
+  private val tokenPassword = "tokenPassword" + UUID.randomUUID().toString
+
+  private var sparkConf: SparkConf = null
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+sparkConf = new SparkConf()
+  }
+
+  private def addTokenToUGI: Unit = {
+val token = new Token[KafkaDelegationTokenIdentifier](
+  tokenId.getBytes,
+  tokenPassword.getBytes,
+  KafkaTokenUtil.TOKEN_KIND,
+  KafkaTokenUtil.TOKEN_SERVICE
+)
+val creds = new Credentials()
+creds.addToken(KafkaTokenUtil.TOKEN_SERVICE, token)
+UserGroupInformation.getCurrentUser.addCredentials(creds)
+  }
+
+  private def resetUGI: Unit = {
+UserGroupInformation.setLoginUser(null)
+  }
+
+  test("getTokenJaasParams without token should return None") {
+val jaasParams = KafkaSecurityHelper.getTokenJaasParams(sparkConf)
+assert(!jaasParams.isDefined)
+  }
+
+  test("getTokenJaasParams with token no service should throw exception") {
+try {
+  addTokenToUGI
+
+  val thrown = intercept[IllegalArgumentException] {
+KafkaSecurityHelper.getTokenJaasParams(sparkConf)
+  }
+
+  assert(thrown.getMessage contains "Kerberos service name must be 
defined")
+} finally {
--- End diff --

Better to use an `after` block instead of having `try...finally` in every 
test.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r236439851
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
 ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.UUID
+
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.security.token.Token
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.security.KafkaTokenUtil
+import 
org.apache.spark.deploy.security.KafkaTokenUtil.KafkaDelegationTokenIdentifier
+import org.apache.spark.internal.config.{KAFKA_KERBEROS_SERVICE_NAME}
+
+class KafkaSecurityHelperSuite extends SparkFunSuite with 
BeforeAndAfterEach {
+  private val keytab = "/path/to/keytab"
+  private val kerberosServiceName = "kafka"
+  private val principal = "u...@domain.com"
+  private val tokenId = "tokenId" + UUID.randomUUID().toString
+  private val tokenPassword = "tokenPassword" + UUID.randomUUID().toString
+
+  private var sparkConf: SparkConf = null
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+sparkConf = new SparkConf()
+  }
+
+  private def addTokenToUGI: Unit = {
--- End diff --

We use `()` for methods that do more than just returning some value with 
minimal processing.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-22 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r235779789
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
+  extends HadoopDelegationTokenProvider with Logging {
+
+  override def serviceName: String = "kafka"
+
+  override def obtainDelegationTokens(
+  hadoopConf: Configuration,
+  sparkConf: SparkConf,
+  creds: Credentials): Option[Long] = {
+try {
+  val mirror = 
universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
+  val obtainToken = mirror.classLoader.
+loadClass("org.apache.spark.sql.kafka010.TokenUtil").
+getMethod("obtainToken", classOf[SparkConf])
+
+  logDebug("Attempting to fetch Kafka security token.")
+  val token = obtainToken.invoke(null, sparkConf)
+.asInstanceOf[Token[_ <: TokenIdentifier]]
+  creds.addToken(token.getService, token)
+} catch {
+  case NonFatal(e) =>
+logInfo(s"Failed to get token from service $serviceName", e)
+}
+
+None
+  }
+
+  override def delegationTokensRequired(
+  sparkConf: SparkConf,
+  hadoopConf: Configuration): Boolean = {
+sparkConf.get(KAFKA_BOOTSTRAP_SERVERS).isDefined &&
+  sparkConf.get(KAFKA_SECURITY_PROTOCOL).startsWith("SASL")
--- End diff --

Yeah, SSL 2-way authentication was not covered though it's not that hard to 
add.
Using the enum is definitely worth but not found until you've linked here. 
I've adapted `KafkaDelegationTokenProvider` and `KafkaTokenUtil` to use them.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-21 Thread soenkeliebau
Github user soenkeliebau commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r235395152
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
+  extends HadoopDelegationTokenProvider with Logging {
+
+  override def serviceName: String = "kafka"
+
+  override def obtainDelegationTokens(
+  hadoopConf: Configuration,
+  sparkConf: SparkConf,
+  creds: Credentials): Option[Long] = {
+try {
+  val mirror = 
universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
+  val obtainToken = mirror.classLoader.
+loadClass("org.apache.spark.sql.kafka010.TokenUtil").
+getMethod("obtainToken", classOf[SparkConf])
+
+  logDebug("Attempting to fetch Kafka security token.")
+  val token = obtainToken.invoke(null, sparkConf)
+.asInstanceOf[Token[_ <: TokenIdentifier]]
+  creds.addToken(token.getService, token)
+} catch {
+  case NonFatal(e) =>
+logInfo(s"Failed to get token from service $serviceName", e)
+}
+
+None
+  }
+
+  override def delegationTokensRequired(
+  sparkConf: SparkConf,
+  hadoopConf: Configuration): Boolean = {
+sparkConf.get(KAFKA_BOOTSTRAP_SERVERS).isDefined &&
+  sparkConf.get(KAFKA_SECURITY_PROTOCOL).startsWith("SASL")
--- End diff --

Looking at the possible protocol 
[values](https://github.com/apache/kafka/blob/068ab9cefae301f3187ea885d645c425955e77d2/clients/src/main/java/org/apache/kafka/common/security/auth/SecurityProtocol.java#L26)
 this condition misses the value "SSL" unless I am mistaken. If the user 
presents a client certificate during the SSL handshake for a SSL connection he 
is allowed to obtain delegation tokens as well. If a client cert was not 
provided then Kafka will reject the token request, but that can should be 
covered in error handling I guess.
Would it maybe be an option to move this check into the KafkaTokenUtil 
class and compare with the actual values of the SecurityProtocol enum instead 
of relying on hard coded Strings here?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-19 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r234543403
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[spark] val TOKEN_SERVICE = new 
Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: 
TokenIdentifier], Long) = {
+val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
+val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
+val token = createResult.delegationToken().get()
+printToken(token)
+
+(new Token[KafkaDelegationTokenIdentifier](
+  token.tokenInfo.tokenId.getBytes,
+  token.hmacAsBase64String.getBytes,
+  TOKEN_KIND,
+  TOKEN_SERVICE
+), token.tokenInfo.expiryTimestamp)
+  }
+
+  private[security] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+if (protocol.endsWith("SSL")) {
+  logInfo("SSL protocol detected.")
+  sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { 
truststoreLocation =>
+adminClientProperties.put("ssl.truststore.location", 
truststoreLocation)
+  }
+  sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { 
truststorePassword =>
+adminClientProperties.put("ssl.truststore.password", 
truststorePassword)
+  }
+} else {
+  logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+"consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
--- End diff --

I've spoken with them and confirmed the same thing which is stated in the 
official API documentation: 
https://kafka.apache.org/documentation/#security_sasl_kerberos
There is no such possibility.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-15 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r234043117
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[spark] val TOKEN_SERVICE = new 
Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: 
TokenIdentifier], Long) = {
+val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
+val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
+val token = createResult.delegationToken().get()
+printToken(token)
+
+(new Token[KafkaDelegationTokenIdentifier](
+  token.tokenInfo.tokenId.getBytes,
+  token.hmacAsBase64String.getBytes,
+  TOKEN_KIND,
+  TOKEN_SERVICE
+), token.tokenInfo.expiryTimestamp)
+  }
+
+  private[security] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+if (protocol.endsWith("SSL")) {
+  logInfo("SSL protocol detected.")
+  sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { 
truststoreLocation =>
+adminClientProperties.put("ssl.truststore.location", 
truststoreLocation)
+  }
+  sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { 
truststorePassword =>
+adminClientProperties.put("ssl.truststore.password", 
truststorePassword)
+  }
+} else {
+  logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+"consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
--- End diff --

OK, speaking with them...


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r234040048
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[spark] val TOKEN_SERVICE = new 
Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: 
TokenIdentifier], Long) = {
+val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
+val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
+val token = createResult.delegationToken().get()
+printToken(token)
+
+(new Token[KafkaDelegationTokenIdentifier](
+  token.tokenInfo.tokenId.getBytes,
+  token.hmacAsBase64String.getBytes,
+  TOKEN_KIND,
+  TOKEN_SERVICE
+), token.tokenInfo.expiryTimestamp)
+  }
+
+  private[security] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+if (protocol.endsWith("SSL")) {
+  logInfo("SSL protocol detected.")
+  sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { 
truststoreLocation =>
+adminClientProperties.put("ssl.truststore.location", 
truststoreLocation)
+  }
+  sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { 
truststorePassword =>
+adminClientProperties.put("ssl.truststore.password", 
truststorePassword)
+  }
+} else {
+  logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+"consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
--- End diff --

Where does that exception come from? The message by itself doesn't help.

And that still does not answer my question. The fact it's even talking 
about JAAS configs tells me that it's still trying to log in. My question is 
whether there's an API in Kafka that means "just connect to the server with my 
existing credentials".

At this point in the code, you are already logged in. You already have 
credentials. Kafka should not be forcing you to go through all this again.

It may be that the answer is "Kafka doesn't have this API" in which case a 
bug in Kafka should be filed, and we may have to live with this until that 

[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-15 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r234037129
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[spark] val TOKEN_SERVICE = new 
Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: 
TokenIdentifier], Long) = {
+val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
+val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
+val token = createResult.delegationToken().get()
+printToken(token)
+
+(new Token[KafkaDelegationTokenIdentifier](
+  token.tokenInfo.tokenId.getBytes,
+  token.hmacAsBase64String.getBytes,
+  TOKEN_KIND,
+  TOKEN_SERVICE
+), token.tokenInfo.expiryTimestamp)
+  }
+
+  private[security] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+if (protocol.endsWith("SSL")) {
+  logInfo("SSL protocol detected.")
+  sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { 
truststoreLocation =>
+adminClientProperties.put("ssl.truststore.location", 
truststoreLocation)
+  }
+  sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { 
truststorePassword =>
+adminClientProperties.put("ssl.truststore.password", 
truststorePassword)
+  }
+} else {
+  logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+"consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
--- End diff --

I've just double checked my knowledge and this case kafka looks for JAAS 
entry but will not find it so it throws this exception:
```
Caused by: java.lang.IllegalArgumentException: Could not find a 
'KafkaClient' entry in the JAAS configuration. System property 
'java.security.auth.login.config' is not set
```



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-15 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233934316
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala 
---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.config.SaslConfigs
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config._
+
+class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
+  private val bootStrapServers = "127.0.0.1:0"
+  private val plainSecurityProtocol = "SASL_PLAINTEXT"
+  private val sslSecurityProtocol = "SASL_SSL"
+  private val trustStoreLocation = "/path/to/trustStore"
+  private val trustStorePassword = "secret"
+  private val keytab = "/path/to/keytab"
+  private val kerberosServiceName = "kafka"
+  private val principal = "u...@domain.com"
+
+  private var sparkConf: SparkConf = null
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+sparkConf = new SparkConf()
+  }
+
+  test("createAdminClientProperties without bootstrap servers should throw 
exception") {
+val thrown = intercept[IllegalArgumentException] {
+  KafkaTokenUtil.createAdminClientProperties(sparkConf)
+}
+assert(thrown.getMessage contains
+  "Tried to obtain kafka delegation token but bootstrap servers not 
configured.")
+  }
+
+  test("createAdminClientProperties without SSL protocol should not take 
over truststore config") {
+sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers)
+sparkConf.set(KAFKA_SECURITY_PROTOCOL, plainSecurityProtocol)
+sparkConf.set(KAFKA_TRUSTSTORE_LOCATION, trustStoreLocation)
+sparkConf.set(KAFKA_TRUSTSTORE_PASSWORD, trustStoreLocation)
+
+val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+
assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+  === bootStrapServers)
+
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
+  === plainSecurityProtocol)
+assert(!adminClientProperties.containsKey("ssl.truststore.location"))
+assert(!adminClientProperties.containsKey("ssl.truststore.password"))
+  }
+
+  test("createAdminClientProperties with SSL protocol should take over 
truststore config") {
+sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers)
+sparkConf.set(KAFKA_SECURITY_PROTOCOL, sslSecurityProtocol)
+sparkConf.set(KAFKA_TRUSTSTORE_LOCATION, trustStoreLocation)
+sparkConf.set(KAFKA_TRUSTSTORE_PASSWORD, trustStorePassword)
+
+val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+
assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+  === bootStrapServers)
+
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
+  === sslSecurityProtocol)
+assert(adminClientProperties.get("ssl.truststore.location") === 
trustStoreLocation)
+assert(adminClientProperties.get("ssl.truststore.password") === 
trustStorePassword)
+  }
+
+  test("createAdminClientProperties without keytab should not set dynamic 
jaas config") {
+sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers)
+sparkConf.set(KAFKA_SECURITY_PROTOCOL, sslSecurityProtocol)
+
+val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+
assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+  === bootStrapServers)
+
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
+  === sslSecurityProtocol)
+

[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233931731
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala 
---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.config.SaslConfigs
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config._
+
+class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
+  private val bootStrapServers = "127.0.0.1:0"
+  private val plainSecurityProtocol = "SASL_PLAINTEXT"
+  private val sslSecurityProtocol = "SASL_SSL"
+  private val trustStoreLocation = "/path/to/trustStore"
+  private val trustStorePassword = "secret"
+  private val keytab = "/path/to/keytab"
+  private val kerberosServiceName = "kafka"
+  private val principal = "u...@domain.com"
+
+  private var sparkConf: SparkConf = null
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+sparkConf = new SparkConf()
+  }
+
+  test("createAdminClientProperties without bootstrap servers should throw 
exception") {
+val thrown = intercept[IllegalArgumentException] {
+  KafkaTokenUtil.createAdminClientProperties(sparkConf)
+}
+assert(thrown.getMessage contains
+  "Tried to obtain kafka delegation token but bootstrap servers not 
configured.")
+  }
+
+  test("createAdminClientProperties without SSL protocol should not take 
over truststore config") {
+sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers)
+sparkConf.set(KAFKA_SECURITY_PROTOCOL, plainSecurityProtocol)
+sparkConf.set(KAFKA_TRUSTSTORE_LOCATION, trustStoreLocation)
+sparkConf.set(KAFKA_TRUSTSTORE_PASSWORD, trustStoreLocation)
+
+val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+
assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+  === bootStrapServers)
+
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
+  === plainSecurityProtocol)
+assert(!adminClientProperties.containsKey("ssl.truststore.location"))
+assert(!adminClientProperties.containsKey("ssl.truststore.password"))
+  }
+
+  test("createAdminClientProperties with SSL protocol should take over 
truststore config") {
+sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers)
+sparkConf.set(KAFKA_SECURITY_PROTOCOL, sslSecurityProtocol)
+sparkConf.set(KAFKA_TRUSTSTORE_LOCATION, trustStoreLocation)
+sparkConf.set(KAFKA_TRUSTSTORE_PASSWORD, trustStorePassword)
+
+val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+
assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+  === bootStrapServers)
+
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
+  === sslSecurityProtocol)
+assert(adminClientProperties.get("ssl.truststore.location") === 
trustStoreLocation)
+assert(adminClientProperties.get("ssl.truststore.password") === 
trustStorePassword)
+  }
+
+  test("createAdminClientProperties without keytab should not set dynamic 
jaas config") {
+sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers)
+sparkConf.set(KAFKA_SECURITY_PROTOCOL, sslSecurityProtocol)
+
+val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+
assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+  === bootStrapServers)
+
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
+  === sslSecurityProtocol)
+

[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-15 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233898001
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[spark] val TOKEN_SERVICE = new 
Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: 
TokenIdentifier], Long) = {
+val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
+val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
+val token = createResult.delegationToken().get()
+printToken(token)
+
+(new Token[KafkaDelegationTokenIdentifier](
+  token.tokenInfo.tokenId.getBytes,
+  token.hmacAsBase64String.getBytes,
+  TOKEN_KIND,
+  TOKEN_SERVICE
+), token.tokenInfo.expiryTimestamp)
+  }
+
+  private[security] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+if (protocol.endsWith("SSL")) {
+  logInfo("SSL protocol detected.")
+  sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { 
truststoreLocation =>
+adminClientProperties.put("ssl.truststore.location", 
truststoreLocation)
+  }
+  sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { 
truststorePassword =>
+adminClientProperties.put("ssl.truststore.password", 
truststorePassword)
+  }
+} else {
+  logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+"consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
--- End diff --

This code doesn't try to log in but provides dynamic jaas configuration to 
kafka. In the original implementation only jaas was possible now I've extended 
it to make things easier for the user.

> why is that necessary?

This eliminates the need to provide file based jaas configuration.
Old way: https://github.com/gaborgsomogyi/spark-structured-secure-kafka-app
New way: 
https://github.com/gaborgsomogyi/spark-structured-secure-dt-kafka-app

> won't this fail if the user only has a TGT (i.e. login via kinit)?

It works. TGT can be provided to kafka by providing a file based jaas file 
with 

[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-15 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233888412
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala 
---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.config.SaslConfigs
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config._
+
+class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
+  private val bootStrapServers = "127.0.0.1:0"
+  private val plainSecurityProtocol = "SASL_PLAINTEXT"
+  private val sslSecurityProtocol = "SASL_SSL"
+  private val trustStoreLocation = "/path/to/trustStore"
+  private val trustStorePassword = "secret"
+  private val keytab = "/path/to/keytab"
+  private val kerberosServiceName = "kafka"
+  private val principal = "u...@domain.com"
+
+  private var sparkConf: SparkConf = null
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+sparkConf = new SparkConf()
+  }
+
+  test("createAdminClientProperties without bootstrap servers should throw 
exception") {
+val thrown = intercept[IllegalArgumentException] {
+  KafkaTokenUtil.createAdminClientProperties(sparkConf)
+}
+assert(thrown.getMessage contains
+  "Tried to obtain kafka delegation token but bootstrap servers not 
configured.")
+  }
+
+  test("createAdminClientProperties without SSL protocol should not take 
over truststore config") {
+sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers)
+sparkConf.set(KAFKA_SECURITY_PROTOCOL, plainSecurityProtocol)
+sparkConf.set(KAFKA_TRUSTSTORE_LOCATION, trustStoreLocation)
+sparkConf.set(KAFKA_TRUSTSTORE_PASSWORD, trustStoreLocation)
+
+val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+
assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+  === bootStrapServers)
+
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
+  === plainSecurityProtocol)
+assert(!adminClientProperties.containsKey("ssl.truststore.location"))
+assert(!adminClientProperties.containsKey("ssl.truststore.password"))
+  }
+
+  test("createAdminClientProperties with SSL protocol should take over 
truststore config") {
+sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers)
+sparkConf.set(KAFKA_SECURITY_PROTOCOL, sslSecurityProtocol)
+sparkConf.set(KAFKA_TRUSTSTORE_LOCATION, trustStoreLocation)
+sparkConf.set(KAFKA_TRUSTSTORE_PASSWORD, trustStorePassword)
+
+val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+
assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+  === bootStrapServers)
+
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
+  === sslSecurityProtocol)
+assert(adminClientProperties.get("ssl.truststore.location") === 
trustStoreLocation)
+assert(adminClientProperties.get("ssl.truststore.password") === 
trustStorePassword)
+  }
+
+  test("createAdminClientProperties without keytab should not set dynamic 
jaas config") {
+sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers)
+sparkConf.set(KAFKA_SECURITY_PROTOCOL, sslSecurityProtocol)
+
+val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+
assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+  === bootStrapServers)
+
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
+  === sslSecurityProtocol)
+

[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-15 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233871529
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[spark] val TOKEN_SERVICE = new 
Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: 
TokenIdentifier], Long) = {
+val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
+val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
+val token = createResult.delegationToken().get()
+printToken(token)
+
+(new Token[KafkaDelegationTokenIdentifier](
+  token.tokenInfo.tokenId.getBytes,
+  token.hmacAsBase64String.getBytes,
+  TOKEN_KIND,
+  TOKEN_SERVICE
+), token.tokenInfo.expiryTimestamp)
+  }
+
+  private[security] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+if (protocol.endsWith("SSL")) {
+  logInfo("SSL protocol detected.")
+  sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { 
truststoreLocation =>
+adminClientProperties.put("ssl.truststore.location", 
truststoreLocation)
+  }
+  sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { 
truststorePassword =>
+adminClientProperties.put("ssl.truststore.password", 
truststorePassword)
+  }
+} else {
+  logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+"consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
--- End diff --

you are right: the user shouldn't need to be logging in again, as normal 
end-user use is to be kinited in, rather than giving a keytab


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-15 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233870838
  
--- Diff: core/pom.xml ---
@@ -408,6 +408,19 @@
   provided
 
 
+

[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-15 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233870430
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
+  extends HadoopDelegationTokenProvider with Logging {
+
+  override def serviceName: String = "kafka"
+
+  override def obtainDelegationTokens(
+  hadoopConf: Configuration,
+  sparkConf: SparkConf,
+  creds: Credentials): Option[Long] = {
+try {
+  val mirror = 
universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
+  val obtainToken = mirror.classLoader.
+loadClass("org.apache.spark.sql.kafka010.TokenUtil").
+getMethod("obtainToken", classOf[SparkConf])
+
+  logDebug("Attempting to fetch Kafka security token.")
+  val token = obtainToken.invoke(null, sparkConf)
+.asInstanceOf[Token[_ <: TokenIdentifier]]
+  creds.addToken(token.getService, token)
+} catch {
+  case NonFatal(e) =>
+logInfo(s"Failed to get token from service $serviceName", e)
+}
+
+None
+  }
+
+  override def delegationTokensRequired(
--- End diff --

Although in theory we could fix up MR, distp, spark etc to say "always ask 
for DTs", it may just encourage people to run with Kerberos off, which is never 
something they should be doing. I don't want to do that & am not actively 
playing with this approach. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-15 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233858832
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala 
---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.config.SaslConfigs
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config._
+
+class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
+  private val bootStrapServers = "127.0.0.1:0"
+  private val plainSecurityProtocol = "SASL_PLAINTEXT"
+  private val sslSecurityProtocol = "SASL_SSL"
+  private val trustStoreLocation = "/path/to/trustStore"
+  private val trustStorePassword = "secret"
+  private val keytab = "/path/to/keytab"
+  private val kerberosServiceName = "kafka"
+  private val principal = "u...@domain.com"
+
+  private var sparkConf: SparkConf = null
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+sparkConf = new SparkConf()
+  }
+
+  test("createAdminClientProperties without bootstrap servers should throw 
exception") {
+val thrown = intercept[IllegalArgumentException] {
+  KafkaTokenUtil.createAdminClientProperties(sparkConf)
+}
+assert(thrown.getMessage contains
+  "Tried to obtain kafka delegation token but bootstrap servers not 
configured.")
+  }
+
+  test("createAdminClientProperties without SSL protocol should not take 
over truststore config") {
+sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers)
+sparkConf.set(KAFKA_SECURITY_PROTOCOL, plainSecurityProtocol)
+sparkConf.set(KAFKA_TRUSTSTORE_LOCATION, trustStoreLocation)
+sparkConf.set(KAFKA_TRUSTSTORE_PASSWORD, trustStoreLocation)
+
+val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+
assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+  === bootStrapServers)
+
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
+  === plainSecurityProtocol)
+assert(!adminClientProperties.containsKey("ssl.truststore.location"))
+assert(!adminClientProperties.containsKey("ssl.truststore.password"))
+  }
+
+  test("createAdminClientProperties with SSL protocol should take over 
truststore config") {
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-15 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233858781
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala 
---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.config.SaslConfigs
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config._
+
+class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
+  private val bootStrapServers = "127.0.0.1:0"
+  private val plainSecurityProtocol = "SASL_PLAINTEXT"
+  private val sslSecurityProtocol = "SASL_SSL"
+  private val trustStoreLocation = "/path/to/trustStore"
+  private val trustStorePassword = "secret"
+  private val keytab = "/path/to/keytab"
+  private val kerberosServiceName = "kafka"
+  private val principal = "u...@domain.com"
+
+  private var sparkConf: SparkConf = null
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+sparkConf = new SparkConf()
+  }
+
+  test("createAdminClientProperties without bootstrap servers should throw 
exception") {
+val thrown = intercept[IllegalArgumentException] {
+  KafkaTokenUtil.createAdminClientProperties(sparkConf)
+}
+assert(thrown.getMessage contains
+  "Tried to obtain kafka delegation token but bootstrap servers not 
configured.")
+  }
+
+  test("createAdminClientProperties without SSL protocol should not take 
over truststore config") {
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-15 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233858643
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-15 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233858735
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[spark] val TOKEN_SERVICE = new 
Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: 
TokenIdentifier], Long) = {
+val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
+val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
+val token = createResult.delegationToken().get()
+printToken(token)
+
+(new Token[KafkaDelegationTokenIdentifier](
+  token.tokenInfo.tokenId.getBytes,
+  token.hmacAsBase64String.getBytes,
+  TOKEN_KIND,
+  TOKEN_SERVICE
+), token.tokenInfo.expiryTimestamp)
+  }
+
+  private[security] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+if (protocol.endsWith("SSL")) {
+  logInfo("SSL protocol detected.")
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-15 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233858687
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[spark] val TOKEN_SERVICE = new 
Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-15 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233858574
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.language.existentials
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+
+private[security] class KafkaDelegationTokenProvider
+  extends HadoopDelegationTokenProvider with Logging {
+
+  override def serviceName: String = "kafka"
+
+  override def obtainDelegationTokens(
+  hadoopConf: Configuration,
+  sparkConf: SparkConf,
+  creds: Credentials): Option[Long] = {
+try {
+  logDebug("Attempting to fetch Kafka security token.")
+  val (token, nextRenewalDate) = KafkaTokenUtil.obtainToken(sparkConf)
+  creds.addToken(token.getService, token)
+  return Some(nextRenewalDate)
+} catch {
+  case NonFatal(e) =>
+logInfo(s"Failed to get token from service $serviceName", e)
+}
+None
+  }
+
+  override def delegationTokensRequired(
+  sparkConf: SparkConf,
+  hadoopConf: Configuration): Boolean = {
+sparkConf.get(KAFKA_BOOTSTRAP_SERVERS).isDefined &&
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-15 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233857377
  
--- Diff: core/pom.xml ---
@@ -408,6 +408,19 @@
   provided
 
 
+

[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233630560
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala 
---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.config.SaslConfigs
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config._
+
+class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
+  private val bootStrapServers = "127.0.0.1:0"
+  private val plainSecurityProtocol = "SASL_PLAINTEXT"
+  private val sslSecurityProtocol = "SASL_SSL"
+  private val trustStoreLocation = "/path/to/trustStore"
+  private val trustStorePassword = "secret"
+  private val keytab = "/path/to/keytab"
+  private val kerberosServiceName = "kafka"
+  private val principal = "u...@domain.com"
+
+  private var sparkConf: SparkConf = null
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+sparkConf = new SparkConf()
+  }
+
+  test("createAdminClientProperties without bootstrap servers should throw 
exception") {
+val thrown = intercept[IllegalArgumentException] {
+  KafkaTokenUtil.createAdminClientProperties(sparkConf)
+}
+assert(thrown.getMessage contains
+  "Tried to obtain kafka delegation token but bootstrap servers not 
configured.")
+  }
+
+  test("createAdminClientProperties without SSL protocol should not take 
over truststore config") {
+sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers)
+sparkConf.set(KAFKA_SECURITY_PROTOCOL, plainSecurityProtocol)
+sparkConf.set(KAFKA_TRUSTSTORE_LOCATION, trustStoreLocation)
+sparkConf.set(KAFKA_TRUSTSTORE_PASSWORD, trustStoreLocation)
+
+val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+
assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+  === bootStrapServers)
+
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
+  === plainSecurityProtocol)
+assert(!adminClientProperties.containsKey("ssl.truststore.location"))
+assert(!adminClientProperties.containsKey("ssl.truststore.password"))
+  }
+
+  test("createAdminClientProperties with SSL protocol should take over 
truststore config") {
+sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers)
+sparkConf.set(KAFKA_SECURITY_PROTOCOL, sslSecurityProtocol)
+sparkConf.set(KAFKA_TRUSTSTORE_LOCATION, trustStoreLocation)
+sparkConf.set(KAFKA_TRUSTSTORE_PASSWORD, trustStorePassword)
+
+val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+
assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+  === bootStrapServers)
+
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
+  === sslSecurityProtocol)
+assert(adminClientProperties.get("ssl.truststore.location") === 
trustStoreLocation)
+assert(adminClientProperties.get("ssl.truststore.password") === 
trustStorePassword)
+  }
+
+  test("createAdminClientProperties without keytab should not set dynamic 
jaas config") {
+sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers)
+sparkConf.set(KAFKA_SECURITY_PROTOCOL, sslSecurityProtocol)
+
+val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+
assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+  === bootStrapServers)
+
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
+  === sslSecurityProtocol)
+

[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233629662
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala 
---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.config.SaslConfigs
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config._
+
+class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
+  private val bootStrapServers = "127.0.0.1:0"
+  private val plainSecurityProtocol = "SASL_PLAINTEXT"
+  private val sslSecurityProtocol = "SASL_SSL"
+  private val trustStoreLocation = "/path/to/trustStore"
+  private val trustStorePassword = "secret"
+  private val keytab = "/path/to/keytab"
+  private val kerberosServiceName = "kafka"
+  private val principal = "u...@domain.com"
+
+  private var sparkConf: SparkConf = null
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+sparkConf = new SparkConf()
+  }
+
+  test("createAdminClientProperties without bootstrap servers should throw 
exception") {
+val thrown = intercept[IllegalArgumentException] {
+  KafkaTokenUtil.createAdminClientProperties(sparkConf)
+}
+assert(thrown.getMessage contains
+  "Tried to obtain kafka delegation token but bootstrap servers not 
configured.")
+  }
+
+  test("createAdminClientProperties without SSL protocol should not take 
over truststore config") {
--- End diff --

s/take over/include


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233628422
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[spark] val TOKEN_SERVICE = new 
Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: 
TokenIdentifier], Long) = {
+val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
+val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
+val token = createResult.delegationToken().get()
+printToken(token)
+
+(new Token[KafkaDelegationTokenIdentifier](
+  token.tokenInfo.tokenId.getBytes,
+  token.hmacAsBase64String.getBytes,
+  TOKEN_KIND,
+  TOKEN_SERVICE
+), token.tokenInfo.expiryTimestamp)
+  }
+
+  private[security] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+if (protocol.endsWith("SSL")) {
+  logInfo("SSL protocol detected.")
--- End diff --

debug?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233629692
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala 
---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.config.SaslConfigs
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config._
+
+class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
+  private val bootStrapServers = "127.0.0.1:0"
+  private val plainSecurityProtocol = "SASL_PLAINTEXT"
+  private val sslSecurityProtocol = "SASL_SSL"
+  private val trustStoreLocation = "/path/to/trustStore"
+  private val trustStorePassword = "secret"
+  private val keytab = "/path/to/keytab"
+  private val kerberosServiceName = "kafka"
+  private val principal = "u...@domain.com"
+
+  private var sparkConf: SparkConf = null
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+sparkConf = new SparkConf()
+  }
+
+  test("createAdminClientProperties without bootstrap servers should throw 
exception") {
+val thrown = intercept[IllegalArgumentException] {
+  KafkaTokenUtil.createAdminClientProperties(sparkConf)
+}
+assert(thrown.getMessage contains
+  "Tried to obtain kafka delegation token but bootstrap servers not 
configured.")
+  }
+
+  test("createAdminClientProperties without SSL protocol should not take 
over truststore config") {
+sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers)
+sparkConf.set(KAFKA_SECURITY_PROTOCOL, plainSecurityProtocol)
+sparkConf.set(KAFKA_TRUSTSTORE_LOCATION, trustStoreLocation)
+sparkConf.set(KAFKA_TRUSTSTORE_PASSWORD, trustStoreLocation)
+
+val adminClientProperties = 
KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+
assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+  === bootStrapServers)
+
assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
+  === plainSecurityProtocol)
+assert(!adminClientProperties.containsKey("ssl.truststore.location"))
+assert(!adminClientProperties.containsKey("ssl.truststore.password"))
+  }
+
+  test("createAdminClientProperties with SSL protocol should take over 
truststore config") {
--- End diff --

s/take over/include


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233627918
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
--- End diff --

`private[spark]` is redundant when the class is already tagged that way.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233627534
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.language.existentials
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+
+private[security] class KafkaDelegationTokenProvider
+  extends HadoopDelegationTokenProvider with Logging {
+
+  override def serviceName: String = "kafka"
+
+  override def obtainDelegationTokens(
+  hadoopConf: Configuration,
+  sparkConf: SparkConf,
+  creds: Credentials): Option[Long] = {
+try {
+  logDebug("Attempting to fetch Kafka security token.")
+  val (token, nextRenewalDate) = KafkaTokenUtil.obtainToken(sparkConf)
+  creds.addToken(token.getService, token)
+  return Some(nextRenewalDate)
+} catch {
+  case NonFatal(e) =>
+logInfo(s"Failed to get token from service $serviceName", e)
+}
+None
+  }
+
+  override def delegationTokensRequired(
+  sparkConf: SparkConf,
+  hadoopConf: Configuration): Boolean = {
+sparkConf.get(KAFKA_BOOTSTRAP_SERVERS).isDefined &&
--- End diff --

`sparkConf.contains(KAFKA_BOOTSTRAP_SERVERS)`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233637694
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[spark] val TOKEN_SERVICE = new 
Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: 
TokenIdentifier], Long) = {
+val adminClient = 
AdminClient.create(createAdminClientProperties(sparkConf))
+val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+val createResult = 
adminClient.createDelegationToken(createDelegationTokenOptions)
+val token = createResult.delegationToken().get()
+printToken(token)
+
+(new Token[KafkaDelegationTokenIdentifier](
+  token.tokenInfo.tokenId.getBytes,
+  token.hmacAsBase64String.getBytes,
+  TOKEN_KIND,
+  TOKEN_SERVICE
+), token.tokenInfo.expiryTimestamp)
+  }
+
+  private[security] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+if (protocol.endsWith("SSL")) {
+  logInfo("SSL protocol detected.")
+  sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { 
truststoreLocation =>
+adminClientProperties.put("ssl.truststore.location", 
truststoreLocation)
+  }
+  sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { 
truststorePassword =>
+adminClientProperties.put("ssl.truststore.password", 
truststorePassword)
+  }
+} else {
+  logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+"consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
--- End diff --

If I understand this correctly, this code is trying to log in again, right? 
Two questions:

- why is that necessary?
- won't this fail if the user only has a TGT (i.e. login via kinit)?

I was expecting that you could reuse the already available login 
information to talk to the Kafka broker here and get delegation tokens. For 
example, you can do this to change the JVM's security context to the current 
user, if it's not already set:

```
org.apache.hadoop.security.UserGroupInformation.getCurrentUser().doAs(
  new java.security.PrivilegedExceptionAction[Unit] { 
override def run(): Unit = {
   

[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233626891
  
--- Diff: core/pom.xml ---
@@ -408,6 +408,19 @@
   provided
 
 
+

[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r233628054
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+  private[spark] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[spark] val TOKEN_SERVICE = new 
Text("kafka.server.delegation.token")
+
+  private[spark] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
--- End diff --

no semi-colons


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-08 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r232033107
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
--- End diff --

Using full-reflection would make the code hell complicated but your other 
suggestion about provided scope is really good. This way the copy-paste can be 
avoided. Changed accordingly.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-07 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r231591033
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
--- End diff --

> add another package from where the TokenUtil can be loaded.

Seems like what you're saying is that to support dstreams you'd have two 
pretty much identical TokenUtil classes in different modules? That sounds 
sub-optimal. At that point, it might be better to move the logic in that class 
to core, and just go full-reflection, or even add kafka as a `provided` 
dependency and mimic the Hive provider.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-07 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r231458881
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
--- End diff --

In general if we're not moving this class into kafka area the only change 
which has to be done is to add another package from where the `TokenUtil` can 
be loaded. Otherwise `KafkaDelegationTokenProvider` has to be copied from 
`spark-sql-kafka...` to `spark-streaming-kafka...`. I think leaving as it is 
makes it more simple.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-06 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r231252803
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
--- End diff --

> Just moves the reflection into HadoopDelegationTokenManager

You can't escape reflection unless you use `java.util.ServiceLoader` here, 
and it was decided not to use that when this code was first written. My 
argument is that it is a lot less reflection - it's just `Class.forName` 
instead of this whole class here.

> Has to be copied when delegation token will be introduced in DStreams
 
And how is that different from what you have here? The code you have won't 
work for dstreams since it's not even available when you pull in just the 
dstream connector.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-06 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r231126695
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
--- End diff --

That will make `KafkaDelegationTokenProvider` more clean but has a couple 
of drawbacks:
 * Just move the reflection into `HadoopDelegationTokenManager`
 * Has to be copied when delegation token will be introduced in DStreams



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-06 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r231042582
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
+  extends HadoopDelegationTokenProvider with Logging {
+
+  override def serviceName: String = "kafka"
+
+  override def obtainDelegationTokens(
+  hadoopConf: Configuration,
+  sparkConf: SparkConf,
+  creds: Credentials): Option[Long] = {
+try {
+  val mirror = 
universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
+  val obtainToken = mirror.classLoader.
+loadClass("org.apache.spark.sql.kafka010.TokenUtil").
+getMethod("obtainToken", classOf[SparkConf])
+
+  logDebug("Attempting to fetch Kafka security token.")
+  val token = obtainToken.invoke(null, sparkConf)
+.asInstanceOf[Token[_ <: TokenIdentifier]]
+  creds.addToken(token.getService, token)
+} catch {
+  case NonFatal(e) =>
+logInfo(s"Failed to get token from service $serviceName", e)
+}
+
+None
+  }
+
+  override def delegationTokensRequired(
--- End diff --

Are there news on this?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-06 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r231041173
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
--- End diff --

Checking it...


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-05 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r230683713
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
 ---
@@ -66,7 +66,8 @@ private[spark] class HadoopDelegationTokenManager(
   private def getDelegationTokenProviders: Map[String, 
HadoopDelegationTokenProvider] = {
 val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) 
++
   safeCreateProvider(new HiveDelegationTokenProvider) ++
-  safeCreateProvider(new HBaseDelegationTokenProvider)
+  safeCreateProvider(new HBaseDelegationTokenProvider) ++
+  safeCreateProvider(new KafkaDelegationTokenProvider)
--- End diff --

We've considered turning off by default and came to the conclusion what 
Marcelo described which I think still stands.

The PR says documentation is not covered because design can change. My plan 
is to add it in a separate PR when the feature merged.

Related what to document I think kafka integration guide should cover all 
the things. There it's already covered that the jar should be on the path.



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-05 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r230680318
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
+  extends HadoopDelegationTokenProvider with Logging {
+
+  override def serviceName: String = "kafka"
+
+  override def obtainDelegationTokens(
+  hadoopConf: Configuration,
+  sparkConf: SparkConf,
+  creds: Credentials): Option[Long] = {
+try {
+  val mirror = 
universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
+  val obtainToken = mirror.classLoader.
+loadClass("org.apache.spark.sql.kafka010.TokenUtil").
+getMethod("obtainToken", classOf[SparkConf])
+
+  logDebug("Attempting to fetch Kafka security token.")
+  val token = obtainToken.invoke(null, sparkConf)
+.asInstanceOf[Token[_ <: TokenIdentifier]]
+  creds.addToken(token.getService, token)
+} catch {
+  case NonFatal(e) =>
+logInfo(s"Failed to get token from service $serviceName", e)
+}
+
+None
+  }
+
+  override def delegationTokensRequired(
+  sparkConf: SparkConf,
+  hadoopConf: Configuration): Boolean = {
+sparkConf.get(KAFKA_BOOTSTRAP_SERVERS).isDefined &&
+  sparkConf.get(KAFKA_SECURITY_PROTOCOL).startsWith("SASL")
--- End diff --

This condition means
`if (bootstrap servers configured and (protocol == SASL_PLAINTEXT or 
protocol == SASL_SSL) )`
Why do you think ssl not covered?



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-11-05 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r230679147
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
+  extends HadoopDelegationTokenProvider with Logging {
+
+  override def serviceName: String = "kafka"
+
+  override def obtainDelegationTokens(
+  hadoopConf: Configuration,
+  sparkConf: SparkConf,
+  creds: Credentials): Option[Long] = {
+try {
+  val mirror = 
universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
+  val obtainToken = mirror.classLoader.
+loadClass("org.apache.spark.sql.kafka010.TokenUtil").
+getMethod("obtainToken", classOf[SparkConf])
+
+  logDebug("Attempting to fetch Kafka security token.")
+  val token = obtainToken.invoke(null, sparkConf)
+.asInstanceOf[Token[_ <: TokenIdentifier]]
+  creds.addToken(token.getService, token)
+} catch {
+  case NonFatal(e) =>
+logInfo(s"Failed to get token from service $serviceName", e)
+}
+
+None
--- End diff --

That's a nice catch, fixing it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-30 Thread soenkeliebau
Github user soenkeliebau commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r229237022
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
+  extends HadoopDelegationTokenProvider with Logging {
+
+  override def serviceName: String = "kafka"
+
+  override def obtainDelegationTokens(
+  hadoopConf: Configuration,
+  sparkConf: SparkConf,
+  creds: Credentials): Option[Long] = {
+try {
+  val mirror = 
universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
+  val obtainToken = mirror.classLoader.
+loadClass("org.apache.spark.sql.kafka010.TokenUtil").
+getMethod("obtainToken", classOf[SparkConf])
--- End diff --

You are right, missed that. Sorry


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-26 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r228681974
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
 ---
@@ -66,7 +66,8 @@ private[spark] class HadoopDelegationTokenManager(
   private def getDelegationTokenProviders: Map[String, 
HadoopDelegationTokenProvider] = {
 val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) 
++
   safeCreateProvider(new HiveDelegationTokenProvider) ++
-  safeCreateProvider(new HBaseDelegationTokenProvider)
+  safeCreateProvider(new HBaseDelegationTokenProvider) ++
+  safeCreateProvider(new KafkaDelegationTokenProvider)
--- End diff --

yes, I think the best we can do is to document the configs and throw some 
useful error messages to make the user aware of the "bootstrapservers" config 
(in case they accidently left it) when the spark-sql-kafka libraries are not in 
the classpath.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r228674142
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
 ---
@@ -66,7 +66,8 @@ private[spark] class HadoopDelegationTokenManager(
   private def getDelegationTokenProviders: Map[String, 
HadoopDelegationTokenProvider] = {
 val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) 
++
   safeCreateProvider(new HiveDelegationTokenProvider) ++
-  safeCreateProvider(new HBaseDelegationTokenProvider)
+  safeCreateProvider(new HBaseDelegationTokenProvider) ++
+  safeCreateProvider(new KafkaDelegationTokenProvider)
--- End diff --

>  if this config is set the spark-sql-kafka libraries needs to be in the 
class path as well

That's actually an argument against disabling it by default. If you set 
that config and don't have the libs in the classpath, you should either:

- get nothing (e.g. current HBase behavior)
- get an error because the libraries are not present

But disabling it by default just means you'd have 3 different things to do 
to enable this, instead of two.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-26 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r228671263
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
 ---
@@ -66,7 +66,8 @@ private[spark] class HadoopDelegationTokenManager(
   private def getDelegationTokenProviders: Map[String, 
HadoopDelegationTokenProvider] = {
 val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) 
++
   safeCreateProvider(new HiveDelegationTokenProvider) ++
-  safeCreateProvider(new HBaseDelegationTokenProvider)
+  safeCreateProvider(new HBaseDelegationTokenProvider) ++
+  safeCreateProvider(new KafkaDelegationTokenProvider)
--- End diff --

Why I thought disabling by default might make sense - 

The tokens fetch would be attempted if just "spark.kafka.bootstrap.servers" 
is defined. And if this config is set the spark-sql-kafka libraries needs to be 
in the class path as well. Better mention these in the docs. 

We could also consider prefixing all the configs with 
spark.security.credentials.kafka instead of spark.kafka (like 
spark.security.credentials.kafka.bootstrap.servers) to make it explicit that 
these are security related settings required for fetching kafka delegation 
tokens.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r228667186
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
--- End diff --

Another alternative that may be cleaner than this is to actually have this 
class in the spark-sql-kafka module, and instantiate it in 
`HadoopDelegationTokenManager` using `Utils.classForName(...).newInstance()`.

Then this particular code does not need to use reflection at all, and you 
may be able to clean some code up on the `TokenUtils` side.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r228666433
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
+  extends HadoopDelegationTokenProvider with Logging {
+
+  override def serviceName: String = "kafka"
+
+  override def obtainDelegationTokens(
+  hadoopConf: Configuration,
+  sparkConf: SparkConf,
+  creds: Credentials): Option[Long] = {
+try {
+  val mirror = 
universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
+  val obtainToken = mirror.classLoader.
+loadClass("org.apache.spark.sql.kafka010.TokenUtil").
+getMethod("obtainToken", classOf[SparkConf])
+
+  logDebug("Attempting to fetch Kafka security token.")
+  val token = obtainToken.invoke(null, sparkConf)
+.asInstanceOf[Token[_ <: TokenIdentifier]]
+  creds.addToken(token.getService, token)
+} catch {
+  case NonFatal(e) =>
+logInfo(s"Failed to get token from service $serviceName", e)
+}
+
+None
--- End diff --

If Kafka tokens expose that information it should be returned here. (HBase 
tokens, at least, don't...)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r228665965
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
 ---
@@ -66,7 +66,8 @@ private[spark] class HadoopDelegationTokenManager(
   private def getDelegationTokenProviders: Map[String, 
HadoopDelegationTokenProvider] = {
 val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) 
++
   safeCreateProvider(new HiveDelegationTokenProvider) ++
-  safeCreateProvider(new HBaseDelegationTokenProvider)
+  safeCreateProvider(new HBaseDelegationTokenProvider) ++
+  safeCreateProvider(new KafkaDelegationTokenProvider)
--- End diff --

It shouldn't be disabled by default.

I think it's better to just remove the list of provides from the scaladoc. 
That's not really helpful in any case.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-26 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r228583252
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
+  extends HadoopDelegationTokenProvider with Logging {
+
+  override def serviceName: String = "kafka"
+
+  override def obtainDelegationTokens(
+  hadoopConf: Configuration,
+  sparkConf: SparkConf,
+  creds: Credentials): Option[Long] = {
+try {
+  val mirror = 
universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
+  val obtainToken = mirror.classLoader.
+loadClass("org.apache.spark.sql.kafka010.TokenUtil").
+getMethod("obtainToken", classOf[SparkConf])
--- End diff --

spark-core does not have a dependency on spark-sql-kafka so this is needed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-26 Thread soenkeliebau
Github user soenkeliebau commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r228576259
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
+  extends HadoopDelegationTokenProvider with Logging {
+
+  override def serviceName: String = "kafka"
+
+  override def obtainDelegationTokens(
+  hadoopConf: Configuration,
+  sparkConf: SparkConf,
+  creds: Credentials): Option[Long] = {
+try {
+  val mirror = 
universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
+  val obtainToken = mirror.classLoader.
+loadClass("org.apache.spark.sql.kafka010.TokenUtil").
+getMethod("obtainToken", classOf[SparkConf])
--- End diff --

Is there a specific reason to dynamically load this class? This code looks 
quite similar to 
[HBaseDelegationTokenProvider](https://github.com/apache/spark/blob/5264164a67df498b73facae207eda12ee133be7d/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala#L41),
 but I think there it was done to avoid creating a hard dependency on the HBase 
libs of which TokenUtil is a part. In this case the (Kafka)TokenUtil is within 
the Spark project, so we could just use it directly, could we not?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-25 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r228339774
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
+  extends HadoopDelegationTokenProvider with Logging {
+
+  override def serviceName: String = "kafka"
+
+  override def obtainDelegationTokens(
+  hadoopConf: Configuration,
+  sparkConf: SparkConf,
+  creds: Credentials): Option[Long] = {
+try {
+  val mirror = 
universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
+  val obtainToken = mirror.classLoader.
+loadClass("org.apache.spark.sql.kafka010.TokenUtil").
+getMethod("obtainToken", classOf[SparkConf])
+
+  logDebug("Attempting to fetch Kafka security token.")
+  val token = obtainToken.invoke(null, sparkConf)
+.asInstanceOf[Token[_ <: TokenIdentifier]]
+  creds.addToken(token.getService, token)
+} catch {
+  case NonFatal(e) =>
+logInfo(s"Failed to get token from service $serviceName", e)
+}
+
+None
--- End diff --

Shouldn't this return the time of the next renewal? Otherwise how does the 
token manager know when should it be renewed or recreated ?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-25 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r228321793
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
 ---
@@ -66,7 +66,8 @@ private[spark] class HadoopDelegationTokenManager(
   private def getDelegationTokenProviders: Map[String, 
HadoopDelegationTokenProvider] = {
 val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) 
++
   safeCreateProvider(new HiveDelegationTokenProvider) ++
-  safeCreateProvider(new HBaseDelegationTokenProvider)
+  safeCreateProvider(new HBaseDelegationTokenProvider) ++
+  safeCreateProvider(new KafkaDelegationTokenProvider)
--- End diff --

Update the class docs of which providers are loaded by default or better 
set the default for `spark.security.credentials.kafka.enabled` to false. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-25 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r228320944
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -647,4 +647,42 @@ package object config {
   .stringConf
   .toSequence
   .createWithDefault(Nil)
+
+  private[spark] val KAFKA_DELEGATION_TOKEN_ENABLED =
+ConfigBuilder("spark.kafka.delegation.token.enabled")
+  .doc("Set to 'true' for obtaining delegation token from kafka.")
+  .booleanConf
+  .createWithDefault(false)
+
+  private[spark] val KAFKA_BOOTSTRAP_SERVERS =
+ConfigBuilder("spark.kafka.bootstrap.servers")
--- End diff --

Given the defaults the tokens fetch would be attempted if only 
`spark.kafka.bootstrap.servers` is defined right ? And the spark-sql-kafka 
libraries needs to be in the class path as well ? 

Better mention these in the docs. And make the 
`spark.security.credentials.kafka.enabled` default to false if it makes sense.

Also consider prefixing all the configs with 
`spark.security.credentials.kafka` instead of `spark.kafka`  (like 
`spark.security.credentials.kafka.bootstrap.servers`) to make it explicit that 
these are security related settings required for fetching kafka delegation 
tokens.




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-25 Thread soenkeliebau
Github user soenkeliebau commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r228131806
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
+  extends HadoopDelegationTokenProvider with Logging {
+
+  override def serviceName: String = "kafka"
+
+  override def obtainDelegationTokens(
+  hadoopConf: Configuration,
+  sparkConf: SparkConf,
+  creds: Credentials): Option[Long] = {
+try {
+  val mirror = 
universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
+  val obtainToken = mirror.classLoader.
+loadClass("org.apache.spark.sql.kafka010.TokenUtil").
+getMethod("obtainToken", classOf[SparkConf])
+
+  logDebug("Attempting to fetch Kafka security token.")
+  val token = obtainToken.invoke(null, sparkConf)
+.asInstanceOf[Token[_ <: TokenIdentifier]]
+  creds.addToken(token.getService, token)
+} catch {
+  case NonFatal(e) =>
+logInfo(s"Failed to get token from service $serviceName", e)
+}
+
+None
+  }
+
+  override def delegationTokensRequired(
+  sparkConf: SparkConf,
+  hadoopConf: Configuration): Boolean = {
+sparkConf.get(KAFKA_BOOTSTRAP_SERVERS).isDefined &&
+  sparkConf.get(KAFKA_SECURITY_PROTOCOL).startsWith("SASL")
--- End diff --

Was this a conscious decision to not use delegation tokens when using ssl 
authentication? That might also be useful to avoid spreading keys all over the 
cluster, even if load on the kdc is not an issue in that case.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-19 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r226659261
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[kafka010] object TokenUtil extends Logging {
+  private[kafka010] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[kafka010] val TOKEN_SERVICE = new 
Text("kafka.server.delegation.token")
+
+  private[kafka010] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private def printToken(token: DelegationToken): Unit = {
+if (log.isDebugEnabled) {
+  val dateFormat = new SimpleDateFormat("-MM-dd'T'HH:mm")
+  logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
+"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", 
"MAXDATE"))
+  val tokenInfo = token.tokenInfo
+  logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format(
+tokenInfo.tokenId,
+tokenInfo.owner,
+tokenInfo.renewersAsString,
+dateFormat.format(tokenInfo.issueTimestamp),
+dateFormat.format(tokenInfo.expiryTimestamp),
+dateFormat.format(tokenInfo.maxTimestamp)))
+}
+  }
+
+  private[kafka010] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+if (protocol.endsWith("SSL")) {
+  logInfo("SSL protocol detected.")
+  sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { 
truststoreLocation =>
+adminClientProperties.put("ssl.truststore.location", 
truststoreLocation)
+  }
+  sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { 
truststorePassword =>
+adminClientProperties.put("ssl.truststore.password", 
truststorePassword)
+  }
+} else {
+  logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+"consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
+// - Keytab is provided -> try to log in with kerberos module using 
kafka's dynamic JAAS
+//   configuration.
+// - Keytab not provided -> try to log in with JVM global security 
configuration
+//   which can be configured for example with 
'java.security.auth.login.config'.
+//   For this no additional parameter needed.
+KafkaSecurityHelper.getKeytabJaasParams(sparkConf).foreach { 
jaasParams =>
+  logInfo("Keytab detected, using it for login.")
+  adminClientProperties.put(SaslConfigs.SASL_MECHANISM, 
SaslConfigs.GSSAPI_MECHANISM)
   

[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-19 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r226568611
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[kafka010] object TokenUtil extends Logging {
+  private[kafka010] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[kafka010] val TOKEN_SERVICE = new 
Text("kafka.server.delegation.token")
+
+  private[kafka010] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private def printToken(token: DelegationToken): Unit = {
+if (log.isDebugEnabled) {
+  val dateFormat = new SimpleDateFormat("-MM-dd'T'HH:mm")
+  logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
+"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", 
"MAXDATE"))
+  val tokenInfo = token.tokenInfo
+  logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format(
+tokenInfo.tokenId,
+tokenInfo.owner,
+tokenInfo.renewersAsString,
+dateFormat.format(tokenInfo.issueTimestamp),
+dateFormat.format(tokenInfo.expiryTimestamp),
+dateFormat.format(tokenInfo.maxTimestamp)))
--- End diff --

The possibility is there but doesn't throw exception.
Input: `dateFormat.format(-1)`
Output: `1970-01-01T00:59`
It will end up in invalid token which can be found out from log printouts.



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-18 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r226235800
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
 ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.UUID
+
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.security.token.Token
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.{KAFKA_KERBEROS_SERVICE_NAME, 
KEYTAB, PRINCIPAL}
+import 
org.apache.spark.sql.kafka010.TokenUtil.KafkaDelegationTokenIdentifier
+
+class KafkaSecurityHelperSuite extends SparkFunSuite with 
BeforeAndAfterEach {
+  private val keytab = "/path/to/keytab"
+  private val kerberosServiceName = "kafka"
+  private val principal = "u...@domain.com"
+  private val tokenId = "tokenId" + UUID.randomUUID().toString
+  private val tokenPassword = "tokenPassword" + UUID.randomUUID().toString
+
+  private var sparkConf: SparkConf = null
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+sparkConf = new SparkConf()
--- End diff --

If you are playing with UGI in tests, its usually safest to 
call`UserGroupInformation.reset()` during setup and teardown; this empties out 
all existing creds and avoids problems with >1 test in the same JVM


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-18 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r226235157
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[kafka010] object TokenUtil extends Logging {
+  private[kafka010] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[kafka010] val TOKEN_SERVICE = new 
Text("kafka.server.delegation.token")
+
+  private[kafka010] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private def printToken(token: DelegationToken): Unit = {
+if (log.isDebugEnabled) {
+  val dateFormat = new SimpleDateFormat("-MM-dd'T'HH:mm")
+  logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
+"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", 
"MAXDATE"))
+  val tokenInfo = token.tokenInfo
+  logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format(
+tokenInfo.tokenId,
+tokenInfo.owner,
+tokenInfo.renewersAsString,
+dateFormat.format(tokenInfo.issueTimestamp),
+dateFormat.format(tokenInfo.expiryTimestamp),
+dateFormat.format(tokenInfo.maxTimestamp)))
--- End diff --

are these always going to be valid? I.e. > 0?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-18 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r226234330
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, 
KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
+  extends HadoopDelegationTokenProvider with Logging {
+
+  override def serviceName: String = "kafka"
+
+  override def obtainDelegationTokens(
+  hadoopConf: Configuration,
+  sparkConf: SparkConf,
+  creds: Credentials): Option[Long] = {
+try {
+  val mirror = 
universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
+  val obtainToken = mirror.classLoader.
+loadClass("org.apache.spark.sql.kafka010.TokenUtil").
+getMethod("obtainToken", classOf[SparkConf])
+
+  logDebug("Attempting to fetch Kafka security token.")
+  val token = obtainToken.invoke(null, sparkConf)
+.asInstanceOf[Token[_ <: TokenIdentifier]]
+  creds.addToken(token.getService, token)
+} catch {
+  case NonFatal(e) =>
+logInfo(s"Failed to get token from service $serviceName", e)
+}
+
+None
+  }
+
+  override def delegationTokensRequired(
--- End diff --

OK: so this asks for DTs even if UGI says the cluster is insecure? 

Nothing wrong with that...I've been wondering what would happen if 
`HadoopFSDelegationTokenProvider` did the same thing: asked filesystems for 
their tokens even if in an insecure cluster, as it would let DT support in 
object stores (HADOOP-14556...) work without kerberos.

I'd test to make sure that everything gets through OK. AFAIK YARN is happy 
to pass round credentials in an insecure cluster (it get the AM/RM token to the 
AM this way); its more a matter of making sure the launcher chain is all ready 
fo it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-16 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r225752604
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[kafka010] object TokenUtil extends Logging {
+  private[kafka010] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[kafka010] val TOKEN_SERVICE = new 
Text("kafka.server.delegation.token")
+
+  private[kafka010] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private def printToken(token: DelegationToken): Unit = {
+if (log.isDebugEnabled) {
+  val dateFormat = new SimpleDateFormat("-MM-dd'T'HH:mm")
+  logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
+"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", 
"MAXDATE"))
+  val tokenInfo = token.tokenInfo
+  logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format(
+tokenInfo.tokenId,
+tokenInfo.owner,
+tokenInfo.renewersAsString,
+dateFormat.format(tokenInfo.issueTimestamp),
+dateFormat.format(tokenInfo.expiryTimestamp),
+dateFormat.format(tokenInfo.maxTimestamp)))
+}
+  }
+
+  private[kafka010] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+if (protocol.endsWith("SSL")) {
+  logInfo("SSL protocol detected.")
+  sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { 
truststoreLocation =>
+adminClientProperties.put("ssl.truststore.location", 
truststoreLocation)
+  }
+  sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { 
truststorePassword =>
+adminClientProperties.put("ssl.truststore.password", 
truststorePassword)
+  }
+} else {
+  logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+"consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
+// - Keytab is provided -> try to log in with kerberos module using 
kafka's dynamic JAAS
+//   configuration.
+// - Keytab not provided -> try to log in with JVM global security 
configuration
+//   which can be configured for example with 
'java.security.auth.login.config'.
+//   For this no additional parameter needed.
+KafkaSecurityHelper.getKeytabJaasParams(sparkConf).foreach { 
jaasParams =>
+  logInfo("Keytab detected, using it for login.")
+  adminClientProperties.put(SaslConfigs.SASL_MECHANISM, 
SaslConfigs.GSSAPI_MECHANISM)

[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-15 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r225111947
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[kafka010] object TokenUtil extends Logging {
+  private[kafka010] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[kafka010] val TOKEN_SERVICE = new 
Text("kafka.server.delegation.token")
+
+  private[kafka010] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private def printToken(token: DelegationToken): Unit = {
+if (log.isDebugEnabled) {
+  val dateFormat = new SimpleDateFormat("-MM-dd'T'HH:mm")
+  logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
+"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", 
"MAXDATE"))
+  val tokenInfo = token.tokenInfo
+  logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format(
+tokenInfo.tokenId,
+tokenInfo.owner,
+tokenInfo.renewersAsString,
+dateFormat.format(tokenInfo.issueTimestamp),
+dateFormat.format(tokenInfo.expiryTimestamp),
+dateFormat.format(tokenInfo.maxTimestamp)))
+}
+  }
+
+  private[kafka010] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+if (protocol.endsWith("SSL")) {
+  logInfo("SSL protocol detected.")
+  sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { 
truststoreLocation =>
+adminClientProperties.put("ssl.truststore.location", 
truststoreLocation)
+  }
+  sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { 
truststorePassword =>
+adminClientProperties.put("ssl.truststore.password", 
truststorePassword)
+  }
+} else {
+  logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+"consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
+// - Keytab is provided -> try to log in with kerberos module using 
kafka's dynamic JAAS
+//   configuration.
+// - Keytab not provided -> try to log in with JVM global security 
configuration
+//   which can be configured for example with 
'java.security.auth.login.config'.
+//   For this no additional parameter needed.
+KafkaSecurityHelper.getKeytabJaasParams(sparkConf).foreach { 
jaasParams =>
+  logInfo("Keytab detected, using it for login.")
+  adminClientProperties.put(SaslConfigs.SASL_MECHANISM, 
SaslConfigs.GSSAPI_MECHANISM)
   

[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-15 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r225111758
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[kafka010] object TokenUtil extends Logging {
+  private[kafka010] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[kafka010] val TOKEN_SERVICE = new 
Text("kafka.server.delegation.token")
+
+  private[kafka010] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private def printToken(token: DelegationToken): Unit = {
+if (log.isDebugEnabled) {
+  val dateFormat = new SimpleDateFormat("-MM-dd'T'HH:mm")
+  logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
+"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", 
"MAXDATE"))
+  val tokenInfo = token.tokenInfo
+  logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format(
+tokenInfo.tokenId,
+tokenInfo.owner,
+tokenInfo.renewersAsString,
+dateFormat.format(tokenInfo.issueTimestamp),
+dateFormat.format(tokenInfo.expiryTimestamp),
+dateFormat.format(tokenInfo.maxTimestamp)))
+}
+  }
+
+  private[kafka010] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+if (protocol.endsWith("SSL")) {
+  logInfo("SSL protocol detected.")
+  sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { 
truststoreLocation =>
+adminClientProperties.put("ssl.truststore.location", 
truststoreLocation)
+  }
+  sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { 
truststorePassword =>
+adminClientProperties.put("ssl.truststore.password", 
truststorePassword)
+  }
+} else {
+  logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+"consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
+// - Keytab is provided -> try to log in with kerberos module using 
kafka's dynamic JAAS
+//   configuration.
+// - Keytab not provided -> try to log in with JVM global security 
configuration
+//   which can be configured for example with 
'java.security.auth.login.config'.
+//   For this no additional parameter needed.
+KafkaSecurityHelper.getKeytabJaasParams(sparkConf).foreach { 
jaasParams =>
+  logInfo("Keytab detected, using it for login.")
+  adminClientProperties.put(SaslConfigs.SASL_MECHANISM, 
SaslConfigs.GSSAPI_MECHANISM)
   

[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-15 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r225109193
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -647,4 +647,42 @@ package object config {
   .stringConf
   .toSequence
   .createWithDefault(Nil)
+
+  private[spark] val KAFKA_DELEGATION_TOKEN_ENABLED =
+ConfigBuilder("spark.kafka.delegation.token.enabled")
+  .doc("Set to 'true' for obtaining delegation token from kafka.")
+  .booleanConf
+  .createWithDefault(false)
+
+  private[spark] val KAFKA_BOOTSTRAP_SERVERS =
+ConfigBuilder("spark.kafka.bootstrap.servers")
--- End diff --

It exists and described in the SPIP: 
`spark.security.credentials.kafka.enabled`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-11 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r224320793
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{KAFKA_DELEGATION_TOKEN_ENABLED, 
KAFKA_SECURITY_PROTOCOL}
+import org.apache.spark.util.Utils
+
+private[security] class KafkaDelegationTokenProvider
+  extends HadoopDelegationTokenProvider with Logging {
+
+  override def serviceName: String = "kafka"
+
+  override def obtainDelegationTokens(
--- End diff --

Having an utility trait or utility singleton object could reduce the 
overkill, but personally I'd be OK on allowing 5~10 lines of duplication. If we 
are likely to leverage Scala reflection other than catalyst continuously 
(HBaseDelegationTokenProvider does it for two times), we could have utility 
class for that.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-11 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r224323537
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -647,4 +647,42 @@ package object config {
   .stringConf
   .toSequence
   .createWithDefault(Nil)
+
+  private[spark] val KAFKA_DELEGATION_TOKEN_ENABLED =
+ConfigBuilder("spark.kafka.delegation.token.enabled")
+  .doc("Set to 'true' for obtaining delegation token from kafka.")
+  .booleanConf
+  .createWithDefault(false)
+
+  private[spark] val KAFKA_BOOTSTRAP_SERVERS =
+ConfigBuilder("spark.kafka.bootstrap.servers")
--- End diff --

And I would rather say it should be a flag to enable/disable on delegation 
token. Not all end users who use secured Kafka cluster want to leverage 
delegation token.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-11 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r224334764
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[kafka010] object TokenUtil extends Logging {
+  private[kafka010] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[kafka010] val TOKEN_SERVICE = new 
Text("kafka.server.delegation.token")
+
+  private[kafka010] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private def printToken(token: DelegationToken): Unit = {
+if (log.isDebugEnabled) {
+  val dateFormat = new SimpleDateFormat("-MM-dd'T'HH:mm")
+  logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
+"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", 
"MAXDATE"))
+  val tokenInfo = token.tokenInfo
+  logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format(
+tokenInfo.tokenId,
+tokenInfo.owner,
+tokenInfo.renewersAsString,
+dateFormat.format(tokenInfo.issueTimestamp),
+dateFormat.format(tokenInfo.expiryTimestamp),
+dateFormat.format(tokenInfo.maxTimestamp)))
+}
+  }
+
+  private[kafka010] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+if (protocol.endsWith("SSL")) {
+  logInfo("SSL protocol detected.")
+  sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { 
truststoreLocation =>
+adminClientProperties.put("ssl.truststore.location", 
truststoreLocation)
+  }
+  sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { 
truststorePassword =>
+adminClientProperties.put("ssl.truststore.password", 
truststorePassword)
+  }
+} else {
+  logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+"consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
+// - Keytab is provided -> try to log in with kerberos module using 
kafka's dynamic JAAS
+//   configuration.
+// - Keytab not provided -> try to log in with JVM global security 
configuration
+//   which can be configured for example with 
'java.security.auth.login.config'.
+//   For this no additional parameter needed.
+KafkaSecurityHelper.getKeytabJaasParams(sparkConf).foreach { 
jaasParams =>
+  logInfo("Keytab detected, using it for login.")
+  adminClientProperties.put(SaslConfigs.SASL_MECHANISM, 
SaslConfigs.GSSAPI_MECHANISM)

[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-11 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r224322849
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -647,4 +647,42 @@ package object config {
   .stringConf
   .toSequence
   .createWithDefault(Nil)
+
+  private[spark] val KAFKA_DELEGATION_TOKEN_ENABLED =
+ConfigBuilder("spark.kafka.delegation.token.enabled")
+  .doc("Set to 'true' for obtaining delegation token from kafka.")
+  .booleanConf
+  .createWithDefault(false)
+
+  private[spark] val KAFKA_BOOTSTRAP_SERVERS =
+ConfigBuilder("spark.kafka.bootstrap.servers")
--- End diff --

While it is not possible to provide relevant configuration to source/sink, 
pre-defining Kafka related configurations one-by-one in here feels me as being 
too coupled with Kafka.

It might also give confusion on where to put configuration on Kafka 
source/sink: this configuration must be only used for delegation token, but I 
can't indicate it from both configuration name as well as its doc.

My 2 cents is just reserving prefix `spark.kafka.token` or similar, and 
leave a comment and don't define anything here. Would like to hear how 
committers think about how to add external configurations on Spark conf.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-11 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r224338353
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[kafka010] object TokenUtil extends Logging {
+  private[kafka010] val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  private[kafka010] val TOKEN_SERVICE = new 
Text("kafka.server.delegation.token")
+
+  private[kafka010] class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private def printToken(token: DelegationToken): Unit = {
+if (log.isDebugEnabled) {
+  val dateFormat = new SimpleDateFormat("-MM-dd'T'HH:mm")
+  logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
+"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", 
"MAXDATE"))
+  val tokenInfo = token.tokenInfo
+  logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format(
+tokenInfo.tokenId,
+tokenInfo.owner,
+tokenInfo.renewersAsString,
+dateFormat.format(tokenInfo.issueTimestamp),
+dateFormat.format(tokenInfo.expiryTimestamp),
+dateFormat.format(tokenInfo.maxTimestamp)))
+}
+  }
+
+  private[kafka010] def createAdminClientProperties(sparkConf: SparkConf): 
Properties = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+if (protocol.endsWith("SSL")) {
+  logInfo("SSL protocol detected.")
+  sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { 
truststoreLocation =>
+adminClientProperties.put("ssl.truststore.location", 
truststoreLocation)
+  }
+  sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { 
truststorePassword =>
+adminClientProperties.put("ssl.truststore.password", 
truststorePassword)
+  }
+} else {
+  logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+"consider the security impact.")
+}
+
+// There are multiple possibilities to log in:
+// - Keytab is provided -> try to log in with kerberos module using 
kafka's dynamic JAAS
+//   configuration.
+// - Keytab not provided -> try to log in with JVM global security 
configuration
+//   which can be configured for example with 
'java.security.auth.login.config'.
+//   For this no additional parameter needed.
+KafkaSecurityHelper.getKeytabJaasParams(sparkConf).foreach { 
jaasParams =>
+  logInfo("Keytab detected, using it for login.")
+  adminClientProperties.put(SaslConfigs.SASL_MECHANISM, 
SaslConfigs.GSSAPI_MECHANISM)

[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-10 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r223987644
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/TokenUtilSuite.scala
 ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.config.SaslConfigs
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config._
+
+class TokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
+  private val bootStrapServers = "127.0.0.1:0"
+  private val plainSecurityProtocol = "SASL_PLAINTEXT"
+  private val sslSecurityProtocol = "SASL_SSL"
+  private val trustStoreLocation = "/path/to/trustStore"
+  private val trustStorePassword = "secret"
+  private val keytab = "/path/to/keytab"
+  private val kerberosServiceName = "kafka"
+  private val principal = "u...@domain.com"
+
+  private var sparkConf: SparkConf = null
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+sparkConf = new SparkConf()
+  }
+
+  test("createAdminClientProperties without bootstrap servers should throw 
exception") {
+val thrown = intercept[IllegalArgumentException] {
+  TokenUtil.createAdminClientProperties(sparkConf)
+}
+assert(thrown.getMessage contains
+  "Tried to obtain kafka delegation token but bootstrap servers not 
configured.")
+  }
+
+  test("createAdminClientProperties without SSL protocol should not take 
over truststore config") {
+sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers)
+sparkConf.set(KAFKA_SECURITY_PROTOCOL, plainSecurityProtocol)
+sparkConf.set(KAFKA_TRUSTSTORE_LOCATION, trustStoreLocation)
+sparkConf.set(KAFKA_TRUSTSTORE_PASSWORD, trustStoreLocation)
+
+val adminClientProperties = 
TokenUtil.createAdminClientProperties(sparkConf)
+
+
assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+  .equals(bootStrapServers))
--- End diff --

No particular reason, changed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-10 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r223987456
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import org.apache.kafka.common.security.scram.ScramLoginModule
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[kafka010] object KafkaSecurityHelper extends Logging {
+  def getKeytabJaasParams(sparkConf: SparkConf): Option[String] = {
+val keytab = sparkConf.get(KEYTAB)
+if (keytab.isDefined) {
+  val serviceName = sparkConf.get(KAFKA_KERBEROS_SERVICE_NAME)
+  require(serviceName.nonEmpty, "Kerberos service name must be 
defined")
+  val principal = sparkConf.get(PRINCIPAL)
+  require(principal.nonEmpty, "Principal must be defined")
+
+  val params =
+s"""
+|${getKrb5LoginModuleName} required
+| useKeyTab=true
+| serviceName="${serviceName.get}"
+| keyTab="${keytab.get}"
+| principal="${principal.get}";
+""".stripMargin.replace("\n", "")
+  logDebug(s"Krb JAAS params: $params")
+  Some(params)
+} else {
+  None
+}
+  }
+
+  private def getKrb5LoginModuleName(): String = {
--- End diff --

Added.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-09 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r223625059
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/TokenUtilSuite.scala
 ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.config.SaslConfigs
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config._
+
+class TokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
+  private val bootStrapServers = "127.0.0.1:0"
+  private val plainSecurityProtocol = "SASL_PLAINTEXT"
+  private val sslSecurityProtocol = "SASL_SSL"
+  private val trustStoreLocation = "/path/to/trustStore"
+  private val trustStorePassword = "secret"
+  private val keytab = "/path/to/keytab"
+  private val kerberosServiceName = "kafka"
+  private val principal = "u...@domain.com"
+
+  private var sparkConf: SparkConf = null
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+sparkConf = new SparkConf()
+  }
+
+  test("createAdminClientProperties without bootstrap servers should throw 
exception") {
+val thrown = intercept[IllegalArgumentException] {
+  TokenUtil.createAdminClientProperties(sparkConf)
+}
+assert(thrown.getMessage contains
+  "Tried to obtain kafka delegation token but bootstrap servers not 
configured.")
+  }
+
+  test("createAdminClientProperties without SSL protocol should not take 
over truststore config") {
+sparkConf.set(KAFKA_BOOTSTRAP_SERVERS, bootStrapServers)
+sparkConf.set(KAFKA_SECURITY_PROTOCOL, plainSecurityProtocol)
+sparkConf.set(KAFKA_TRUSTSTORE_LOCATION, trustStoreLocation)
+sparkConf.set(KAFKA_TRUSTSTORE_PASSWORD, trustStoreLocation)
+
+val adminClientProperties = 
TokenUtil.createAdminClientProperties(sparkConf)
+
+
assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+  .equals(bootStrapServers))
--- End diff --

why use of .equals() over scalatest's `===` operator? That one includes the 
values in the assertion raised


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-09 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r223623927
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import org.apache.kafka.common.security.scram.ScramLoginModule
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[kafka010] object KafkaSecurityHelper extends Logging {
+  def getKeytabJaasParams(sparkConf: SparkConf): Option[String] = {
+val keytab = sparkConf.get(KEYTAB)
+if (keytab.isDefined) {
+  val serviceName = sparkConf.get(KAFKA_KERBEROS_SERVICE_NAME)
+  require(serviceName.nonEmpty, "Kerberos service name must be 
defined")
+  val principal = sparkConf.get(PRINCIPAL)
+  require(principal.nonEmpty, "Principal must be defined")
+
+  val params =
+s"""
+|${getKrb5LoginModuleName} required
+| useKeyTab=true
+| serviceName="${serviceName.get}"
+| keyTab="${keytab.get}"
+| principal="${principal.get}";
+""".stripMargin.replace("\n", "")
+  logDebug(s"Krb JAAS params: $params")
+  Some(params)
+} else {
+  None
+}
+  }
+
+  private def getKrb5LoginModuleName(): String = {
--- End diff --

+ add a comment pointing at hadoop UserGroupInformation so that if someone 
ever needs to maintain it, they'll know where to look


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-08 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r223354399
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala
 ---
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[kafka010] object TokenUtil extends Logging {
+  val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  val TOKEN_SERVICE = new Text("kafka.server.delegation.token")
+
+  private class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private def printToken(token: DelegationToken): Unit = {
+val dateFormat = new SimpleDateFormat("-MM-dd'T'HH:mm")
+log.info("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
+  "TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", 
"MAXDATE"))
+val tokenInfo = token.tokenInfo
+log.info("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format(
+  tokenInfo.tokenId,
+  tokenInfo.owner,
+  tokenInfo.renewersAsString,
+  dateFormat.format(tokenInfo.issueTimestamp),
+  dateFormat.format(tokenInfo.expiryTimestamp),
+  dateFormat.format(tokenInfo.maxTimestamp)))
+  }
+
+  def obtainToken(sparkConf: SparkConf): Token[_ <: TokenIdentifier] = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+if (protocol.endsWith("SSL")) {
+  logInfo("SSL protocol detected.")
+
+  val truststoreLocation = sparkConf.get(KAFKA_TRUSTSTORE_LOCATION)
+  if (truststoreLocation.nonEmpty) {
+adminClientProperties.put("ssl.truststore.location", 
truststoreLocation.get)
+  } else {
+logInfo("No truststore location set for SSL.")
+  }
+
+  val truststorePassword = sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD)
+  if (truststorePassword.nonEmpty) {
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-08 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r223354478
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala
 ---
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[kafka010] object TokenUtil extends Logging {
+  val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  val TOKEN_SERVICE = new Text("kafka.server.delegation.token")
+
+  private class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private def printToken(token: DelegationToken): Unit = {
+val dateFormat = new SimpleDateFormat("-MM-dd'T'HH:mm")
+log.info("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
+  "TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", 
"MAXDATE"))
+val tokenInfo = token.tokenInfo
+log.info("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format(
+  tokenInfo.tokenId,
+  tokenInfo.owner,
+  tokenInfo.renewersAsString,
+  dateFormat.format(tokenInfo.issueTimestamp),
+  dateFormat.format(tokenInfo.expiryTimestamp),
+  dateFormat.format(tokenInfo.maxTimestamp)))
+  }
+
+  def obtainToken(sparkConf: SparkConf): Token[_ <: TokenIdentifier] = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+if (protocol.endsWith("SSL")) {
+  logInfo("SSL protocol detected.")
+
+  val truststoreLocation = sparkConf.get(KAFKA_TRUSTSTORE_LOCATION)
+  if (truststoreLocation.nonEmpty) {
+adminClientProperties.put("ssl.truststore.location", 
truststoreLocation.get)
+  } else {
+logInfo("No truststore location set for SSL.")
+  }
+
+  val truststorePassword = sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD)
+  if (truststorePassword.nonEmpty) {
+adminClientProperties.put("ssl.truststore.password", 
truststorePassword.get)
+  } else {
+logInfo("No truststore password set for SSL.")
+  }
+} else {
+  logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+"consider it's security impact.")
+}
+
+// There are multiple possibilities to log in:
+// - Keytab is provided -> try to log in with kerberos module using 
kafka's dynamic JAAS
+//   configuration.
+// - Keytab not provided -> try to log in with JVM global security 
configuration
+//   which can be configured for example with 
'java.security.auth.login.config'.
+//   For this no additional parameter needed.
+KafkaSecurityHelper.getKeytabJaasParams(sparkConf) match {
--- End diff --

Fixed.


---


[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-08 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r223354365
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala
 ---
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[kafka010] object TokenUtil extends Logging {
+  val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  val TOKEN_SERVICE = new Text("kafka.server.delegation.token")
+
+  private class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private def printToken(token: DelegationToken): Unit = {
+val dateFormat = new SimpleDateFormat("-MM-dd'T'HH:mm")
+log.info("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
+  "TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", 
"MAXDATE"))
+val tokenInfo = token.tokenInfo
+log.info("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format(
+  tokenInfo.tokenId,
+  tokenInfo.owner,
+  tokenInfo.renewersAsString,
+  dateFormat.format(tokenInfo.issueTimestamp),
+  dateFormat.format(tokenInfo.expiryTimestamp),
+  dateFormat.format(tokenInfo.maxTimestamp)))
+  }
+
+  def obtainToken(sparkConf: SparkConf): Token[_ <: TokenIdentifier] = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+if (protocol.endsWith("SSL")) {
+  logInfo("SSL protocol detected.")
+
+  val truststoreLocation = sparkConf.get(KAFKA_TRUSTSTORE_LOCATION)
+  if (truststoreLocation.nonEmpty) {
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-08 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r223354435
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala
 ---
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[kafka010] object TokenUtil extends Logging {
+  val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  val TOKEN_SERVICE = new Text("kafka.server.delegation.token")
+
+  private class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private def printToken(token: DelegationToken): Unit = {
+val dateFormat = new SimpleDateFormat("-MM-dd'T'HH:mm")
+log.info("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
+  "TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", 
"MAXDATE"))
+val tokenInfo = token.tokenInfo
+log.info("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format(
+  tokenInfo.tokenId,
+  tokenInfo.owner,
+  tokenInfo.renewersAsString,
+  dateFormat.format(tokenInfo.issueTimestamp),
+  dateFormat.format(tokenInfo.expiryTimestamp),
+  dateFormat.format(tokenInfo.maxTimestamp)))
+  }
+
+  def obtainToken(sparkConf: SparkConf): Token[_ <: TokenIdentifier] = {
+val adminClientProperties = new Properties
+
+val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
+require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation 
token but bootstrap " +
+  "servers not configured.")
+
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers.get)
+
+val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
+
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
protocol)
+if (protocol.endsWith("SSL")) {
+  logInfo("SSL protocol detected.")
+
+  val truststoreLocation = sparkConf.get(KAFKA_TRUSTSTORE_LOCATION)
+  if (truststoreLocation.nonEmpty) {
+adminClientProperties.put("ssl.truststore.location", 
truststoreLocation.get)
+  } else {
+logInfo("No truststore location set for SSL.")
+  }
+
+  val truststorePassword = sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD)
+  if (truststorePassword.nonEmpty) {
+adminClientProperties.put("ssl.truststore.password", 
truststorePassword.get)
+  } else {
+logInfo("No truststore password set for SSL.")
+  }
+} else {
+  logWarning("Obtaining kafka delegation token through plain 
communication channel. Please " +
+"consider it's security impact.")
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-08 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r223354231
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala
 ---
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.text.SimpleDateFormat
+import java.util.Properties
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, 
CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[kafka010] object TokenUtil extends Logging {
+  val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  val TOKEN_SERVICE = new Text("kafka.server.delegation.token")
+
+  private class KafkaDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+override def getKind: Text = TOKEN_KIND;
+  }
+
+  private def printToken(token: DelegationToken): Unit = {
+val dateFormat = new SimpleDateFormat("-MM-dd'T'HH:mm")
+log.info("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-08 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r223354199
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala
 ---
@@ -41,10 +41,10 @@ case object KafkaWriterCommitMessage extends 
WriterCommitMessage
  * @param schema The schema of the input data.
  */
 class KafkaStreamingWriteSupport(
-topic: Option[String], producerParams: Map[String, String], schema: 
StructType)
+topic: Option[String], producerParams: ju.Map[String, Object], schema: 
StructType)
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22598: [SPARK-25501][SS] Add kafka delegation token supp...

2018-10-08 Thread gaborgsomogyi
Github user gaborgsomogyi commented on a diff in the pull request:

https://github.com/apache/spark/pull/22598#discussion_r223354097
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
 ---
@@ -556,29 +549,61 @@ private[kafka010] object KafkaSourceProvider extends 
Logging {
   this
 }
 
+def setTokenJaasConfigIfNeeded(): ConfigUpdater = {
+  // There are multiple possibilities to log in:
+  // - Token is provided -> try to log in with scram module using 
kafka's dynamic JAAS
+  //   configuration.
+  // - Token not provided -> try to log in with JVM global security 
configuration
+  //   which can be configured for example with 
'java.security.auth.login.config'.
+  //   For this no additional parameter needed.
+  KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf) match {
+case Some(jaasParams) =>
+  logInfo("Delegation token detected, using it for login.")
+  val mechanism = kafkaParams
+.getOrElse(SaslConfigs.SASL_MECHANISM, 
SaslConfigs.DEFAULT_SASL_MECHANISM)
+  require(mechanism.startsWith("SCRAM"),
+"Delegation token works only with SCRAM mechanism.")
+  set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
+case None => // No params required
+  logInfo("Delegation token not found.")
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



  1   2   >