Repository: kafka
Updated Branches:
  refs/heads/trunk 503bd3664 -> 9e2c683f5


http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index c39402c..d547a01 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1,19 +1,19 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
 
 package kafka.server
 
@@ -26,14 +26,17 @@ import kafka.consumer.ConsumerConfig
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, 
MessageSet}
 import kafka.utils.CoreUtils
 import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.config.SSLConfigs
 import org.apache.kafka.common.config.ConfigDef.Importance._
 import org.apache.kafka.common.config.ConfigDef.Range._
 import org.apache.kafka.common.config.ConfigDef.Type._
+
 import org.apache.kafka.common.config.{ConfigException, AbstractConfig, 
ConfigDef}
 import org.apache.kafka.common.metrics.MetricsReporter
 import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.security.auth.PrincipalBuilder
+import scala.collection.{mutable, immutable, JavaConversions, Map}
 
-import scala.collection.{Map, immutable}
 
 object Defaults {
   /** ********* Zookeeper Configuration ***********/
@@ -151,6 +154,25 @@ object Defaults {
   val MetricNumSamples = 2
   val MetricSampleWindowMs = 30000
   val MetricReporterClasses = ""
+
+  /** ********* SSL configuration ***********/
+  val PrincipalBuilderClass = SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS
+  val SSLProtocol = SSLConfigs.DEFAULT_SSL_PROTOCOL
+  val SSLEnabledProtocols = SSLConfigs.DEFAULT_ENABLED_PROTOCOLS
+  val SSLKeystoreType = SSLConfigs.DEFAULT_SSL_KEYSTORE_TYPE
+  val SSLKeystoreLocation = "/tmp/ssl.keystore.jks"
+  val SSLKeystorePassword = "keystore_password"
+  val SSLKeyPassword = "key_password"
+  val SSLTruststoreType = SSLConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
+  val SSLTruststoreLocation = SSLConfigs.DEFAULT_TRUSTSTORE_LOCATION
+  val SSLTruststorePassword = SSLConfigs.DEFAULT_TRUSTSTORE_PASSWORD
+  val SSLKeyManagerAlgorithm = SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM
+  val SSLTrustManagerAlgorithm = SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM
+  val SSLClientAuthRequired = "required"
+  val SSLClientAuthRequested = "requested"
+  val SSLClientAuthNone = "none"
+  val SSLClientAuth = SSLClientAuthNone
+
 }
 
 object KafkaConfig {
@@ -278,6 +300,25 @@ object KafkaConfig {
   val MetricNumSamplesProp: String = 
CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG
   val MetricReporterClassesProp: String = 
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG
 
+  /** ********* SSL Configuration ****************/
+  val PrincipalBuilderClassProp = SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG
+  val SSLProtocolProp = SSLConfigs.SSL_PROTOCOL_CONFIG
+  val SSLProviderProp = SSLConfigs.SSL_PROVIDER_CONFIG
+  val SSLCipherSuitesProp = SSLConfigs.SSL_CIPHER_SUITES_CONFIG
+  val SSLEnabledProtocolsProp = SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG
+  val SSLKeystoreTypeProp = SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG
+  val SSLKeystoreLocationProp = SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG
+  val SSLKeystorePasswordProp = SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG
+  val SSLKeyPasswordProp = SSLConfigs.SSL_KEY_PASSWORD_CONFIG
+  val SSLTruststoreTypeProp = SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG
+  val SSLTruststoreLocationProp = SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG
+  val SSLTruststorePasswordProp = SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG
+  val SSLKeyManagerAlgorithmProp = SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG
+  val SSLTrustManagerAlgorithmProp = 
SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG
+  val SSLEndpointIdentificationAlgorithmProp = 
SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG
+  val SSLClientAuthProp = SSLConfigs.SSL_CLIENT_AUTH_CONFIG
+
+
   /* Documentation */
   /** ********* Zookeeper Configuration ***********/
   val ZkConnectDoc = "Zookeeper host string"
@@ -287,8 +328,8 @@ object KafkaConfig {
   /** ********* General Configuration ***********/
   val MaxReservedBrokerIdDoc = "Max number that can be used for a broker.id"
   val BrokerIdDoc = "The broker id for this server. " +
-    "To avoid conflicts between zookeeper generated brokerId and user's 
config.brokerId " +
-    "added MaxReservedBrokerId and zookeeper sequence starts from 
MaxReservedBrokerId + 1."
+  "To avoid conflicts between zookeeper generated brokerId and user's 
config.brokerId " +
+  "added MaxReservedBrokerId and zookeeper sequence starts from 
MaxReservedBrokerId + 1."
   val MessageMaxBytesDoc = "The maximum size of message that the server can 
receive"
   val NumNetworkThreadsDoc = "the number of network threads that the server 
uses for handling network requests"
   val NumIoThreadsDoc = "The number of io threads that the server uses for 
carrying out network requests"
@@ -298,21 +339,21 @@ object KafkaConfig {
   val PortDoc = "the port to listen and accept connections on"
   val HostNameDoc = "hostname of broker. If this is set, it will only bind to 
this address. If this is not set, it will bind to all interfaces"
   val ListenersDoc = "Listener List - Comma-separated list of URIs we will 
listen on and their protocols.\n" +
-          " Specify hostname as 0.0.0.0 to bind to all interfaces.\n" +
-          " Leave hostname empty to bind to default interface.\n" +
-          " Examples of legal listener lists:\n" +
-          " PLAINTEXT://myhost:9092,TRACE://:9091\n" +
-          " PLAINTEXT://0.0.0.0:9092, TRACE://localhost:9093\n"
+  " Specify hostname as 0.0.0.0 to bind to all interfaces.\n" +
+  " Leave hostname empty to bind to default interface.\n" +
+  " Examples of legal listener lists:\n" +
+  " PLAINTEXT://myhost:9092,TRACE://:9091\n" +
+  " PLAINTEXT://0.0.0.0:9092, TRACE://localhost:9093\n"
   val AdvertisedHostNameDoc = "Hostname to publish to ZooKeeper for clients to 
use. In IaaS environments, this may " +
-    "need to be different from the interface to which the broker binds. If 
this is not set, " +
-    "it will use the value for \"host.name\" if configured. Otherwise " +
-    "it will use the value returned from 
java.net.InetAddress.getCanonicalHostName()."
+  "need to be different from the interface to which the broker binds. If this 
is not set, " +
+  "it will use the value for \"host.name\" if configured. Otherwise " +
+  "it will use the value returned from 
java.net.InetAddress.getCanonicalHostName()."
   val AdvertisedPortDoc = "The port to publish to ZooKeeper for clients to 
use. In IaaS environments, this may " +
-    "need to be different from the port to which the broker binds. If this is 
not set, " +
-    "it will publish the same port that the broker binds to."
+  "need to be different from the port to which the broker binds. If this is 
not set, " +
+  "it will publish the same port that the broker binds to."
   val AdvertisedListenersDoc = "Listeners to publish to ZooKeeper for clients 
to use, if different than the listeners above." +
-          " In IaaS environments, this may need to be different from the 
interface to which the broker binds." +
-          " If this is not set, the value for \"listeners\" will be used."
+  " In IaaS environments, this may need to be different from the interface to 
which the broker binds." +
+  " If this is not set, the value for \"listeners\" will be used."
  val SocketSendBufferBytesDoc = "The SO_SNDBUF buffer of the socket server sockets"
  val SocketReceiveBufferBytesDoc = "The SO_RCVBUF buffer of the socket server sockets"
   val SocketRequestMaxBytesDoc = "The maximum number of bytes in a socket 
request"
@@ -342,7 +383,7 @@ object KafkaConfig {
   val LogCleanerDedupeBufferSizeDoc = "The total memory used for log 
deduplication across all cleaner threads"
   val LogCleanerIoBufferSizeDoc = "The total memory used for log cleaner I/O 
buffers across all cleaner threads"
   val LogCleanerDedupeBufferLoadFactorDoc = "Log cleaner dedupe buffer load 
factor. The percentage full the dedupe buffer can become. A higher value " +
-    "will allow more log to be cleaned at once but will lead to more hash 
collisions"
+  "will allow more log to be cleaned at once but will lead to more hash 
collisions"
   val LogCleanerBackoffMsDoc = "The amount of time to sleep when there are no 
logs to clean"
  val LogCleanerMinCleanRatioDoc = "The minimum ratio of dirty log to total log for a log to be eligible for cleaning"
   val LogCleanerEnableDoc = "Should we enable log cleaning?"
@@ -363,15 +404,15 @@ object KafkaConfig {
   val ControllerMessageQueueSizeDoc = "The buffer size for 
controller-to-broker-channels"
   val DefaultReplicationFactorDoc = "default replication factors for 
automatically created topics"
  val ReplicaLagTimeMaxMsDoc = "If a follower hasn't sent any fetch requests or hasn't consumed up to the leader's log end offset for at least this time," +
-    " the leader will remove the follower from isr"
+  " the leader will remove the follower from isr"
   val ReplicaSocketTimeoutMsDoc = "The socket timeout for network requests. 
Its value should be at least replica.fetch.wait.max.ms"
   val ReplicaSocketReceiveBufferBytesDoc = "The socket receive buffer for 
network requests"
  val ReplicaFetchMaxBytesDoc = "The number of bytes of messages to attempt to fetch"
   val ReplicaFetchWaitMaxMsDoc = "max wait time for each fetcher request 
issued by follower replicas. This value should always be less than the " +
-    "replica.lag.time.max.ms at all times to prevent frequent shrinking of ISR 
for low throughput topics"
+  "replica.lag.time.max.ms at all times to prevent frequent shrinking of ISR 
for low throughput topics"
   val ReplicaFetchMinBytesDoc = "Minimum bytes expected for each fetch 
response. If not enough bytes, wait up to replicaMaxWaitTimeMs"
   val NumReplicaFetchersDoc = "Number of fetcher threads used to replicate 
messages from a source broker. " +
-    "Increasing this value can increase the degree of I/O parallelism in the 
follower broker."
+  "Increasing this value can increase the degree of I/O parallelism in the 
follower broker."
   val ReplicaFetchBackoffMsDoc = "The amount of time to sleep when fetch 
partition error occurs."
   val ReplicaHighWatermarkCheckpointIntervalMsDoc = "The frequency with which 
the high watermark is saved out to disk"
   val FetchPurgatoryPurgeIntervalRequestsDoc = "The purge interval (in number 
of requests) of the fetch request purgatory"
@@ -382,8 +423,8 @@ object KafkaConfig {
   val UncleanLeaderElectionEnableDoc = "Indicates whether to enable replicas 
not in the ISR set to be elected as leader as a last resort, even though doing 
so may result in data loss"
   val InterBrokerSecurityProtocolDoc = "Security protocol used to communicate 
between brokers. Defaults to plain text."
   val InterBrokerProtocolVersionDoc = "Specify which version of the 
inter-broker protocol will be used.\n" +
-          " This is typically bumped after all brokers were upgraded to a new 
version.\n" +
-          " Example of some valid values are: 0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 
0.8.2.0, 0.8.2.1, 0.8.3, 0.8.3.0. Check ApiVersion for the full list."
+  " This is typically bumped after all brokers were upgraded to a new 
version.\n" +
+  " Example of some valid values are: 0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.2.0, 
0.8.2.1, 0.8.3, 0.8.3.0. Check ApiVersion for the full list."
   /** ********* Controlled shutdown configuration ***********/
   val ControlledShutdownMaxRetriesDoc = "Controlled shutdown can fail for 
multiple reasons. This determines the number of retries when such failure 
happens"
   val ControlledShutdownRetryBackoffMsDoc = "Before each retry, the system 
needs time to recover from the state that caused the previous failure 
(Controller fail over, replica lag etc). This config determines the amount of 
time to wait before retrying."
@@ -395,17 +436,17 @@ object KafkaConfig {
   val OffsetMetadataMaxSizeDoc = "The maximum size for a metadata entry 
associated with an offset commit"
   val OffsetsLoadBufferSizeDoc = "Batch size for reading from the offsets 
segments when loading offsets into the cache."
   val OffsetsTopicReplicationFactorDoc = "The replication factor for the 
offsets topic (set higher to ensure availability). " +
-    "To ensure that the effective replication factor of the offsets topic is 
the configured value, " +
-    "the number of alive brokers has to be at least the replication factor at 
the time of the " +
-    "first request for the offsets topic. If not, either the offsets topic 
creation will fail or " +
-    "it will get a replication factor of min(alive brokers, configured 
replication factor)"
+  "To ensure that the effective replication factor of the offsets topic is the 
configured value, " +
+  "the number of alive brokers has to be at least the replication factor at 
the time of the " +
+  "first request for the offsets topic. If not, either the offsets topic 
creation will fail or " +
+  "it will get a replication factor of min(alive brokers, configured 
replication factor)"
   val OffsetsTopicPartitionsDoc = "The number of partitions for the offset 
commit topic (should not change after deployment)"
   val OffsetsTopicSegmentBytesDoc = "The offsets topic segment bytes should be 
kept relatively small in order to facilitate faster log compaction and cache 
loads"
   val OffsetsTopicCompressionCodecDoc = "Compression codec for the offsets 
topic - compression may be used to achieve \"atomic\" commits"
   val OffsetsRetentionMinutesDoc = "Log retention window in minutes for 
offsets topic"
   val OffsetsRetentionCheckIntervalMsDoc = "Frequency at which to check for 
stale offsets"
   val OffsetCommitTimeoutMsDoc = "Offset commit will be delayed until all 
replicas for the offsets topic receive the commit " +
-    "or this timeout is reached. This is similar to the producer request 
timeout."
+  "or this timeout is reached. This is similar to the producer request 
timeout."
   val OffsetCommitRequiredAcksDoc = "The required acks before the commit can 
be accepted. In general, the default (-1) should not be overridden"
   /** ********* Quota Configuration ***********/
   val ProducerQuotaBytesPerSecondDefaultDoc = "Any producer distinguished by 
clientId will get throttled if it produces more bytes than this value 
per-second"
@@ -419,14 +460,31 @@ object KafkaConfig {
 
   val DeleteTopicEnableDoc = "Enables delete topic. Delete topic through the 
admin tool will have no effect if this config is turned off"
   val CompressionTypeDoc = "Specify the final compression type for a given 
topic. This configuration accepts the standard compression codecs " +
-    "('gzip', 'snappy', lz4). It additionally accepts 'uncompressed' which is 
equivalent to no compression; and " +
-    "'producer' which means retain the original compression codec set by the 
producer."
+  "('gzip', 'snappy', lz4). It additionally accepts 'uncompressed' which is 
equivalent to no compression; and " +
+  "'producer' which means retain the original compression codec set by the 
producer."
 
   /** ********* Kafka Metrics Configuration ***********/
   val MetricSampleWindowMsDoc = 
CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC
   val MetricNumSamplesDoc = CommonClientConfigs.METRICS_NUM_SAMPLES_DOC
   val MetricReporterClassesDoc = 
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC
 
+  /** ********* SSL Configuration ****************/
+  val PrincipalBuilderClassDoc = SSLConfigs.PRINCIPAL_BUILDER_CLASS_DOC
+  val SSLProtocolDoc = SSLConfigs.SSL_PROTOCOL_DOC
+  val SSLProviderDoc = SSLConfigs.SSL_PROVIDER_DOC
+  val SSLCipherSuitesDoc = SSLConfigs.SSL_CIPHER_SUITES_DOC
+  val SSLEnabledProtocolsDoc = SSLConfigs.SSL_ENABLED_PROTOCOLS_DOC
+  val SSLKeystoreTypeDoc = SSLConfigs.SSL_KEYSTORE_TYPE_DOC
+  val SSLKeystoreLocationDoc = SSLConfigs.SSL_KEYSTORE_LOCATION_DOC
+  val SSLKeystorePasswordDoc = SSLConfigs.SSL_KEYSTORE_PASSWORD_DOC
+  val SSLKeyPasswordDoc = SSLConfigs.SSL_KEY_PASSWORD_DOC
+  val SSLTruststoreTypeDoc = SSLConfigs.SSL_TRUSTSTORE_TYPE_DOC
+  val SSLTruststorePasswordDoc = SSLConfigs.SSL_TRUSTSTORE_PASSWORD_DOC
+  val SSLTruststoreLocationDoc = SSLConfigs.SSL_TRUSTSTORE_LOCATION_DOC
+  val SSLKeyManagerAlgorithmDoc = SSLConfigs.SSL_KEYMANAGER_ALGORITHM_DOC
+  val SSLTrustManagerAlgorithmDoc = SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC
+  val SSLEndpointIdentificationAlgorithmDoc = 
SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC
+  val SSLClientAuthDoc = SSLConfigs.SSL_CLIENT_AUTH_DOC
 
   private val configDef = {
     import ConfigDef.Range._
@@ -561,6 +619,24 @@ object KafkaConfig {
       .define(ConsumerQuotaBytesPerSecondOverridesProp, STRING, 
Defaults.ConsumerQuotaBytesPerSecondOverrides, HIGH, 
ConsumerQuotaBytesPerSecondOverridesDoc)
       .define(NumQuotaSamplesProp, INT, Defaults.NumQuotaSamples, atLeast(1), 
LOW, NumQuotaSamplesDoc)
       .define(QuotaWindowSizeSecondsProp, INT, 
Defaults.QuotaWindowSizeSeconds, atLeast(1), LOW, QuotaWindowSizeSecondsDoc)
+
+
+      /** ********* SSL Configuration ****************/
+      .define(PrincipalBuilderClassProp, STRING, 
Defaults.PrincipalBuilderClass, MEDIUM, PrincipalBuilderClassDoc)
+      .define(SSLProtocolProp, STRING, Defaults.SSLProtocol, MEDIUM, 
SSLProtocolDoc)
+      .define(SSLProviderProp, STRING, MEDIUM, SSLProviderDoc, false)
+      .define(SSLEnabledProtocolsProp, LIST, Defaults.SSLEnabledProtocols, 
MEDIUM, SSLEnabledProtocolsDoc)
+      .define(SSLKeystoreTypeProp, STRING, Defaults.SSLKeystoreType, MEDIUM, 
SSLKeystoreTypeDoc)
+      .define(SSLKeystoreLocationProp, STRING, Defaults.SSLKeystoreLocation, 
MEDIUM, SSLKeystoreLocationDoc)
+      .define(SSLKeystorePasswordProp, STRING, Defaults.SSLKeystorePassword, 
MEDIUM, SSLKeystorePasswordDoc)
+      .define(SSLKeyPasswordProp, STRING, Defaults.SSLKeyPassword, MEDIUM, 
SSLKeyPasswordDoc)
+      .define(SSLTruststoreTypeProp, STRING, Defaults.SSLTruststoreType, 
MEDIUM, SSLTruststoreTypeDoc)
+      .define(SSLTruststoreLocationProp, STRING, 
Defaults.SSLTruststoreLocation, MEDIUM, SSLTruststoreLocationDoc)
+      .define(SSLTruststorePasswordProp, STRING, 
Defaults.SSLTruststorePassword, MEDIUM, SSLTruststorePasswordDoc)
+      .define(SSLKeyManagerAlgorithmProp, STRING, 
Defaults.SSLKeyManagerAlgorithm, MEDIUM, SSLKeyManagerAlgorithmDoc)
+      .define(SSLTrustManagerAlgorithmProp, STRING, 
Defaults.SSLTrustManagerAlgorithm, MEDIUM, SSLTrustManagerAlgorithmDoc)
+      .define(SSLClientAuthProp, STRING, Defaults.SSLClientAuth, 
in(Defaults.SSLClientAuthRequired, Defaults.SSLClientAuthRequested, 
Defaults.SSLClientAuthNone), MEDIUM, SSLClientAuthDoc)
+
   }
 
   def configNames() = {
@@ -569,8 +645,8 @@ object KafkaConfig {
   }
 
   /**
-   * Check that property names are valid
-   */
+    * Check that property names are valid
+    */
   def validateNames(props: Properties) {
     import scala.collection.JavaConversions._
     val names = configDef.names()
@@ -622,7 +698,6 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends 
AbstractConfig(Kafka
     getMap(KafkaConfig.MaxConnectionsPerIpOverridesProp, 
getString(KafkaConfig.MaxConnectionsPerIpOverridesProp)).map { case (k, v) => 
(k, v.toInt)}
   val connectionsMaxIdleMs = getLong(KafkaConfig.ConnectionsMaxIdleMsProp)
 
-
   /** ********* Log Configuration ***********/
   val autoCreateTopicsEnable = 
getBoolean(KafkaConfig.AutoCreateTopicsEnableProp)
   val numPartitions = getInt(KafkaConfig.NumPartitionsProp)
@@ -701,6 +776,22 @@ case class KafkaConfig (props: java.util.Map[_, _]) 
extends AbstractConfig(Kafka
   val metricSampleWindowMs = getLong(KafkaConfig.MetricSampleWindowMsProp)
   val metricReporterClasses: java.util.List[MetricsReporter] = 
getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, 
classOf[MetricsReporter])
 
+  /** ********* SSL Configuration **************/
+  val principalBuilderClass = getString(KafkaConfig.PrincipalBuilderClassProp)
+  val sslProtocol = getString(KafkaConfig.SSLProtocolProp)
+  val sslProvider = getString(KafkaConfig.SSLProviderProp)
+  val sslEnabledProtocols = getList(KafkaConfig.SSLEnabledProtocolsProp)
+  val sslKeystoreType = getString(KafkaConfig.SSLKeystoreTypeProp)
+  val sslKeystoreLocation = getString(KafkaConfig.SSLKeystoreLocationProp)
+  val sslKeystorePassword = getString(KafkaConfig.SSLKeystorePasswordProp)
+  val sslKeyPassword = getString(KafkaConfig.SSLKeyPasswordProp)
+  val sslTruststoreType = getString(KafkaConfig.SSLTruststoreTypeProp)
+  val sslTruststoreLocation = getString(KafkaConfig.SSLTruststoreLocationProp)
+  val sslTruststorePassword = getString(KafkaConfig.SSLTruststorePasswordProp)
+  val sslKeyManagerAlgorithm = 
getString(KafkaConfig.SSLKeyManagerAlgorithmProp)
+  val sslTrustManagerAlgorithm = 
getString(KafkaConfig.SSLTrustManagerAlgorithmProp)
+  val sslClientAuth = getString(KafkaConfig.SSLClientAuthProp)
+
   /** ********* Quota Configuration **************/
   val producerQuotaBytesPerSecondDefault = 
getLong(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp)
   val consumerQuotaBytesPerSecondDefault = 
getLong(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp)
@@ -726,8 +817,8 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends 
AbstractConfig(Kafka
           case None => getInt(KafkaConfig.LogRetentionTimeHoursProp) * 
millisInHour
         })
 
-      if (millis < 0) return -1
-      millis
+    if (millis < 0) return -1
+    millis
   }
 
   private def getMap(propName: String, propValue: String): Map[String, String] 
= {
@@ -746,10 +837,12 @@ case class KafkaConfig (props: java.util.Map[_, _]) 
extends AbstractConfig(Kafka
     } catch {
       case e: Exception => throw new IllegalArgumentException("Error creating 
broker listeners from '%s': %s".format(listeners, e.getMessage))
     }
-    val distinctPorts = endpoints.map(ep => ep.port).distinct
+    // filter port 0 for unit tests
+    val endpointsWithoutZeroPort = endpoints.map(ep => ep.port).filter(_ != 0)
+    val distinctPorts = endpointsWithoutZeroPort.distinct
     val distinctProtocols = endpoints.map(ep => ep.protocolType).distinct
 
-    require(distinctPorts.size == endpoints.size, "Each listener must have a 
different port")
+    require(distinctPorts.size == endpointsWithoutZeroPort.size, "Each 
listener must have a different port")
     require(distinctProtocols.size == endpoints.size, "Each listener must have 
a different protocol")
   }
 
@@ -796,6 +889,11 @@ case class KafkaConfig (props: java.util.Map[_, _]) 
extends AbstractConfig(Kafka
 
   }
 
+
+  private def getPrincipalBuilderClass(principalBuilderClass: String): 
PrincipalBuilder = {
+    CoreUtils.createObject[PrincipalBuilder](principalBuilderClass)
+  }
+
   validateValues()
 
   private def validateValues() {
@@ -815,4 +913,24 @@ case class KafkaConfig (props: java.util.Map[_, _]) 
extends AbstractConfig(Kafka
     require(BrokerCompressionCodec.isValid(compressionType), "compression.type 
: " + compressionType + " is not valid." +
       " Valid options are " + 
BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
   }
+
+  def channelConfigs: java.util.Map[String, Object] = {
+    val channelConfigs = new java.util.HashMap[String, Object]()
+    import kafka.server.KafkaConfig._
+    channelConfigs.put(PrincipalBuilderClassProp, 
Class.forName(principalBuilderClass))
+    channelConfigs.put(SSLProtocolProp, sslProtocol)
+    channelConfigs.put(SSLEnabledProtocolsProp, sslEnabledProtocols)
+    channelConfigs.put(SSLKeystoreTypeProp, sslKeystoreType)
+    channelConfigs.put(SSLKeystoreLocationProp, sslKeystoreLocation)
+    channelConfigs.put(SSLKeystorePasswordProp, sslKeystorePassword)
+    channelConfigs.put(SSLKeyPasswordProp, sslKeyPassword)
+    channelConfigs.put(SSLTruststoreTypeProp, sslTruststoreType)
+    channelConfigs.put(SSLTruststoreLocationProp, sslTruststoreLocation)
+    channelConfigs.put(SSLTruststorePasswordProp, sslTruststorePassword)
+    channelConfigs.put(SSLKeyManagerAlgorithmProp, sslKeyManagerAlgorithm)
+    channelConfigs.put(SSLTrustManagerAlgorithmProp, sslTrustManagerAlgorithm)
+    channelConfigs.put(SSLClientAuthProp, sslClientAuth)
+    channelConfigs
+  }
+
 }
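
For reference, a minimal sketch (not part of this patch) of how a broker could be configured with the new SSL properties and how the resulting channelConfigs map is read back. The zookeeper address, file paths and passwords are placeholders, and a real broker would also declare an SSL listener via the "listeners" property:

    // Minimal sketch: exercising the new SSL config surface; values are illustrative.
    import java.util.Properties
    import kafka.server.KafkaConfig

    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181") // required setting, value is illustrative
    props.put(KafkaConfig.SSLKeystoreLocationProp, "/var/private/ssl/broker.keystore.jks")
    props.put(KafkaConfig.SSLKeystorePasswordProp, "keystore_password")
    props.put(KafkaConfig.SSLKeyPasswordProp, "key_password")
    props.put(KafkaConfig.SSLTruststoreLocationProp, "/var/private/ssl/broker.truststore.jks")
    props.put(KafkaConfig.SSLTruststorePasswordProp, "truststore_password")
    props.put(KafkaConfig.SSLClientAuthProp, "required")

    val config = new KafkaConfig(props)
    // channelConfigs gathers the SSL settings into the java.util.Map handed to the network layer.
    val channelConfigs = config.channelConfigs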

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index a0e3fdf..0e7ba3e 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -30,6 +30,7 @@ import java.io.File
 import kafka.utils._
 import org.apache.kafka.common.metrics._
 import org.apache.kafka.common.network.NetworkReceive
+import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
 
 import scala.collection.mutable
@@ -84,9 +85,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
SystemTime) extends Logg
 
   private var shutdownLatch = new CountDownLatch(1)
 
-  private val metricConfig: MetricConfig = new MetricConfig()
-          .samples(config.metricNumSamples)
-          .timeWindow(config.metricSampleWindowMs, TimeUnit.MILLISECONDS)
   private val jmxPrefix: String = "kafka.server"
   private val reporters: java.util.List[MetricsReporter] =  
config.metricReporterClasses
   reporters.add(new JmxReporter(jmxPrefix))
@@ -97,6 +95,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
SystemTime) extends Logg
   private val kafkaMetricsTime: org.apache.kafka.common.utils.Time = new 
org.apache.kafka.common.utils.SystemTime()
   var metrics: Metrics = null
 
+  private val metricConfig: MetricConfig = new MetricConfig()
+    .samples(config.metricNumSamples)
+    .timeWindow(config.metricSampleWindowMs, TimeUnit.MILLISECONDS)
+
   val brokerState: BrokerState = new BrokerState
 
   var apis: KafkaApis = null
@@ -119,7 +121,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
SystemTime) extends Logg
   var kafkaHealthcheck: KafkaHealthcheck = null
   val metadataCache: MetadataCache = new MetadataCache(config.brokerId)
 
-
   var zkClient: ZkClient = null
   val correlationId: AtomicInteger = new AtomicInteger(0)
   val brokerMetaPropsFile = "meta.properties"
@@ -166,63 +167,52 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
SystemTime) extends Logg
         config.brokerId =  getBrokerId
         this.logIdent = "[Kafka Server " + config.brokerId + "], "
 
-        socketServer = new SocketServer(config.brokerId,
-                                        config.listeners,
-                                        config.numNetworkThreads,
-                                        config.queuedMaxRequests,
-                                        config.socketSendBufferBytes,
-                                        config.socketReceiveBufferBytes,
-                                        config.socketRequestMaxBytes,
-                                        config.maxConnectionsPerIp,
-                                        config.connectionsMaxIdleMs,
-                                        config.maxConnectionsPerIpOverrides,
-                                        kafkaMetricsTime,
-                                        metrics)
-          socketServer.startup()
-
-          /* start replica manager */
-          replicaManager = new ReplicaManager(config, time, zkClient, 
kafkaScheduler, logManager, isShuttingDown)
-          replicaManager.startup()
-
-          /* start kafka controller */
-          kafkaController = new KafkaController(config, zkClient, brokerState)
-          kafkaController.startup()
-
-          /* start kafka coordinator */
-          consumerCoordinator = ConsumerCoordinator.create(config, zkClient, 
replicaManager, kafkaScheduler)
-          consumerCoordinator.startup()
-
-          /* start processing requests */
-          apis = new KafkaApis(socketServer.requestChannel, replicaManager, 
consumerCoordinator,
-            kafkaController, zkClient, config.brokerId, config, metadataCache, 
metrics)
-          requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, 
socketServer.requestChannel, apis, config.numIoThreads)
-          brokerState.newState(RunningAsBroker)
-
-          Mx4jLoader.maybeLoad()
-
-          /* start dynamic config manager */
-          dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic 
-> new TopicConfigHandler(logManager),
-                                                             ConfigType.Client 
-> new ClientIdConfigHandler)
-          dynamicConfigManager = new DynamicConfigManager(zkClient, 
dynamicConfigHandlers)
-          dynamicConfigManager.startup()
-
-          /* tell everyone we are alive */
-          val listeners = config.advertisedListeners.map {case(protocol, 
endpoint) =>
-            if (endpoint.port == 0)
-              (protocol, EndPoint(endpoint.host, socketServer.boundPort(), 
endpoint.protocolType))
-            else
-              (protocol, endpoint)
-          }
-          kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, 
config.zkSessionTimeoutMs, zkClient)
-          kafkaHealthcheck.startup()
+        socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
+        socketServer.startup()
+
+        /* start replica manager */
+        replicaManager = new ReplicaManager(config, time, zkClient, 
kafkaScheduler, logManager, isShuttingDown)
+        replicaManager.startup()
+
+        /* start kafka controller */
+        kafkaController = new KafkaController(config, zkClient, brokerState)
+        kafkaController.startup()
+
+        /* start kafka coordinator */
+        consumerCoordinator = ConsumerCoordinator.create(config, zkClient, 
replicaManager, kafkaScheduler)
+        consumerCoordinator.startup()
+
+        /* start processing requests */
+        apis = new KafkaApis(socketServer.requestChannel, replicaManager, 
consumerCoordinator,
+          kafkaController, zkClient, config.brokerId, config, metadataCache, 
metrics)
+        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, 
socketServer.requestChannel, apis, config.numIoThreads)
+        brokerState.newState(RunningAsBroker)
+
+        Mx4jLoader.maybeLoad()
+
+        /* start dynamic config manager */
+        dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> 
new TopicConfigHandler(logManager),
+                                                           ConfigType.Client 
-> new ClientIdConfigHandler)
+        dynamicConfigManager = new DynamicConfigManager(zkClient, 
dynamicConfigHandlers)
+        dynamicConfigManager.startup()
+
+        /* tell everyone we are alive */
+        val listeners = config.advertisedListeners.map {case(protocol, 
endpoint) =>
+          if (endpoint.port == 0)
+            (protocol, EndPoint(endpoint.host, 
socketServer.boundPort(protocol), endpoint.protocolType))
+          else
+            (protocol, endpoint)
+        }
+        kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, 
config.zkSessionTimeoutMs, zkClient)
+        kafkaHealthcheck.startup()
 
-          /* register broker metrics */
-          registerStats()
+        /* register broker metrics */
+        registerStats()
 
-          shutdownLatch = new CountDownLatch(1)
-          startupComplete.set(true)
-          isStartingUp.set(false)
-          info("started")
+        shutdownLatch = new CountDownLatch(1)
+        startupComplete.set(true)
+        isStartingUp.set(false)
+        info("started")
       }
     }
     catch {
@@ -414,7 +404,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
SystemTime) extends Logg
 
   def getLogManager(): LogManager = logManager
 
-  def boundPort(): Int = socketServer.boundPort()
+  def boundPort(protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): Int 
= socketServer.boundPort(protocol)
 
   private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): 
LogManager = {
     val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
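
For illustration, a minimal sketch of the new boundPort signature: since it now takes a SecurityProtocol and defaults to PLAINTEXT, existing callers keep working unchanged, while SSL-aware tests can ask for the SSL listener's port explicitly. It assumes `server` is a started KafkaServer with both a PLAINTEXT and an SSL listener configured (an assumption, not shown in this patch):

    import org.apache.kafka.common.protocol.SecurityProtocol

    val plaintextPort = server.boundPort()               // defaults to PLAINTEXT, same behavior as before
    val sslPort = server.boundPort(SecurityProtocol.SSL) // port of the SSL listener, if one is configured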

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index 4b6358c..637d6f3 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -74,7 +74,7 @@ class ProducerSendTest extends KafkaServerTestHarness {
   def testSendOffset() {
     var producer = TestUtils.createNewProducer(brokerList)
     val partition = new Integer(0)
-    
+
     object callback extends Callback {
       var offset = 0L
       def onCompletion(metadata: RecordMetadata, exception: Exception) {
@@ -298,7 +298,7 @@ class ProducerSendTest extends KafkaServerTestHarness {
       }
     }
   }
-  
+
   /**
    * Test that flush immediately sends all accumulated requests.
    */

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala
new file mode 100644
index 0000000..4281036
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala
@@ -0,0 +1,251 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+  * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+  * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+  * License. You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+  * specific language governing permissions and limitations under the License.
+  */
+package kafka.api
+
+import java.util.Properties
+import java.io.File
+
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.consumer.ConsumerRebalanceCallback
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.consumer.CommitType
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException
+import kafka.integration.KafkaServerTestHarness
+
+import kafka.utils.{TestUtils, Logging}
+import kafka.server.KafkaConfig
+
+import java.util.ArrayList
+import org.junit.{Test, Before, After}
+import org.junit.Assert._
+
+import scala.collection.mutable.Buffer
+import scala.collection.JavaConversions._
+import kafka.coordinator.ConsumerCoordinator
+
+
+/**
+  * Integration tests for the new consumer that cover basic usage as well as 
server failures
+  */
+class SSLConsumerTest extends KafkaServerTestHarness with Logging {
+
+  val trustStoreFile = File.createTempFile("truststore", ".jks")
+  val numServers = 3
+  val producerCount = 1
+  val consumerCount = 2
+  val producerConfig = new Properties
+  val consumerConfig = new Properties
+
+  val overridingProps = new Properties()
+  overridingProps.put(KafkaConfig.NumPartitionsProp, 4.toString)
+  overridingProps.put(KafkaConfig.ControlledShutdownEnableProp, "false") // 
speed up shutdown
+  overridingProps.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // 
don't want to lose offset
+  overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+  overridingProps.put(KafkaConfig.ConsumerMinSessionTimeoutMsProp, "100") // 
set small enough session timeout
+
+  val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
+  val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
+
+  def generateConfigs() =
+    TestUtils.createBrokerConfigs(numServers, zkConnect, false, 
enableSSL=true, 
trustStoreFile=Some(trustStoreFile)).map(KafkaConfig.fromProps(_, 
overridingProps))
+
+  val topic = "topic"
+  val part = 0
+  val tp = new TopicPartition(topic, part)
+
+  // configure the servers and clients
+  this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
+  this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
+  
this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
 4096.toString)
+  this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest")
+
+  @Before
+  override def setUp() {
+    super.setUp()
+    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
TestUtils.getSSLBrokerListStrFromServers(servers))
+    producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
+    producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
+    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
TestUtils.getSSLBrokerListStrFromServers(servers))
+    consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
+    consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
+    consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
"range")
+
+    for (i <- 0 until producerCount)
+      producers += 
TestUtils.createNewProducer(TestUtils.getSSLBrokerListStrFromServers(servers),
+                                               acks = 1,
+                                               enableSSL=true,
+                                               
trustStoreFile=Some(trustStoreFile))
+    for (i <- 0 until consumerCount)
+      consumers += 
TestUtils.createNewConsumer(TestUtils.getSSLBrokerListStrFromServers(servers),
+                                               groupId = "my-test",
+                                               partitionAssignmentStrategy= 
"range",
+                                               enableSSL=true,
+                                               
trustStoreFile=Some(trustStoreFile))
+
+
+    // create the consumer offset topic
+    TestUtils.createTopic(zkClient, ConsumerCoordinator.OffsetsTopicName,
+      
overridingProps.getProperty(KafkaConfig.OffsetsTopicPartitionsProp).toInt,
+      
overridingProps.getProperty(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
+      servers,
+      servers(0).consumerCoordinator.offsetsTopicConfigs)
+
+    // create the test topic with all the brokers as replicas
+    TestUtils.createTopic(this.zkClient, topic, 1, numServers, this.servers)
+  }
+
+  @After
+  override def tearDown() {
+    producers.foreach(_.close())
+    consumers.foreach(_.close())
+    super.tearDown()
+  }
+
+  @Test
+  def testSimpleConsumption() {
+    val numRecords = 10000
+    sendRecords(numRecords)
+    assertEquals(0, this.consumers(0).subscriptions.size)
+    this.consumers(0).subscribe(tp)
+    assertEquals(1, this.consumers(0).subscriptions.size)
+    this.consumers(0).seek(tp, 0)
+    consumeRecords(this.consumers(0), numRecords = numRecords, startingOffset 
= 0)
+  }
+
+  @Test
+  def testAutoOffsetReset() {
+    sendRecords(1)
+    this.consumers(0).subscribe(tp)
+    consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
+  }
+
+  @Test
+  def testSeek() {
+    val consumer = this.consumers(0)
+    val totalRecords = 50L
+    sendRecords(totalRecords.toInt)
+    consumer.subscribe(tp)
+
+    consumer.seekToEnd(tp)
+    assertEquals(totalRecords, consumer.position(tp))
+    assertFalse(consumer.poll(totalRecords).iterator().hasNext)
+
+    consumer.seekToBeginning(tp)
+    assertEquals(0, consumer.position(tp), 0)
+    consumeRecords(consumer, numRecords = 1, startingOffset = 0)
+
+    val mid = totalRecords / 2
+    consumer.seek(tp, mid)
+    assertEquals(mid, consumer.position(tp))
+    consumeRecords(consumer, numRecords = 1, startingOffset = mid.toInt)
+  }
+
+  @Test
+  def testGroupConsumption() {
+    sendRecords(10)
+    this.consumers(0).subscribe(topic)
+    consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
+  }
+
+  @Test
+  def testPositionAndCommit() {
+    sendRecords(5)
+
+    // committed() on a partition with no committed offset throws an exception
+    intercept[NoOffsetForPartitionException] {
+      this.consumers(0).committed(new TopicPartition(topic, 15))
+    }
+
+    // position() on a partition that we aren't subscribed to throws an 
exception
+    intercept[IllegalArgumentException] {
+      this.consumers(0).position(new TopicPartition(topic, 15))
+    }
+
+    this.consumers(0).subscribe(tp)
+
+    assertEquals("position() on a partition that we are subscribed to should 
reset the offset", 0L, this.consumers(0).position(tp))
+    this.consumers(0).commit(CommitType.SYNC)
+    assertEquals(0L, this.consumers(0).committed(tp))
+
+    consumeRecords(this.consumers(0), 5, 0)
+    assertEquals("After consuming 5 records, position should be 5", 5L, 
this.consumers(0).position(tp))
+    this.consumers(0).commit(CommitType.SYNC)
+    assertEquals("Committed offset should be returned", 5L, 
this.consumers(0).committed(tp))
+
+    sendRecords(1)
+
+    // another consumer in the same group should get the same position
+    this.consumers(1).subscribe(tp)
+    consumeRecords(this.consumers(1), 1, 5)
+  }
+
+  @Test
+  def testPartitionsFor() {
+    val numParts = 2
+    TestUtils.createTopic(this.zkClient, "part-test", numParts, 1, 
this.servers)
+    val parts = this.consumers(0).partitionsFor("part-test")
+    assertNotNull(parts)
+    assertEquals(2, parts.length)
+    assertNull(this.consumers(0).partitionsFor("non-exist-topic"))
+  }
+
+  private class TestConsumerReassignmentCallback extends 
ConsumerRebalanceCallback {
+    var callsToAssigned = 0
+    var callsToRevoked = 0
+    def onPartitionsAssigned(consumer: Consumer[_,_], partitions: 
java.util.Collection[TopicPartition]) {
+      info("onPartitionsAssigned called.")
+      callsToAssigned += 1
+    }
+    def onPartitionsRevoked(consumer: Consumer[_,_], partitions: 
java.util.Collection[TopicPartition]) {
+      info("onPartitionsRevoked called.")
+      callsToRevoked += 1
+    }
+  }
+
+  private def sendRecords(numRecords: Int) {
+    val futures = (0 until numRecords).map { i =>
+      this.producers(0).send(new ProducerRecord(topic, part, 
i.toString.getBytes, i.toString.getBytes))
+    }
+    futures.map(_.get)
+  }
+
+  private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]], 
numRecords: Int, startingOffset: Int) {
+    val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
+    val maxIters = numRecords * 300
+    var iters = 0
+    while (records.size < numRecords) {
+      for (record <- consumer.poll(50)) {
+        records.add(record)
+      }
+      if (iters > maxIters)
+        throw new IllegalStateException("Failed to consume the expected 
records after " + iters + " iterations.")
+      iters += 1
+    }
+    for (i <- 0 until numRecords) {
+      val record = records.get(i)
+      val offset = startingOffset + i
+      assertEquals(topic, record.topic())
+      assertEquals(part, record.partition())
+      assertEquals(offset.toLong, record.offset())
+    }
+  }
+
+}
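
For context, the test leans on TestUtils helpers (createBrokerConfigs with enableSSL=true, getSSLBrokerListStrFromServers, and the SSL-enabled createNewProducer/createNewConsumer) to wire the keystores and the shared truststore into brokers and clients. A rough sketch of the client-side SSL properties such a helper would presumably set; the "security.protocol" key and the password value are assumptions for illustration only:

    import java.util.Properties
    import org.apache.kafka.common.config.SSLConfigs

    val clientProps = new Properties()
    clientProps.put("security.protocol", "SSL")                                        // assumed key selecting the SSL channel
    clientProps.put(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreFile.getPath) // trustStoreFile as in the test above
    clientProps.put(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "truststore_password")  // placeholder password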

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala 
b/core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala
new file mode 100644
index 0000000..0f70624
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala
@@ -0,0 +1,240 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+import java.io.File
+
+import kafka.consumer.SimpleConsumer
+import kafka.integration.KafkaServerTestHarness
+import kafka.message.Message
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer._
+import org.apache.kafka.common.config.ConfigException
+import org.apache.kafka.common.errors.SerializationException
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+
+
+class SSLProducerSendTest extends KafkaServerTestHarness {
+  val numServers = 2
+  val trustStoreFile = File.createTempFile("truststore", ".jks")
+
+  val overridingProps = new Properties()
+  overridingProps.put(KafkaConfig.NumPartitionsProp, 4.toString)
+
+  def generateConfigs() =
+    TestUtils.createBrokerConfigs(numServers, zkConnect, false, 
enableSSL=true, 
trustStoreFile=Some(trustStoreFile)).map(KafkaConfig.fromProps(_, 
overridingProps))
+
+  private var consumer1: SimpleConsumer = null
+  private var consumer2: SimpleConsumer = null
+
+  private val topic = "topic"
+  private val numRecords = 100
+
+  @Before
+  override def setUp() {
+    super.setUp()
+
+    // TODO: we need to migrate to new consumers when 0.9 is final
+    consumer1 = new SimpleConsumer("localhost", servers(0).boundPort(), 100, 
1024*1024, "")
+    consumer2 = new SimpleConsumer("localhost", servers(1).boundPort(), 100, 
1024*1024, "")
+
+  }
+
+  @After
+  override def tearDown() {
+    consumer1.close()
+    consumer2.close()
+    super.tearDown()
+  }
+
+  /**
+    * testSendOffset checks the basic send API behavior
+    *
+    * 1. Send with null key/value/partition-id should be accepted; send with 
null topic should be rejected.
+    * 2. Last message of the non-blocking send should return the correct 
offset metadata
+    */
+  @Test
+  def testSendOffset() {
+    var sslProducer = 
TestUtils.createNewProducer(TestUtils.getSSLBrokerListStrFromServers(servers), 
enableSSL=true, trustStoreFile=Some(trustStoreFile))
+    var producer = 
TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers))
+    val partition = new Integer(0)
+
+    object callback extends Callback {
+      var offset = 0L
+      def onCompletion(metadata: RecordMetadata, exception: Exception) {
+        if (exception == null) {
+          assertEquals(offset, metadata.offset())
+          assertEquals(topic, metadata.topic())
+          assertEquals(partition, metadata.partition())
+          offset += 1
+        } else {
+          fail("Send callback returns the following exception", exception)
+        }
+      }
+    }
+
+    try {
+      // create topic
+      TestUtils.createTopic(zkClient, topic, 1, 2, servers)
+
+      // send a normal record
+      val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, 
partition, "key".getBytes, "value".getBytes)
+      assertEquals("Should have offset 0", 0L, sslProducer.send(record0, 
callback).get.offset)
+
+      // send a record with null value should be ok
+      val record1 = new ProducerRecord[Array[Byte],Array[Byte]](topic, 
partition, "key".getBytes, null)
+      assertEquals("Should have offset 1", 1L, sslProducer.send(record1, 
callback).get.offset)
+
+      // send a record with null key should be ok
+      val record2 = new ProducerRecord[Array[Byte],Array[Byte]](topic, 
partition, null, "value".getBytes)
+      assertEquals("Should have offset 2", 2L, sslProducer.send(record2, 
callback).get.offset)
+
+      // send a record with null part id should be ok
+      val record3 = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, 
"key".getBytes, "value".getBytes)
+      assertEquals("Should have offset 3", 3L, sslProducer.send(record3, 
callback).get.offset)
+
+      // send a record with null topic should fail
+      try {
+        val record4 = new ProducerRecord[Array[Byte],Array[Byte]](null, 
partition, "key".getBytes, "value".getBytes)
+        sslProducer.send(record4, callback)
+        fail("Should not allow sending a record without topic")
+      } catch {
+        case iae: IllegalArgumentException => // this is ok
+        case e: Throwable => fail("Only expecting IllegalArgumentException", e)
+      }
+
+      // non-blocking send a list of records with sslProducer
+      for (i <- 1 to numRecords)
+        sslProducer.send(record0, callback)
+      // check that all messages have been acked via offset
+      assertEquals("Should have offset " + numRecords + 4L, numRecords + 4L, 
sslProducer.send(record0, callback).get.offset)
+
+      //non-blocking send a list of records with plaintext producer
+      for (i <- 1 to numRecords)
+        producer.send(record0, callback)
+
+      // check that all messages have been acked via offset
+      assertEquals("Should have offset " + (numRecords * 2 + 5L), numRecords * 
2 + 5L, producer.send(record0, callback).get.offset)
+
+    } finally {
+      if (sslProducer != null) {
+        sslProducer.close()
+        sslProducer = null
+      }
+      if (producer != null) {
+        producer.close()
+        producer = null
+      }
+
+    }
+  }
+
+  /**
+    * testClose checks the closing behavior
+    *
+    * After close() returns, all messages should be sent with correct returned 
offset metadata
+    */
+  @Test
+  def testClose() {
+    var producer = 
TestUtils.createNewProducer(TestUtils.getSSLBrokerListStrFromServers(servers), 
enableSSL=true, trustStoreFile=Some(trustStoreFile))
+    try {
+      // create topic
+      TestUtils.createTopic(zkClient, topic, 1, 2, servers)
+
+      // non-blocking send a list of records
+      val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, 
"key".getBytes, "value".getBytes)
+      for (i <- 1 to numRecords)
+        producer.send(record0)
+      val response0 = producer.send(record0)
+
+      // close the producer
+      producer.close()
+      producer = null
+
+      // check that all messages have been acked via offset,
+      // this also checks that messages with same key go to the same partition
+      assertTrue("The last message should be acked before producer is 
shutdown", response0.isDone)
+      assertEquals("Should have offset " + numRecords, numRecords.toLong, 
response0.get.offset)
+
+    } finally {
+      if (producer != null) {
+        producer.close()
+        producer = null
+      }
+    }
+  }
+
+  /**
+    * testSendToPartition checks the partitioning behavior
+    *
+    * The specified partition-id should be respected
+    */
+  @Test
+  def testSendToPartition() {
+    var producer = 
TestUtils.createNewProducer(TestUtils.getSSLBrokerListStrFromServers(servers), 
enableSSL=true, trustStoreFile=Some(trustStoreFile))
+    try {
+      // create topic
+      val leaders = TestUtils.createTopic(zkClient, topic, 2, 2, servers)
+      val partition = 1
+
+      // make sure leaders exist
+      val leader1 = leaders(partition)
+      assertTrue("Leader for topic \"topic\" partition 1 should exist", 
leader1.isDefined)
+
+      val responses =
+        for (i <- 1 to numRecords)
+        yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, 
partition, null, ("value" + i).getBytes))
+      val futures = responses.toList
+      futures.map(_.get)
+      for (future <- futures)
+        assertTrue("Request should have completed", future.isDone)
+
+      // make sure all of them end up in the same partition with increasing 
offset values
+      for ((future, offset) <- futures zip (0 until numRecords)) {
+        assertEquals(offset.toLong, future.get.offset)
+        assertEquals(topic, future.get.topic)
+        assertEquals(partition, future.get.partition)
+      }
+
+      // make sure the fetched messages also respect the partitioning and 
ordering
+      val fetchResponse1 = if (leader1.get == configs(0).brokerId) {
+        consumer1.fetch(new FetchRequestBuilder().addFetch(topic, partition, 
0, Int.MaxValue).build())
+      } else {
+        consumer2.fetch(new FetchRequestBuilder().addFetch(topic, partition, 
0, Int.MaxValue).build())
+      }
+      val messageSet1 = fetchResponse1.messageSet(topic, 
partition).iterator.toBuffer
+      assertEquals("Should have fetched " + numRecords + " messages", 
numRecords, messageSet1.size)
+
+      // TODO: also check topic and partition after they are added in the 
return messageSet
+      for (i <- 0 to numRecords - 1) {
+        assertEquals(new Message(bytes = ("value" + (i + 1)).getBytes), 
messageSet1(i).message)
+        assertEquals(i.toLong, messageSet1(i).offset)
+      }
+    } finally {
+      if (producer != null) {
+        producer.close()
+        producer = null
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala 
b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 05b9a87..ed94039 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -46,7 +46,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
     configs = (0 until 4).map(i => KafkaConfig.fromProps(TestUtils.createBrokerConfig(i, zkConnect, enableControlledShutdown = false)))
     // start all the servers
     servers = configs.map(c => TestUtils.createServer(c))
-    brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.boundPort))
+    brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.boundPort()))
 
     // create topics first
     createTopic(zkClient, topic1, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)

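The boundPort change above reflects the pattern the rest of this patch relies on: brokers are started on ephemeral ports ("...://localhost:0") and the real port is read back per listener once the server is up. A minimal sketch, assuming a started KafkaServer `server` and the boundPort accessors used elsewhere in this patch:

    // Sketch only: resolve the dynamically assigned ports after startup.
    val plaintextPort = server.boundPort()                      // default PLAINTEXT listener
    val sslPort = server.boundPort(SecurityProtocol.SSL)        // SSL listener, if one was configured
    val broker = new Broker(server.config.brokerId, server.config.hostName, plaintextPort)
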
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 4dba7dc..e4f3576 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -112,9 +112,9 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
 
   @Test
   def testUncleanLeaderElectionDisabled {
-       // disable unclean leader election
-       configProps1.put("unclean.leader.election.enable", String.valueOf(false))
-       configProps2.put("unclean.leader.election.enable", String.valueOf(false))
+         // disable unclean leader election
+         configProps1.put("unclean.leader.election.enable", String.valueOf(false))
+       configProps2.put("unclean.leader.election.enable", String.valueOf(false))
     startBrokers(Seq(configProps1, configProps2))
 
     // create topic with 1 partition, 2 replicas, one on each broker

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index d08b8b8..1937943 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -19,6 +19,8 @@ package kafka.network;
 
 import java.io._
 import java.net._
+import javax.net.ssl._
+import java.io._
 import java.nio.ByteBuffer
 import java.util.Random
 
@@ -33,24 +35,31 @@ import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.utils.SystemTime
 import org.junit.Assert._
 import org.junit._
+import org.scalatest.junit.JUnitSuite
+import java.util.Random
+import kafka.producer.SyncProducerConfig
+import kafka.api.ProducerRequest
+import java.nio.ByteBuffer
+import kafka.common.TopicAndPartition
+import kafka.message.ByteBufferMessageSet
+import kafka.server.KafkaConfig
+import java.nio.channels.SelectionKey
+import kafka.utils.TestUtils
 
 import scala.collection.Map
 
-class SocketServerTest {
-
-  val server: SocketServer = new SocketServer(0,
-                                              Map(SecurityProtocol.PLAINTEXT -> EndPoint(null, 0, SecurityProtocol.PLAINTEXT),
-                                                  SecurityProtocol.TRACE -> EndPoint(null, 0, SecurityProtocol.TRACE)),
-                                              numProcessorThreads = 1,
-                                              maxQueuedRequests = 50,
-                                              sendBufferSize = 300000,
-                                              recvBufferSize = 300000,
-                                              maxRequestSize = 50,
-                                              maxConnectionsPerIp = 5,
-                                              connectionsMaxIdleMs = 60*1000,
-                                              maxConnectionsPerIpOverrides = Map.empty[String,Int],
-                                              new SystemTime(),
-                                              new Metrics())
+class SocketServerTest extends JUnitSuite {
+  val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
+  props.put("listeners", "PLAINTEXT://localhost:0,TRACE://localhost:0")
+  props.put("num.network.threads", "1")
+  props.put("socket.send.buffer.bytes", "300000")
+  props.put("socket.receive.buffer.bytes", "300000")
+  props.put("queued.max.requests", "50")
+  props.put("socket.request.max.bytes", "50")
+  props.put("max.connections.per.ip", "5")
+  props.put("connections.max.idle.ms", "60000")
+  val config: KafkaConfig = KafkaConfig.fromProps(props)
+  val server: SocketServer = new SocketServer(config, new Metrics(), new SystemTime())
   server.startup()
 
   def sendRequest(socket: Socket, id: Short, request: Array[Byte]) {
@@ -177,18 +186,8 @@ class SocketServerTest {
   def testMaxConnectionsPerIPOverrides() {
     val overrideNum = 6
     val overrides: Map[String, Int] = Map("localhost" -> overrideNum)
-    val overrideServer: SocketServer = new SocketServer(0,
-                                                Map(SecurityProtocol.PLAINTEXT -> EndPoint(null, 0, SecurityProtocol.PLAINTEXT)),
-                                                numProcessorThreads = 1,
-                                                maxQueuedRequests = 50,
-                                                sendBufferSize = 300000,
-                                                recvBufferSize = 300000,
-                                                maxRequestSize = 50,
-                                                maxConnectionsPerIp = 5,
-                                                connectionsMaxIdleMs = 60*1000,
-                                                maxConnectionsPerIpOverrides = overrides,
-                                                new SystemTime(),
-                                                new Metrics())
+    val overrideprops = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
+    val overrideServer: SocketServer = new SocketServer(KafkaConfig.fromProps(overrideprops), new Metrics(), new SystemTime())
     overrideServer.startup()
     // make the maximum allowable number of connections and then leak them
     val conns = ((0 until overrideNum).map(i => connect(overrideServer)))
@@ -198,4 +197,37 @@ class SocketServerTest {
     assertEquals(-1, conn.getInputStream.read())
     overrideServer.shutdown()
   }
+
+  @Test
+  def testSSLSocketServer(): Unit = {
+    val trustStoreFile = File.createTempFile("truststore", ".jks")
+    val overrideprops = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0, enableSSL = true, trustStoreFile = Some(trustStoreFile))
+    overrideprops.put("listeners", "SSL://localhost:0")
+
+    val overrideServer: SocketServer = new SocketServer(KafkaConfig.fromProps(overrideprops), new Metrics(), new SystemTime())
+    overrideServer.startup()
+    val sslContext = SSLContext.getInstance("TLSv1.2")
+    sslContext.init(null, Array(TestUtils.trustAllCerts), new java.security.SecureRandom())
+    val socketFactory = sslContext.getSocketFactory
+    val sslSocket = socketFactory.createSocket("localhost", overrideServer.boundPort(SecurityProtocol.SSL)).asInstanceOf[SSLSocket]
+    sslSocket.setNeedClientAuth(false)
+
+    val correlationId = -1
+    val clientId = SyncProducerConfig.DefaultClientId
+    val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
+    val ack = SyncProducerConfig.DefaultRequiredAcks
+    val emptyRequest =
+      new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
+
+    val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes)
+    emptyRequest.writeTo(byteBuffer)
+    byteBuffer.rewind()
+    val serializedBytes = new Array[Byte](byteBuffer.remaining)
+    byteBuffer.get(serializedBytes)
+
+    sendRequest(sslSocket, 0, serializedBytes)
+    processRequest(overrideServer.requestChannel)
+    assertEquals(serializedBytes.toSeq, receiveResponse(sslSocket).toSeq)
+    overrideServer.shutdown()
+  }
 }
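
The new testSSLSocketServer boils down to handshaking a plain SSLSocket against the broker's SSL listener and echoing one request. A condensed sketch of the client side, assuming the test's javax.net.ssl imports, TestUtils.trustAllCerts from this patch, and a started `overrideServer` with an SSL listener:

    // Sketch only: build a client SSLContext that trusts the broker's self-signed certificate.
    val sslContext = SSLContext.getInstance("TLSv1.2")
    sslContext.init(null, Array(TestUtils.trustAllCerts), new java.security.SecureRandom())
    val socket = sslContext.getSocketFactory
      .createSocket("localhost", overrideServer.boundPort(SecurityProtocol.SSL))
      .asInstanceOf[SSLSocket]
    socket.startHandshake()   // fails fast if the listener is not actually speaking TLS
    socket.close()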

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 9688b8c..3da666f 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -38,7 +38,7 @@ class KafkaConfigTest {
     val cfg = KafkaConfig.fromProps(props)
     assertEquals(60L * 60L * 1000L, cfg.logRetentionTimeMillis)
   }
-  
+
   @Test
   def testLogRetentionTimeMinutesProvided() {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
@@ -47,7 +47,7 @@ class KafkaConfigTest {
     val cfg = KafkaConfig.fromProps(props)
     assertEquals(30 * 60L * 1000L, cfg.logRetentionTimeMillis)
   }
-  
+
   @Test
   def testLogRetentionTimeMsProvided() {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
@@ -56,7 +56,7 @@ class KafkaConfigTest {
     val cfg = KafkaConfig.fromProps(props)
     assertEquals(30 * 60L * 1000L, cfg.logRetentionTimeMillis)
   }
-  
+
   @Test
   def testLogRetentionTimeNoConfigProvided() {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
@@ -64,7 +64,7 @@ class KafkaConfigTest {
     val cfg = KafkaConfig.fromProps(props)
     assertEquals(24 * 7 * 60L * 60L * 1000L, cfg.logRetentionTimeMillis)
   }
-  
+
   @Test
   def testLogRetentionTimeBothMinutesAndHoursProvided() {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
@@ -74,7 +74,7 @@ class KafkaConfigTest {
     val cfg = KafkaConfig.fromProps(props)
     assertEquals( 30 * 60L * 1000L, cfg.logRetentionTimeMillis)
   }
-  
+
   @Test
   def testLogRetentionTimeBothMinutesAndMsProvided() {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
@@ -166,11 +166,11 @@ class KafkaConfigTest {
     val serverConfig = KafkaConfig.fromProps(props)
     val endpoints = serverConfig.advertisedListeners
     val endpoint = endpoints.get(SecurityProtocol.PLAINTEXT).get
-    
+
     assertEquals(endpoint.host, advertisedHostName)
     assertEquals(endpoint.port, advertisedPort.toInt)
   }
-    
+
   @Test
   def testAdvertisePortDefault() {
     val advertisedHostName = "routable-host"
@@ -187,7 +187,7 @@ class KafkaConfigTest {
     assertEquals(endpoint.host, advertisedHostName)
     assertEquals(endpoint.port, port.toInt)
   }
-  
+
   @Test
   def testAdvertiseHostNameDefault() {
     val hostName = "routable-host"
@@ -203,8 +203,8 @@ class KafkaConfigTest {
 
     assertEquals(endpoint.host, hostName)
     assertEquals(endpoint.port, advertisedPort.toInt)
-  }    
-    
+  }
+
   @Test
   def testDuplicateListeners() {
     val props = new Properties()
@@ -328,7 +328,7 @@ class KafkaConfigTest {
       KafkaConfig.fromProps(props)
     }
   }
-  
+
   @Test
   def testLogRollTimeMsProvided() {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
@@ -337,7 +337,7 @@ class KafkaConfigTest {
     val cfg = KafkaConfig.fromProps(props)
     assertEquals(30 * 60L * 1000L, cfg.logRollTimeMillis)
   }
-  
+
   @Test
   def testLogRollTimeBothMsAndHoursProvided() {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
@@ -347,7 +347,7 @@ class KafkaConfigTest {
     val cfg = KafkaConfig.fromProps(props)
     assertEquals( 30 * 60L * 1000L, cfg.logRollTimeMillis)
   }
-    
+
   @Test
   def testLogRollTimeNoConfigProvided() {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
@@ -487,6 +487,22 @@ class KafkaConfigTest {
         case KafkaConfig.MetricSampleWindowMsProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0")
         case KafkaConfig.MetricReporterClassesProp => // ignore string
 
+        //SSL Configs
+        case KafkaConfig.PrincipalBuilderClassProp =>
+        case KafkaConfig.SSLProtocolProp => // ignore string
+        case KafkaConfig.SSLProviderProp => // ignore string
+        case KafkaConfig.SSLEnabledProtocolsProp =>
+        case KafkaConfig.SSLKeystoreTypeProp => // ignore string
+        case KafkaConfig.SSLKeystoreLocationProp => // ignore string
+        case KafkaConfig.SSLKeystorePasswordProp => // ignore string
+        case KafkaConfig.SSLKeyPasswordProp => // ignore string
+        case KafkaConfig.SSLTruststoreTypeProp => // ignore string
+        case KafkaConfig.SSLTruststorePasswordProp => // ignore string
+        case KafkaConfig.SSLTruststoreLocationProp => // ignore string
+        case KafkaConfig.SSLKeyManagerAlgorithmProp =>
+        case KafkaConfig.SSLTrustManagerAlgorithmProp =>
+        case KafkaConfig.SSLClientAuthProp => // ignore string
+
         case nonNegativeIntProperty => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
       }
     })
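
The new SSL cases above correspond to the broker-side keystore/truststore settings introduced in KafkaConfig. A minimal sketch of wiring them into a broker config, assuming the new KafkaConfig property constants from this patch; the paths and passwords below are placeholder values, not from the patch:

    // Sketch only: hypothetical keystore/truststore paths and passwords.
    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
    props.put("listeners", "PLAINTEXT://localhost:0,SSL://localhost:0")
    props.put(KafkaConfig.SSLKeystoreLocationProp, "/path/to/server.keystore.jks")
    props.put(KafkaConfig.SSLKeystorePasswordProp, "keystore-password")
    props.put(KafkaConfig.SSLKeyPasswordProp, "key-password")
    props.put(KafkaConfig.SSLTruststoreLocationProp, "/path/to/server.truststore.jks")
    props.put(KafkaConfig.SSLTruststorePasswordProp, "truststore-password")
    val cfg = KafkaConfig.fromProps(props)   // validates the SSL settings along with everything else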

http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index f00f00a..00fbb61 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -22,6 +22,8 @@ import java.nio._
 import java.nio.channels._
 import java.util.Random
 import java.util.Properties
+import java.security.cert.X509Certificate
+import javax.net.ssl.X509TrustManager
 import charset.Charset
 
 import org.apache.kafka.common.protocol.SecurityProtocol
@@ -45,9 +47,16 @@ import kafka.log._
 
 import org.junit.Assert._
 import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.consumer.ConsumerRebalanceCallback
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.security.ssl.SSLFactory
+import org.apache.kafka.common.config.SSLConfigs
+import org.apache.kafka.test.TestSSLUtils
 
 import scala.collection.Map
-import org.apache.kafka.clients.consumer.KafkaConsumer
+import scala.collection.JavaConversions._
 
 /**
  * Utility functions to help with testing
@@ -132,24 +141,33 @@ object TestUtils extends Logging {
   def createBrokerConfigs(numConfigs: Int,
     zkConnect: String,
     enableControlledShutdown: Boolean = true,
-    enableDeleteTopic: Boolean = false): Seq[Properties] = {
-    (0 until numConfigs).map(node => createBrokerConfig(node, zkConnect, enableControlledShutdown, enableDeleteTopic))
+    enableDeleteTopic: Boolean = false,
+    enableSSL: Boolean = false,
+    trustStoreFile: Option[File] = None): Seq[Properties] = {
+    (0 until numConfigs).map(node => createBrokerConfig(node, zkConnect, enableControlledShutdown, enableDeleteTopic, enableSSL = enableSSL, trustStoreFile = trustStoreFile))
   }
 
   def getBrokerListStrFromServers(servers: Seq[KafkaServer]): String = {
     servers.map(s => formatAddress(s.config.hostName, s.boundPort())).mkString(",")
   }
 
+  def getSSLBrokerListStrFromServers(servers: Seq[KafkaServer]): String = {
+    servers.map(s => formatAddress(s.config.hostName, s.boundPort(SecurityProtocol.SSL))).mkString(",")
+  }
+
   /**
    * Create a test config for the given node id
    */
   def createBrokerConfig(nodeId: Int, zkConnect: String,
     enableControlledShutdown: Boolean = true,
     enableDeleteTopic: Boolean = false,
-    port: Int = RandomPort): Properties = {
+    port: Int = RandomPort, enableSSL: Boolean = false, sslPort: Int = RandomPort, trustStoreFile: Option[File] = None): Properties = {
     val props = new Properties
+    var listeners: String = "PLAINTEXT://localhost:"+port.toString
     if (nodeId >= 0) props.put("broker.id", nodeId.toString)
-    props.put("listeners", "PLAINTEXT://localhost:"+port.toString)
+    if (enableSSL)
+      listeners = listeners + "," + "SSL://localhost:"+sslPort.toString
+    props.put("listeners", listeners)
     props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
     props.put("zookeeper.connect", zkConnect)
     props.put("replica.socket.timeout.ms", "1500")
@@ -157,6 +175,9 @@ object TestUtils extends Logging {
     props.put("controlled.shutdown.enable", enableControlledShutdown.toString)
     props.put("delete.topic.enable", enableDeleteTopic.toString)
     props.put("controlled.shutdown.retry.backoff.ms", "100")
+    if (enableSSL) {
+      props.putAll(addSSLConfigs(SSLFactory.Mode.SERVER, true, trustStoreFile, "server"+nodeId))
+    }
     props.put("port", port.toString)
     props
   }
@@ -381,7 +402,9 @@ object TestUtils extends Logging {
                         blockOnBufferFull: Boolean = true,
                         bufferSize: Long = 1024L * 1024L,
                         retries: Int = 0,
-                        lingerMs: Long = 0) : KafkaProducer[Array[Byte],Array[Byte]] = {
+                        lingerMs: Long = 0,
+                        enableSSL: Boolean = false,
+                        trustStoreFile: Option[File] = None) : KafkaProducer[Array[Byte],Array[Byte]] = {
     import org.apache.kafka.clients.producer.ProducerConfig
 
     val producerProps = new Properties()
@@ -396,6 +419,10 @@ object TestUtils extends Logging {
     producerProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs.toString)
     producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+    if (enableSSL) {
+      producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL")
+      producerProps.putAll(addSSLConfigs(SSLFactory.Mode.CLIENT, false, trustStoreFile, "producer"))
+    }
     new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
   }
 
@@ -405,7 +432,12 @@ object TestUtils extends Logging {
   def createNewConsumer(brokerList: String,
                         groupId: String,
                         autoOffsetReset: String = "earliest",
-                        partitionFetchSize: Long = 4096L) : 
KafkaConsumer[Array[Byte],Array[Byte]] = {
+                        partitionFetchSize: Long = 4096L,
+                        partitionAssignmentStrategy: String = "blah",
+                        sessionTimeout: Int = 30000,
+                        callback: Option[ConsumerRebalanceCallback] = None,
+                        enableSSL: Boolean = false,
+                        trustStoreFile: Option[File] = None) : 
KafkaConsumer[Array[Byte],Array[Byte]] = {
     import org.apache.kafka.clients.consumer.ConsumerConfig
 
     val consumerProps= new Properties()
@@ -417,7 +449,17 @@ object TestUtils extends Logging {
     consumerProps.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")
     consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
     consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
-    new KafkaConsumer[Array[Byte],Array[Byte]](consumerProps)
+    consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, partitionAssignmentStrategy)
+    consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout.toString)
+    if (enableSSL) {
+      consumerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL")
+      consumerProps.putAll(addSSLConfigs(SSLFactory.Mode.CLIENT, false, trustStoreFile, "consumer"))
+    }
+    if (callback.isDefined) {
+      new KafkaConsumer[Array[Byte],Array[Byte]](consumerProps, callback.get, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+    } else {
+      new KafkaConsumer[Array[Byte],Array[Byte]](consumerProps)
+    }
   }
 
   /**
@@ -873,6 +915,37 @@ object TestUtils extends Logging {
     new String(bytes, encoding)
   }
 
+  def addSSLConfigs(mode: SSLFactory.Mode, clientCert: Boolean, trustStoreFile: Option[File],  certAlias: String): Properties = {
+    var sslConfigs: java.util.Map[String, Object] = new java.util.HashMap[String, Object]()
+    if (!trustStoreFile.isDefined) {
+      throw new Exception("enableSSL set to true but no trustStoreFile provided")
+    }
+    if (mode == SSLFactory.Mode.SERVER)
+      sslConfigs = TestSSLUtils.createSSLConfig(true, true, mode, trustStoreFile.get, certAlias)
+    else
+      sslConfigs = TestSSLUtils.createSSLConfig(clientCert, false, mode, trustStoreFile.get, certAlias)
+
+    val sslProps = new Properties()
+    sslConfigs.foreach(kv =>
+      sslProps.put(kv._1, kv._2)
+    )
+    sslProps
+  }
+
+  // an X509TrustManager that trusts self-signed certs for unit tests.
+  def trustAllCerts: X509TrustManager = {
+    val trustManager = new X509TrustManager() {
+      override def getAcceptedIssuers: Array[X509Certificate] = {
+        null
+      }
+      override def checkClientTrusted(certs: Array[X509Certificate], authType: String) {
+      }
+      override def checkServerTrusted(certs: Array[X509Certificate], authType: String) {
+      }
+    }
+    trustManager
+  }
+
 }
 
 class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {
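
Taken together, the TestUtils additions let a test bring up an SSL broker and SSL clients from a single shared truststore file. A minimal end-to-end sketch, assuming the helpers defined above and a ZooKeeper connect string `zkConnect` provided by the test harness:

    // Sketch only: one temporary truststore carries the self-signed certs for broker and clients.
    val trustStoreFile = File.createTempFile("truststore", ".jks")
    val brokerProps = TestUtils.createBrokerConfig(0, zkConnect, enableSSL = true,
      trustStoreFile = Some(trustStoreFile))
    val server = TestUtils.createServer(KafkaConfig.fromProps(brokerProps))
    val producer = TestUtils.createNewProducer(
      TestUtils.getSSLBrokerListStrFromServers(Seq(server)),
      enableSSL = true, trustStoreFile = Some(trustStoreFile))
    // ... produce and consume as usual, then close the producer and shut the server down.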
