Repository: kafka
Updated Branches:
  refs/heads/trunk 8d7492016 -> a7671c7f3


KAFKA-4703: Test with two SASL_SSL listeners with different JAAS contexts

Tests a broker with multiple SASL mechanisms, using a different endpoint 
for each mechanism. Each endpoint uses its own JAAS context.

Author: Balint Molnar <balintmolna...@gmail.com>

Reviewers: Rajini Sivaram, Ismael Juma

Closes #2506 from baluchicken/KAFKA-4703


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a7671c7f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a7671c7f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a7671c7f

Branch: refs/heads/trunk
Commit: a7671c7f3723113e716b99471e7be3499fde1b15
Parents: 8d74920
Author: Balint Molnar <balintmolna...@gmail.com>
Authored: Wed May 3 21:08:30 2017 +0100
Committer: Rajini Sivaram <rajinisiva...@googlemail.com>
Committed: Wed May 3 21:08:30 2017 +0100

----------------------------------------------------------------------
 .../api/SaslEndToEndAuthorizationTest.scala     |   2 +-
 .../scala/integration/kafka/api/SaslSetup.scala |  47 ++++--
 ...ListenersWithAdditionalJaasContextTest.scala |  47 ++++++
 ...pleListenersWithDefaultJaasContextTest.scala |  37 +++++
 ...tenersWithSameSecurityProtocolBaseTest.scala | 158 +++++++++++++++++++
 ...eListenersWithSameSecurityProtocolTest.scala | 132 ----------------
 .../security/auth/ZkAuthorizationTest.scala     |   2 +-
 .../scala/unit/kafka/utils/JaasTestUtils.scala  |  32 +---
 .../scala/unit/kafka/zk/ZKEphemeralTest.scala   |   2 +-
 9 files changed, 283 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a7671c7f/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala 
b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
index dd91627..d4c417c 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
@@ -22,7 +22,7 @@ import kafka.utils.TestUtils
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.errors.GroupAuthorizationException
-import org.junit.{Before,Test}
+import org.junit.{Before, Test}
 
 import scala.collection.immutable.List
 import scala.collection.JavaConverters._

http://git-wip-us.apache.org/repos/asf/kafka/blob/a7671c7f/core/src/test/scala/integration/kafka/api/SaslSetup.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala 
b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
index 29aea61..13ed2e2 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
@@ -20,8 +20,10 @@ package kafka.api
 import java.io.File
 import java.util.Properties
 import javax.security.auth.login.Configuration
+
 import kafka.security.minikdc.MiniKdc
 import kafka.server.KafkaConfig
+import kafka.utils.JaasTestUtils.JaasSection
 import kafka.utils.{JaasTestUtils, TestUtils}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.authenticator.LoginManager
@@ -29,12 +31,13 @@ import org.apache.kafka.common.config.SaslConfigs
 
 /*
  * Implements an enumeration for the modes enabled here:
- * zk only, kafka only, both.
+ * zk only, kafka only, both, custom KafkaServer.
  */
 sealed trait SaslSetupMode
 case object ZkSasl extends SaslSetupMode
 case object KafkaSasl extends SaslSetupMode
 case object Both extends SaslSetupMode
+case object CustomKafkaServerSasl extends SaslSetupMode
 
 /*
  * Trait used in SaslTestHarness and EndToEndAuthorizationTest to setup keytab 
and jaas files.
@@ -43,11 +46,13 @@ trait SaslSetup {
   private val workDir = TestUtils.tempDir()
   private val kdcConf = MiniKdc.createConfig
   private var kdc: MiniKdc = null
-  private var serverKeytabFile: Option[File] = null
-  private var clientKeytabFile: Option[File] = null
+  private var serverKeytabFile: Option[File] = None
+  private var clientKeytabFile: Option[File] = None
+  private var jaasContext: Seq[JaasSection] = Seq()
 
   def startSasl(kafkaServerSaslMechanisms: List[String], 
kafkaClientSaslMechanism: Option[String],
-                mode: SaslSetupMode = Both, kafkaServerJaasEntryName: String = 
JaasTestUtils.KafkaServerContextName) {
+                mode: SaslSetupMode = Both, kafkaServerJaasEntryName: String = 
JaasTestUtils.KafkaServerContextName,
+                withDefaultJaasContext: Boolean = true) {
     // Important if tests leak consumers, producers or brokers
     LoginManager.closeAll()
     val hasKerberos = mode != ZkSasl && (kafkaClientSaslMechanism == 
Some("GSSAPI") || kafkaServerSaslMechanisms.contains("GSSAPI"))
@@ -60,27 +65,39 @@ trait SaslSetup {
       kdc.start()
       kdc.createPrincipal(serverKeytabFile, 
JaasTestUtils.KafkaServerPrincipalUnqualifiedName + "/localhost")
       kdc.createPrincipal(clientKeytabFile, 
JaasTestUtils.KafkaClientPrincipalUnqualifiedName, 
JaasTestUtils.KafkaClientPrincipalUnqualifiedName2)
-    } else {
-      this.clientKeytabFile = None
-      this.serverKeytabFile = None
     }
-    setJaasConfiguration(mode, kafkaServerJaasEntryName, 
kafkaServerSaslMechanisms, kafkaClientSaslMechanism)
+    if (withDefaultJaasContext) {
+      setJaasConfiguration(mode, kafkaServerJaasEntryName, 
kafkaServerSaslMechanisms, kafkaClientSaslMechanism)
+      writeJaasConfigurationToFile()
+    } else
+        setJaasConfiguration(mode, kafkaServerJaasEntryName, 
kafkaServerSaslMechanisms, kafkaClientSaslMechanism)
     if (mode == Both || mode == ZkSasl)
       System.setProperty("zookeeper.authProvider.1", 
"org.apache.zookeeper.server.auth.SASLAuthenticationProvider")
   }
 
   protected def setJaasConfiguration(mode: SaslSetupMode, 
kafkaServerEntryName: String,
                                      kafkaServerSaslMechanisms: List[String], 
kafkaClientSaslMechanism: Option[String]) {
-    val jaasFile = mode match {
-      case ZkSasl => JaasTestUtils.writeZkFile()
-      case KafkaSasl => JaasTestUtils.writeKafkaFile(kafkaServerEntryName, 
kafkaServerSaslMechanisms,
-        kafkaClientSaslMechanism, serverKeytabFile, clientKeytabFile)
-      case Both => JaasTestUtils.writeZkAndKafkaFiles(kafkaServerEntryName, 
kafkaServerSaslMechanisms,
-        kafkaClientSaslMechanism, serverKeytabFile, clientKeytabFile)
+    val jaasSection = mode match {
+      case ZkSasl => JaasTestUtils.zkSections
+      case KafkaSasl =>
+        Seq(JaasTestUtils.kafkaServerSection(kafkaServerEntryName, 
kafkaServerSaslMechanisms, serverKeytabFile),
+          JaasTestUtils.kafkaClientSection(kafkaClientSaslMechanism, 
clientKeytabFile))
+      case CustomKafkaServerSasl => 
Seq(JaasTestUtils.kafkaServerSection(kafkaServerEntryName,
+        kafkaServerSaslMechanisms, serverKeytabFile))
+      case Both => Seq(JaasTestUtils.kafkaServerSection(kafkaServerEntryName, 
kafkaServerSaslMechanisms, serverKeytabFile),
+        JaasTestUtils.kafkaClientSection(kafkaClientSaslMechanism, 
clientKeytabFile)) ++ JaasTestUtils.zkSections
     }
+    jaasContext = jaasContext ++ jaasSection
+  }
+
+  protected def writeJaasConfigurationToFile() {
     // This will cause a reload of the Configuration singleton when 
`getConfiguration` is called
     Configuration.setConfiguration(null)
-    System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile)
+    System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, 
JaasTestUtils.writeJaasContextsToFile(jaasContext))
+  }
+
+  protected def removeJaasSection(context: String) {
+    jaasContext = jaasContext.filter(_.contextName != context)
   }
 
   def closeSasl() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a7671c7f/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala
 
b/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala
new file mode 100644
index 0000000..3251be0
--- /dev/null
+++ 
b/core/src/test/scala/integration/kafka/server/MultipleListenersWithAdditionalJaasContextTest.scala
@@ -0,0 +1,47 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.server
+
+import java.util.Properties
+
+import kafka.api.CustomKafkaServerSasl
+import org.apache.kafka.common.network.ListenerName
+
+
+class MultipleListenersWithAdditionalJaasContextTest extends 
MultipleListenersWithSameSecurityProtocolBaseTest{
+
+  import MultipleListenersWithSameSecurityProtocolBaseTest._
+
+  override def setSaslProperties(listenerName: ListenerName): 
Option[Properties] = {
+
+    val gssapiSaslProperties = kafkaClientSaslProperties(GssApi, 
dynamicJaasConfig = true)
+    val plainSaslProperties = kafkaClientSaslProperties(Plain, 
dynamicJaasConfig = true)
+
+    listenerName.value match {
+      case SecureInternal => Some(plainSaslProperties)
+      case SecureExternal => Some(gssapiSaslProperties)
+      case _ => None
+    }
+  }
+
+  override def addJaasSection(): Unit = {
+    setJaasConfiguration(CustomKafkaServerSasl, "secure_external.KafkaServer", 
List(GssApi), None)
+    setJaasConfiguration(CustomKafkaServerSasl, "secure_internal.KafkaServer", 
List(Plain), None)
+    removeJaasSection("KafkaServer")
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a7671c7f/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala
 
b/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala
new file mode 100644
index 0000000..8291d82
--- /dev/null
+++ 
b/core/src/test/scala/integration/kafka/server/MultipleListenersWithDefaultJaasContextTest.scala
@@ -0,0 +1,37 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.server
+
+import java.util.Properties
+
+import org.apache.kafka.common.network.ListenerName
+
+
+class MultipleListenersWithDefaultJaasContextTest extends 
MultipleListenersWithSameSecurityProtocolBaseTest {
+
+  import MultipleListenersWithSameSecurityProtocolBaseTest._
+
+  override def setSaslProperties(listenerName: ListenerName): 
Option[Properties] = {
+    val plainSaslProperties = kafkaClientSaslProperties(Plain, 
dynamicJaasConfig = true)
+
+    listenerName.value match {
+      case SecureExternal | SecureInternal => Some(plainSaslProperties)
+      case _ => None
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a7671c7f/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
 
b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
new file mode 100644
index 0000000..9765279
--- /dev/null
+++ 
b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
@@ -0,0 +1,158 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import java.io.File
+import java.util.{Collections, Properties}
+import java.util.concurrent.TimeUnit
+
+import kafka.api.{Both, SaslSetup}
+import kafka.common.Topic
+import kafka.coordinator.group.OffsetConfig
+import kafka.utils.{CoreUtils, TestUtils}
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.config.SslConfigs
+import org.apache.kafka.common.network.{ListenerName, Mode}
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.junit.Assert.assertEquals
+import org.junit.{After, Before, Test}
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
+
+object MultipleListenersWithSameSecurityProtocolBaseTest {
+  val SecureInternal = "SECURE_INTERNAL"
+  val SecureExternal = "SECURE_EXTERNAL"
+  val Internal = "INTERNAL"
+  val External = "EXTERNAL"
+  val GssApi = "GSSAPI"
+  val Plain = "PLAIN"
+}
+
+abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends 
ZooKeeperTestHarness with SaslSetup{
+
+  import MultipleListenersWithSameSecurityProtocolBaseTest._
+
+  private val trustStoreFile = File.createTempFile("truststore", ".jks")
+  private val servers = new ArrayBuffer[KafkaServer]
+  private val producers = mutable.Map[ListenerName, KafkaProducer[Array[Byte], 
Array[Byte]]]()
+  private val consumers = mutable.Map[ListenerName, KafkaConsumer[Array[Byte], 
Array[Byte]]]()
+  private val kafkaClientSaslMechanism = Plain
+  private val kafkaServerSaslMechanisms = List(GssApi, Plain)
+
+  protected def setSaslProperties(listenerName: ListenerName): 
Option[Properties]
+  protected def addJaasSection(): Unit = {}
+
+  @Before
+  override def setUp(): Unit = {
+    startSasl(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both, 
withDefaultJaasContext = false)
+    addJaasSection()
+    writeJaasConfigurationToFile()
+    super.setUp()
+    // 2 brokers so that we can test that the data propagates correctly via 
UpdateMetadataRequest
+    val numServers = 2
+
+    (0 until numServers).foreach { brokerId =>
+
+      val props = TestUtils.createBrokerConfig(brokerId, zkConnect, 
trustStoreFile = Some(trustStoreFile))
+      // Ensure that we can support multiple listeners per security protocol 
and multiple security protocols
+      props.put(KafkaConfig.ListenersProp, s"$SecureInternal://localhost:0, 
$Internal://localhost:0, " +
+        s"$SecureExternal://localhost:0, $External://localhost:0")
+      props.put(KafkaConfig.ListenerSecurityProtocolMapProp, 
s"$Internal:PLAINTEXT, $SecureInternal:SASL_SSL," +
+        s"$External:PLAINTEXT, $SecureExternal:SASL_SSL")
+      props.put(KafkaConfig.InterBrokerListenerNameProp, Internal)
+      props.put(KafkaConfig.ZkEnableSecureAclsProp, "true")
+      props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, 
kafkaClientSaslMechanism)
+      props.put(KafkaConfig.SaslEnabledMechanismsProp, 
kafkaServerSaslMechanisms.mkString(","))
+      props.put(KafkaConfig.SaslKerberosServiceNameProp, "kafka")
+
+      props.putAll(TestUtils.sslConfigs(Mode.SERVER, false, 
Some(trustStoreFile), s"server$brokerId"))
+
+      // set listener-specific configs and set an invalid path for the global 
config to verify that the overrides work
+      Seq(SecureInternal, SecureExternal).foreach { listenerName =>
+        props.put(new ListenerName(listenerName).configPrefix + 
SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
+          props.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+      }
+      props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "invalid/file/path")
+
+      servers += TestUtils.createServer(KafkaConfig.fromProps(props))
+    }
+
+    val serverConfig = servers.head.config
+    assertEquals(4, serverConfig.listeners.size)
+
+    TestUtils.createTopic(zkUtils, Topic.GroupMetadataTopicName, 
OffsetConfig.DefaultOffsetsTopicNumPartitions,
+      replicationFactor = 2, servers, 
servers.head.groupCoordinator.offsetsTopicConfigs)
+
+    serverConfig.listeners.foreach { endPoint =>
+      val listenerName = endPoint.listenerName
+
+      TestUtils.createTopic(zkUtils, listenerName.value, 2, 2, servers)
+
+      val trustStoreFile =
+        if (endPoint.securityProtocol == SecurityProtocol.SASL_SSL) 
Some(this.trustStoreFile)
+        else None
+
+      val saslProperties = setSaslProperties(listenerName)
+
+      val bootstrapServers = TestUtils.bootstrapServers(servers, listenerName)
+
+      producers(listenerName) = TestUtils.createNewProducer(bootstrapServers, 
acks = -1,
+        securityProtocol = endPoint.securityProtocol, trustStoreFile = 
trustStoreFile, saslProperties = saslProperties)
+
+      consumers(listenerName) = TestUtils.createNewConsumer(bootstrapServers, 
groupId = listenerName.value,
+        securityProtocol = endPoint.securityProtocol, trustStoreFile = 
trustStoreFile, saslProperties = saslProperties)
+    }
+  }
+
+  @After
+  override def tearDown() {
+    producers.values.foreach(_.close())
+    consumers.values.foreach(_.close())
+    servers.foreach { s =>
+      s.shutdown()
+      CoreUtils.delete(s.config.logDirs)
+    }
+    super.tearDown()
+  }
+
+  /**
+    * Tests that we can produce and consume to/from all broker-defined 
listeners and security protocols. We produce
+    * with acks=-1 to ensure that replication is also working.
+    */
+  @Test
+  def testProduceConsume(): Unit = {
+    producers.foreach { case (listenerName, producer) =>
+      val producerRecords = (1 to 10).map(i => new 
ProducerRecord(listenerName.value, s"key$i".getBytes,
+        s"value$i".getBytes))
+      producerRecords.map(producer.send).map(_.get(10, TimeUnit.SECONDS))
+
+      val consumer = consumers(listenerName)
+      consumer.subscribe(Collections.singleton(listenerName.value))
+      val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]
+      TestUtils.waitUntilTrue(() => {
+        records ++= consumer.poll(50).asScala
+        records.size == producerRecords.size
+      }, s"Consumed ${records.size} records until timeout instead of the 
expected ${producerRecords.size} records")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a7671c7f/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala
 
b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala
deleted file mode 100644
index ccc118c..0000000
--- 
a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one
-  * or more contributor license agreements.  See the NOTICE file
-  * distributed with this work for additional information
-  * regarding copyright ownership.  The ASF licenses this file
-  * to you under the Apache License, Version 2.0 (the
-  * "License"); you may not use this file except in compliance
-  * with the License.  You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package kafka.server
-
-import java.io.File
-import java.util.Collections
-import java.util.concurrent.TimeUnit
-
-import kafka.common.Topic
-import kafka.coordinator.group.OffsetConfig
-import kafka.utils.{CoreUtils, TestUtils}
-import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import org.apache.kafka.common.config.SslConfigs
-import org.apache.kafka.common.network.{ListenerName, Mode}
-import org.apache.kafka.common.protocol.SecurityProtocol
-import org.junit.Assert.assertEquals
-import org.junit.{After, Before, Test}
-
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.JavaConverters._
-
-class MultipleListenersWithSameSecurityProtocolTest extends 
ZooKeeperTestHarness {
-
-  private val trustStoreFile = File.createTempFile("truststore", ".jks")
-  private val servers = new ArrayBuffer[KafkaServer]
-  private val producers = mutable.Map[ListenerName, KafkaProducer[Array[Byte], 
Array[Byte]]]()
-  private val consumers = mutable.Map[ListenerName, KafkaConsumer[Array[Byte], 
Array[Byte]]]()
-
-  @Before
-  override def setUp(): Unit = {
-    super.setUp()
-    // 2 brokers so that we can test that the data propagates correctly via 
UpdateMetadadaRequest
-    val numServers = 2
-
-    (0 until numServers).foreach { brokerId =>
-
-      val props = TestUtils.createBrokerConfig(brokerId, zkConnect, 
trustStoreFile = Some(trustStoreFile))
-      // Ensure that we can support multiple listeners per security protocol 
and multiple security protocols
-      props.put(KafkaConfig.ListenersProp, "SECURE_INTERNAL://localhost:0, 
INTERNAL://localhost:0, " +
-        "SECURE_EXTERNAL://localhost:0, EXTERNAL://localhost:0")
-      props.put(KafkaConfig.ListenerSecurityProtocolMapProp, 
"INTERNAL:PLAINTEXT, SECURE_INTERNAL:SSL," +
-        "EXTERNAL:PLAINTEXT, SECURE_EXTERNAL:SSL")
-      props.put(KafkaConfig.InterBrokerListenerNameProp, "INTERNAL")
-      props.putAll(TestUtils.sslConfigs(Mode.SERVER, false, 
Some(trustStoreFile), s"server$brokerId"))
-
-      // set listener-specific configs and set an invalid path for the global 
config to verify that the overrides work
-      Seq("SECURE_INTERNAL", "SECURE_EXTERNAL").foreach { listenerName =>
-        props.put(new ListenerName(listenerName).configPrefix + 
SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
-          props.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
-      }
-      props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "invalid/file/path")
-
-      servers += TestUtils.createServer(KafkaConfig.fromProps(props))
-    }
-
-    val serverConfig = servers.head.config
-    assertEquals(4, serverConfig.listeners.size)
-
-    TestUtils.createTopic(zkUtils, Topic.GroupMetadataTopicName, 
OffsetConfig.DefaultOffsetsTopicNumPartitions,
-      replicationFactor = 2, servers, 
servers.head.groupCoordinator.offsetsTopicConfigs)
-
-    serverConfig.listeners.foreach { endPoint =>
-      val listenerName = endPoint.listenerName
-
-      TestUtils.createTopic(zkUtils, listenerName.value, 2, 2, servers)
-
-      val trustStoreFile =
-        if (endPoint.securityProtocol == SecurityProtocol.SSL) 
Some(this.trustStoreFile)
-        else None
-
-      val bootstrapServers = TestUtils.bootstrapServers(servers, listenerName)
-
-      producers(listenerName) = TestUtils.createNewProducer(bootstrapServers, 
acks = -1,
-        securityProtocol = endPoint.securityProtocol, trustStoreFile = 
trustStoreFile)
-
-      consumers(listenerName) = TestUtils.createNewConsumer(bootstrapServers, 
groupId = listenerName.value,
-        securityProtocol = endPoint.securityProtocol, trustStoreFile = 
trustStoreFile)
-    }
-  }
-
-  @After
-  override def tearDown() {
-    producers.values.foreach(_.close())
-    consumers.values.foreach(_.close())
-    servers.foreach { s =>
-      s.shutdown()
-      CoreUtils.delete(s.config.logDirs)
-    }
-    super.tearDown()
-  }
-
-  /**
-    * Tests that we can produce and consume to/from all broker-defined 
listeners and security protocols. We produce
-    * with acks=-1 to ensure that replication is also working.
-    */
-  @Test
-  def testProduceConsume(): Unit = {
-    producers.foreach { case (listenerName, producer) =>
-      val producerRecords = (1 to 10).map(i => new 
ProducerRecord(listenerName.value, s"key$i".getBytes,
-        s"value$i".getBytes))
-      producerRecords.map(producer.send).map(_.get(10, TimeUnit.SECONDS))
-
-      val consumer = consumers(listenerName)
-      consumer.subscribe(Collections.singleton(listenerName.value))
-      val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]
-      TestUtils.waitUntilTrue(() => {
-        records ++= consumer.poll(50).asScala
-        records.size == producerRecords.size
-      }, s"Consumed ${records.size} records until timeout instead of the 
expected ${producerRecords.size} records")
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a7671c7f/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala 
b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index 3e7fce4..5ea92aa 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -30,7 +30,7 @@ import scala.util.{Try, Success, Failure}
 import javax.security.auth.login.Configuration
 
 class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
-  val jaasFile = kafka.utils.JaasTestUtils.writeZkFile
+  val jaasFile = 
kafka.utils.JaasTestUtils.writeJaasContextsToFile(kafka.utils.JaasTestUtils.zkSections)
   val authProvider = "zookeeper.authProvider.1"
 
   @Before

http://git-wip-us.apache.org/repos/asf/kafka/blob/a7671c7f/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
index 7b90abf..3ae680c 100644
--- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
@@ -82,7 +82,7 @@ object JaasTestUtils {
     }
   }
 
-  class JaasSection(contextName: String,
+  case class JaasSection(contextName: String,
                     jaasModule: Seq[JaasModule]) {
     override def toString: String = {
       s"""|$contextName {
@@ -122,29 +122,9 @@ object JaasTestUtils {
   val KafkaScramAdmin = "scram-admin"
   val KafkaScramAdminPassword = "scram-admin-secret"
 
-  def writeZkFile(): String = {
+  def writeJaasContextsToFile(jaasContexts: Seq[JaasSection]): String = {
     val jaasFile = TestUtils.tempFile()
-    writeToFile(jaasFile, zkSections)
-    jaasFile.getCanonicalPath
-  }
-
-  def writeKafkaFile(serverEntryName: String, kafkaServerSaslMechanisms: 
List[String],
-                     kafkaClientSaslMechanism: Option[String], 
serverKeyTabLocation: Option[File],
-                     clientKeyTabLocation: Option[File]): String = {
-    val jaasFile = TestUtils.tempFile()
-    val kafkaSections = Seq(kafkaServerSection(serverEntryName, 
kafkaServerSaslMechanisms, serverKeyTabLocation),
-      kafkaClientSection(kafkaClientSaslMechanism, clientKeyTabLocation))
-    writeToFile(jaasFile, kafkaSections)
-    jaasFile.getCanonicalPath
-  }
-
-  def writeZkAndKafkaFiles(serverEntryName: String, kafkaServerSaslMechanisms: 
List[String],
-                           kafkaClientSaslMechanism: Option[String], 
serverKeyTabLocation: Option[File],
-                           clientKeyTabLocation: Option[File]): String = {
-    val jaasFile = TestUtils.tempFile()
-    val kafkaSections = Seq(kafkaServerSection(serverEntryName, 
kafkaServerSaslMechanisms, serverKeyTabLocation),
-      kafkaClientSection(kafkaClientSaslMechanism, clientKeyTabLocation))
-    writeToFile(jaasFile, kafkaSections ++ zkSections)
+    writeToFile(jaasFile,jaasContexts)
     jaasFile.getCanonicalPath
   }
 
@@ -152,12 +132,12 @@ object JaasTestUtils {
   def clientLoginModule(mechanism: String, keytabLocation: Option[File]): 
String =
     kafkaClientModule(mechanism, keytabLocation, KafkaClientPrincipal, 
KafkaPlainUser, KafkaPlainPassword, KafkaScramUser, KafkaScramPassword).toString
 
-  private def zkSections: Seq[JaasSection] = Seq(
+  def zkSections: Seq[JaasSection] = Seq(
     new JaasSection(ZkServerContextName, Seq(JaasModule(ZkModule, false, 
Map("user_super" -> ZkUserSuperPasswd, s"user_$ZkUser" -> ZkUserPassword)))),
     new JaasSection(ZkClientContextName, Seq(JaasModule(ZkModule, false, 
Map("username" -> ZkUser, "password" -> ZkUserPassword))))
   )
 
-  private def kafkaServerSection(contextName: String, mechanisms: 
List[String], keytabLocation: Option[File]): JaasSection = {
+  def kafkaServerSection(contextName: String, mechanisms: List[String], 
keytabLocation: Option[File]): JaasSection = {
     val modules = mechanisms.map {
       case "GSSAPI" =>
         Krb5LoginModule(
@@ -215,7 +195,7 @@ object JaasTestUtils {
   /*
    * Used for the static JAAS configuration and it uses the credentials for 
client#2
    */
-  private def kafkaClientSection(mechanism: Option[String], keytabLocation: 
Option[File]): JaasSection = {
+  def kafkaClientSection(mechanism: Option[String], keytabLocation: 
Option[File]): JaasSection = {
     new JaasSection(KafkaClientContextName, mechanism.map(m =>
       kafkaClientModule(m, keytabLocation, KafkaClientPrincipal2, 
KafkaPlainUser2, KafkaPlainPassword2, KafkaScramUser2, 
KafkaScramPassword2)).toSeq)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a7671c7f/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala 
b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
index 4d57ed9..c9076b5 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
@@ -47,7 +47,7 @@ object ZKEphemeralTest {
 
 @RunWith(value = classOf[Parameterized])
 class ZKEphemeralTest(val secure: Boolean) extends ZooKeeperTestHarness {
-  val jaasFile = kafka.utils.JaasTestUtils.writeZkFile()
+  val jaasFile = 
kafka.utils.JaasTestUtils.writeJaasContextsToFile(kafka.utils.JaasTestUtils.zkSections)
   val authProvider = "zookeeper.authProvider.1"
   var zkSessionTimeoutMs = 1000
   

Reply via email to