Repository: kafka Updated Branches: refs/heads/trunk 503bd3664 -> 9e2c683f5
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index c39402c..d547a01 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -1,19 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package kafka.server @@ -26,14 +26,17 @@ import kafka.consumer.ConsumerConfig import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet} import kafka.utils.CoreUtils import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.common.config.SSLConfigs import org.apache.kafka.common.config.ConfigDef.Importance._ import org.apache.kafka.common.config.ConfigDef.Range._ import org.apache.kafka.common.config.ConfigDef.Type._ + import org.apache.kafka.common.config.{ConfigException, AbstractConfig, ConfigDef} import org.apache.kafka.common.metrics.MetricsReporter import org.apache.kafka.common.protocol.SecurityProtocol +import org.apache.kafka.common.security.auth.PrincipalBuilder +import scala.collection.{mutable, immutable, JavaConversions, Map} -import scala.collection.{Map, immutable} object Defaults { /** ********* Zookeeper Configuration ***********/ @@ -151,6 +154,25 @@ object Defaults { val MetricNumSamples = 2 val MetricSampleWindowMs = 30000 val MetricReporterClasses = "" + + /** ********* SSL configuration ***********/ + val PrincipalBuilderClass = SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS + val SSLProtocol = SSLConfigs.DEFAULT_SSL_PROTOCOL + val SSLEnabledProtocols = SSLConfigs.DEFAULT_ENABLED_PROTOCOLS + val SSLKeystoreType = SSLConfigs.DEFAULT_SSL_KEYSTORE_TYPE + val SSLKeystoreLocation = "/tmp/ssl.keystore.jks" + val SSLKeystorePassword = "keystore_password" + val SSLKeyPassword = "key_password" + val SSLTruststoreType = SSLConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE + val SSLTruststoreLocation = SSLConfigs.DEFAULT_TRUSTSTORE_LOCATION + val SSLTruststorePassword = SSLConfigs.DEFAULT_TRUSTSTORE_PASSWORD + val SSLKeyManagerAlgorithm = SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM + val SSLTrustManagerAlgorithm = SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM + val SSLClientAuthRequired = "required" + val SSLClientAuthRequested = "requested" + val SSLClientAuthNone = "none" + val SSLClientAuth = SSLClientAuthNone + } object KafkaConfig { @@ -278,6 +300,25 @@ object KafkaConfig { val MetricNumSamplesProp: String = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG val MetricReporterClassesProp: String = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG + /** ********* SSL Configuration ****************/ + val PrincipalBuilderClassProp = SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG + val SSLProtocolProp = SSLConfigs.SSL_PROTOCOL_CONFIG + val SSLProviderProp = SSLConfigs.SSL_PROVIDER_CONFIG + val SSLCipherSuitesProp = SSLConfigs.SSL_CIPHER_SUITES_CONFIG + val SSLEnabledProtocolsProp = SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG + val SSLKeystoreTypeProp = SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG + val SSLKeystoreLocationProp = SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG + val SSLKeystorePasswordProp = SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG + val SSLKeyPasswordProp = SSLConfigs.SSL_KEY_PASSWORD_CONFIG + val SSLTruststoreTypeProp = SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG + val SSLTruststoreLocationProp = SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG + val SSLTruststorePasswordProp = SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG + val SSLKeyManagerAlgorithmProp = SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG + val SSLTrustManagerAlgorithmProp = SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG + val SSLEndpointIdentificationAlgorithmProp = SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG + val SSLClientAuthProp = SSLConfigs.SSL_CLIENT_AUTH_CONFIG + + /* Documentation */ /** ********* Zookeeper Configuration ***********/ val ZkConnectDoc = "Zookeeper host string" @@ -287,8 +328,8 @@ object KafkaConfig { /** ********* General Configuration ***********/ val MaxReservedBrokerIdDoc = "Max number that can be used for a broker.id" val BrokerIdDoc = "The broker id for this server. " + - "To avoid conflicts between zookeeper generated brokerId and user's config.brokerId " + - "added MaxReservedBrokerId and zookeeper sequence starts from MaxReservedBrokerId + 1." + "To avoid conflicts between zookeeper generated brokerId and user's config.brokerId " + + "added MaxReservedBrokerId and zookeeper sequence starts from MaxReservedBrokerId + 1." val MessageMaxBytesDoc = "The maximum size of message that the server can receive" val NumNetworkThreadsDoc = "the number of network threads that the server uses for handling network requests" val NumIoThreadsDoc = "The number of io threads that the server uses for carrying out network requests" @@ -298,21 +339,21 @@ object KafkaConfig { val PortDoc = "the port to listen and accept connections on" val HostNameDoc = "hostname of broker. If this is set, it will only bind to this address. If this is not set, it will bind to all interfaces" val ListenersDoc = "Listener List - Comma-separated list of URIs we will listen on and their protocols.\n" + - " Specify hostname as 0.0.0.0 to bind to all interfaces.\n" + - " Leave hostname empty to bind to default interface.\n" + - " Examples of legal listener lists:\n" + - " PLAINTEXT://myhost:9092,TRACE://:9091\n" + - " PLAINTEXT://0.0.0.0:9092, TRACE://localhost:9093\n" + " Specify hostname as 0.0.0.0 to bind to all interfaces.\n" + + " Leave hostname empty to bind to default interface.\n" + + " Examples of legal listener lists:\n" + + " PLAINTEXT://myhost:9092,TRACE://:9091\n" + + " PLAINTEXT://0.0.0.0:9092, TRACE://localhost:9093\n" val AdvertisedHostNameDoc = "Hostname to publish to ZooKeeper for clients to use. In IaaS environments, this may " + - "need to be different from the interface to which the broker binds. If this is not set, " + - "it will use the value for \"host.name\" if configured. Otherwise " + - "it will use the value returned from java.net.InetAddress.getCanonicalHostName()." + "need to be different from the interface to which the broker binds. If this is not set, " + + "it will use the value for \"host.name\" if configured. Otherwise " + + "it will use the value returned from java.net.InetAddress.getCanonicalHostName()." val AdvertisedPortDoc = "The port to publish to ZooKeeper for clients to use. In IaaS environments, this may " + - "need to be different from the port to which the broker binds. If this is not set, " + - "it will publish the same port that the broker binds to." + "need to be different from the port to which the broker binds. If this is not set, " + + "it will publish the same port that the broker binds to." val AdvertisedListenersDoc = "Listeners to publish to ZooKeeper for clients to use, if different than the listeners above." + - " In IaaS environments, this may need to be different from the interface to which the broker binds." + - " If this is not set, the value for \"listeners\" will be used." + " In IaaS environments, this may need to be different from the interface to which the broker binds." + + " If this is not set, the value for \"listeners\" will be used." val SocketSendBufferBytesDoc = "The SO_SNDBUF buffer of the socket sever sockets" val SocketReceiveBufferBytesDoc = "The SO_RCVBUF buffer of the socket sever sockets" val SocketRequestMaxBytesDoc = "The maximum number of bytes in a socket request" @@ -342,7 +383,7 @@ object KafkaConfig { val LogCleanerDedupeBufferSizeDoc = "The total memory used for log deduplication across all cleaner threads" val LogCleanerIoBufferSizeDoc = "The total memory used for log cleaner I/O buffers across all cleaner threads" val LogCleanerDedupeBufferLoadFactorDoc = "Log cleaner dedupe buffer load factor. The percentage full the dedupe buffer can become. A higher value " + - "will allow more log to be cleaned at once but will lead to more hash collisions" + "will allow more log to be cleaned at once but will lead to more hash collisions" val LogCleanerBackoffMsDoc = "The amount of time to sleep when there are no logs to clean" val LogCleanerMinCleanRatioDoc = "The minimum ratio of dirty log to total log for a log to eligible for cleaning" val LogCleanerEnableDoc = "Should we enable log cleaning?" @@ -363,15 +404,15 @@ object KafkaConfig { val ControllerMessageQueueSizeDoc = "The buffer size for controller-to-broker-channels" val DefaultReplicationFactorDoc = "default replication factors for automatically created topics" val ReplicaLagTimeMaxMsDoc = "If a follower hasn't sent any fetch requests or hasn't consumed up to the leaders log end offset for at least this time," + - " the leader will remove the follower from isr" + " the leader will remove the follower from isr" val ReplicaSocketTimeoutMsDoc = "The socket timeout for network requests. Its value should be at least replica.fetch.wait.max.ms" val ReplicaSocketReceiveBufferBytesDoc = "The socket receive buffer for network requests" val ReplicaFetchMaxBytesDoc = "The number of byes of messages to attempt to fetch" val ReplicaFetchWaitMaxMsDoc = "max wait time for each fetcher request issued by follower replicas. This value should always be less than the " + - "replica.lag.time.max.ms at all times to prevent frequent shrinking of ISR for low throughput topics" + "replica.lag.time.max.ms at all times to prevent frequent shrinking of ISR for low throughput topics" val ReplicaFetchMinBytesDoc = "Minimum bytes expected for each fetch response. If not enough bytes, wait up to replicaMaxWaitTimeMs" val NumReplicaFetchersDoc = "Number of fetcher threads used to replicate messages from a source broker. " + - "Increasing this value can increase the degree of I/O parallelism in the follower broker." + "Increasing this value can increase the degree of I/O parallelism in the follower broker." val ReplicaFetchBackoffMsDoc = "The amount of time to sleep when fetch partition error occurs." val ReplicaHighWatermarkCheckpointIntervalMsDoc = "The frequency with which the high watermark is saved out to disk" val FetchPurgatoryPurgeIntervalRequestsDoc = "The purge interval (in number of requests) of the fetch request purgatory" @@ -382,8 +423,8 @@ object KafkaConfig { val UncleanLeaderElectionEnableDoc = "Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss" val InterBrokerSecurityProtocolDoc = "Security protocol used to communicate between brokers. Defaults to plain text." val InterBrokerProtocolVersionDoc = "Specify which version of the inter-broker protocol will be used.\n" + - " This is typically bumped after all brokers were upgraded to a new version.\n" + - " Example of some valid values are: 0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.2.0, 0.8.2.1, 0.8.3, 0.8.3.0. Check ApiVersion for the full list." + " This is typically bumped after all brokers were upgraded to a new version.\n" + + " Example of some valid values are: 0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.2.0, 0.8.2.1, 0.8.3, 0.8.3.0. Check ApiVersion for the full list." /** ********* Controlled shutdown configuration ***********/ val ControlledShutdownMaxRetriesDoc = "Controlled shutdown can fail for multiple reasons. This determines the number of retries when such failure happens" val ControlledShutdownRetryBackoffMsDoc = "Before each retry, the system needs time to recover from the state that caused the previous failure (Controller fail over, replica lag etc). This config determines the amount of time to wait before retrying." @@ -395,17 +436,17 @@ object KafkaConfig { val OffsetMetadataMaxSizeDoc = "The maximum size for a metadata entry associated with an offset commit" val OffsetsLoadBufferSizeDoc = "Batch size for reading from the offsets segments when loading offsets into the cache." val OffsetsTopicReplicationFactorDoc = "The replication factor for the offsets topic (set higher to ensure availability). " + - "To ensure that the effective replication factor of the offsets topic is the configured value, " + - "the number of alive brokers has to be at least the replication factor at the time of the " + - "first request for the offsets topic. If not, either the offsets topic creation will fail or " + - "it will get a replication factor of min(alive brokers, configured replication factor)" + "To ensure that the effective replication factor of the offsets topic is the configured value, " + + "the number of alive brokers has to be at least the replication factor at the time of the " + + "first request for the offsets topic. If not, either the offsets topic creation will fail or " + + "it will get a replication factor of min(alive brokers, configured replication factor)" val OffsetsTopicPartitionsDoc = "The number of partitions for the offset commit topic (should not change after deployment)" val OffsetsTopicSegmentBytesDoc = "The offsets topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads" val OffsetsTopicCompressionCodecDoc = "Compression codec for the offsets topic - compression may be used to achieve \"atomic\" commits" val OffsetsRetentionMinutesDoc = "Log retention window in minutes for offsets topic" val OffsetsRetentionCheckIntervalMsDoc = "Frequency at which to check for stale offsets" val OffsetCommitTimeoutMsDoc = "Offset commit will be delayed until all replicas for the offsets topic receive the commit " + - "or this timeout is reached. This is similar to the producer request timeout." + "or this timeout is reached. This is similar to the producer request timeout." val OffsetCommitRequiredAcksDoc = "The required acks before the commit can be accepted. In general, the default (-1) should not be overridden" /** ********* Quota Configuration ***********/ val ProducerQuotaBytesPerSecondDefaultDoc = "Any producer distinguished by clientId will get throttled if it produces more bytes than this value per-second" @@ -419,14 +460,31 @@ object KafkaConfig { val DeleteTopicEnableDoc = "Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off" val CompressionTypeDoc = "Specify the final compression type for a given topic. This configuration accepts the standard compression codecs " + - "('gzip', 'snappy', lz4). It additionally accepts 'uncompressed' which is equivalent to no compression; and " + - "'producer' which means retain the original compression codec set by the producer." + "('gzip', 'snappy', lz4). It additionally accepts 'uncompressed' which is equivalent to no compression; and " + + "'producer' which means retain the original compression codec set by the producer." /** ********* Kafka Metrics Configuration ***********/ val MetricSampleWindowMsDoc = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC val MetricNumSamplesDoc = CommonClientConfigs.METRICS_NUM_SAMPLES_DOC val MetricReporterClassesDoc = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC + /** ********* SSL Configuration ****************/ + val PrincipalBuilderClassDoc = SSLConfigs.PRINCIPAL_BUILDER_CLASS_DOC + val SSLProtocolDoc = SSLConfigs.SSL_PROTOCOL_DOC + val SSLProviderDoc = SSLConfigs.SSL_PROVIDER_DOC + val SSLCipherSuitesDoc = SSLConfigs.SSL_CIPHER_SUITES_DOC + val SSLEnabledProtocolsDoc = SSLConfigs.SSL_ENABLED_PROTOCOLS_DOC + val SSLKeystoreTypeDoc = SSLConfigs.SSL_KEYSTORE_TYPE_DOC + val SSLKeystoreLocationDoc = SSLConfigs.SSL_KEYSTORE_LOCATION_DOC + val SSLKeystorePasswordDoc = SSLConfigs.SSL_KEYSTORE_PASSWORD_DOC + val SSLKeyPasswordDoc = SSLConfigs.SSL_KEY_PASSWORD_DOC + val SSLTruststoreTypeDoc = SSLConfigs.SSL_TRUSTSTORE_TYPE_DOC + val SSLTruststorePasswordDoc = SSLConfigs.SSL_TRUSTSTORE_PASSWORD_DOC + val SSLTruststoreLocationDoc = SSLConfigs.SSL_TRUSTSTORE_LOCATION_DOC + val SSLKeyManagerAlgorithmDoc = SSLConfigs.SSL_KEYMANAGER_ALGORITHM_DOC + val SSLTrustManagerAlgorithmDoc = SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC + val SSLEndpointIdentificationAlgorithmDoc = SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC + val SSLClientAuthDoc = SSLConfigs.SSL_CLIENT_AUTH_DOC private val configDef = { import ConfigDef.Range._ @@ -561,6 +619,24 @@ object KafkaConfig { .define(ConsumerQuotaBytesPerSecondOverridesProp, STRING, Defaults.ConsumerQuotaBytesPerSecondOverrides, HIGH, ConsumerQuotaBytesPerSecondOverridesDoc) .define(NumQuotaSamplesProp, INT, Defaults.NumQuotaSamples, atLeast(1), LOW, NumQuotaSamplesDoc) .define(QuotaWindowSizeSecondsProp, INT, Defaults.QuotaWindowSizeSeconds, atLeast(1), LOW, QuotaWindowSizeSecondsDoc) + + + /** ********* SSL Configuration ****************/ + .define(PrincipalBuilderClassProp, STRING, Defaults.PrincipalBuilderClass, MEDIUM, PrincipalBuilderClassDoc) + .define(SSLProtocolProp, STRING, Defaults.SSLProtocol, MEDIUM, SSLProtocolDoc) + .define(SSLProviderProp, STRING, MEDIUM, SSLProviderDoc, false) + .define(SSLEnabledProtocolsProp, LIST, Defaults.SSLEnabledProtocols, MEDIUM, SSLEnabledProtocolsDoc) + .define(SSLKeystoreTypeProp, STRING, Defaults.SSLKeystoreType, MEDIUM, SSLKeystoreTypeDoc) + .define(SSLKeystoreLocationProp, STRING, Defaults.SSLKeystoreLocation, MEDIUM, SSLKeystoreLocationDoc) + .define(SSLKeystorePasswordProp, STRING, Defaults.SSLKeystorePassword, MEDIUM, SSLKeystorePasswordDoc) + .define(SSLKeyPasswordProp, STRING, Defaults.SSLKeyPassword, MEDIUM, SSLKeyPasswordDoc) + .define(SSLTruststoreTypeProp, STRING, Defaults.SSLTruststoreType, MEDIUM, SSLTruststoreTypeDoc) + .define(SSLTruststoreLocationProp, STRING, Defaults.SSLTruststoreLocation, MEDIUM, SSLTruststoreLocationDoc) + .define(SSLTruststorePasswordProp, STRING, Defaults.SSLTruststorePassword, MEDIUM, SSLTruststorePasswordDoc) + .define(SSLKeyManagerAlgorithmProp, STRING, Defaults.SSLKeyManagerAlgorithm, MEDIUM, SSLKeyManagerAlgorithmDoc) + .define(SSLTrustManagerAlgorithmProp, STRING, Defaults.SSLTrustManagerAlgorithm, MEDIUM, SSLTrustManagerAlgorithmDoc) + .define(SSLClientAuthProp, STRING, Defaults.SSLClientAuth, in(Defaults.SSLClientAuthRequired, Defaults.SSLClientAuthRequested, Defaults.SSLClientAuthNone), MEDIUM, SSLClientAuthDoc) + } def configNames() = { @@ -569,8 +645,8 @@ object KafkaConfig { } /** - * Check that property names are valid - */ + * Check that property names are valid + */ def validateNames(props: Properties) { import scala.collection.JavaConversions._ val names = configDef.names() @@ -622,7 +698,6 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka getMap(KafkaConfig.MaxConnectionsPerIpOverridesProp, getString(KafkaConfig.MaxConnectionsPerIpOverridesProp)).map { case (k, v) => (k, v.toInt)} val connectionsMaxIdleMs = getLong(KafkaConfig.ConnectionsMaxIdleMsProp) - /** ********* Log Configuration ***********/ val autoCreateTopicsEnable = getBoolean(KafkaConfig.AutoCreateTopicsEnableProp) val numPartitions = getInt(KafkaConfig.NumPartitionsProp) @@ -701,6 +776,22 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka val metricSampleWindowMs = getLong(KafkaConfig.MetricSampleWindowMsProp) val metricReporterClasses: java.util.List[MetricsReporter] = getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, classOf[MetricsReporter]) + /** ********* SSL Configuration **************/ + val principalBuilderClass = getString(KafkaConfig.PrincipalBuilderClassProp) + val sslProtocol = getString(KafkaConfig.SSLProtocolProp) + val sslProvider = getString(KafkaConfig.SSLProviderProp) + val sslEnabledProtocols = getList(KafkaConfig.SSLEnabledProtocolsProp) + val sslKeystoreType = getString(KafkaConfig.SSLKeystoreTypeProp) + val sslKeystoreLocation = getString(KafkaConfig.SSLKeystoreLocationProp) + val sslKeystorePassword = getString(KafkaConfig.SSLKeystorePasswordProp) + val sslKeyPassword = getString(KafkaConfig.SSLKeyPasswordProp) + val sslTruststoreType = getString(KafkaConfig.SSLTruststoreTypeProp) + val sslTruststoreLocation = getString(KafkaConfig.SSLTruststoreLocationProp) + val sslTruststorePassword = getString(KafkaConfig.SSLTruststorePasswordProp) + val sslKeyManagerAlgorithm = getString(KafkaConfig.SSLKeyManagerAlgorithmProp) + val sslTrustManagerAlgorithm = getString(KafkaConfig.SSLTrustManagerAlgorithmProp) + val sslClientAuth = getString(KafkaConfig.SSLClientAuthProp) + /** ********* Quota Configuration **************/ val producerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp) val consumerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp) @@ -726,8 +817,8 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka case None => getInt(KafkaConfig.LogRetentionTimeHoursProp) * millisInHour }) - if (millis < 0) return -1 - millis + if (millis < 0) return -1 + millis } private def getMap(propName: String, propValue: String): Map[String, String] = { @@ -746,10 +837,12 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka } catch { case e: Exception => throw new IllegalArgumentException("Error creating broker listeners from '%s': %s".format(listeners, e.getMessage)) } - val distinctPorts = endpoints.map(ep => ep.port).distinct + // filter port 0 for unit tests + val endpointsWithoutZeroPort = endpoints.map(ep => ep.port).filter(_ != 0) + val distinctPorts = endpointsWithoutZeroPort.distinct val distinctProtocols = endpoints.map(ep => ep.protocolType).distinct - require(distinctPorts.size == endpoints.size, "Each listener must have a different port") + require(distinctPorts.size == endpointsWithoutZeroPort.size, "Each listener must have a different port") require(distinctProtocols.size == endpoints.size, "Each listener must have a different protocol") } @@ -796,6 +889,11 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka } + + private def getPrincipalBuilderClass(principalBuilderClass: String): PrincipalBuilder = { + CoreUtils.createObject[PrincipalBuilder](principalBuilderClass) + } + validateValues() private def validateValues() { @@ -815,4 +913,24 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka require(BrokerCompressionCodec.isValid(compressionType), "compression.type : " + compressionType + " is not valid." + " Valid options are " + BrokerCompressionCodec.brokerCompressionOptions.mkString(",")) } + + def channelConfigs: java.util.Map[String, Object] = { + val channelConfigs = new java.util.HashMap[String, Object]() + import kafka.server.KafkaConfig._ + channelConfigs.put(PrincipalBuilderClassProp, Class.forName(principalBuilderClass)) + channelConfigs.put(SSLProtocolProp, sslProtocol) + channelConfigs.put(SSLEnabledProtocolsProp, sslEnabledProtocols) + channelConfigs.put(SSLKeystoreTypeProp, sslKeystoreType) + channelConfigs.put(SSLKeystoreLocationProp, sslKeystoreLocation) + channelConfigs.put(SSLKeystorePasswordProp, sslKeystorePassword) + channelConfigs.put(SSLKeyPasswordProp, sslKeyPassword) + channelConfigs.put(SSLTruststoreTypeProp, sslTruststoreType) + channelConfigs.put(SSLTruststoreLocationProp, sslTruststoreLocation) + channelConfigs.put(SSLTruststorePasswordProp, sslTruststorePassword) + channelConfigs.put(SSLKeyManagerAlgorithmProp, sslKeyManagerAlgorithm) + channelConfigs.put(SSLTrustManagerAlgorithmProp, sslTrustManagerAlgorithm) + channelConfigs.put(SSLClientAuthProp, sslClientAuth) + channelConfigs + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index a0e3fdf..0e7ba3e 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -30,6 +30,7 @@ import java.io.File import kafka.utils._ import org.apache.kafka.common.metrics._ import org.apache.kafka.common.network.NetworkReceive +import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.metrics.{JmxReporter, Metrics} import scala.collection.mutable @@ -84,9 +85,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg private var shutdownLatch = new CountDownLatch(1) - private val metricConfig: MetricConfig = new MetricConfig() - .samples(config.metricNumSamples) - .timeWindow(config.metricSampleWindowMs, TimeUnit.MILLISECONDS) private val jmxPrefix: String = "kafka.server" private val reporters: java.util.List[MetricsReporter] = config.metricReporterClasses reporters.add(new JmxReporter(jmxPrefix)) @@ -97,6 +95,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg private val kafkaMetricsTime: org.apache.kafka.common.utils.Time = new org.apache.kafka.common.utils.SystemTime() var metrics: Metrics = null + private val metricConfig: MetricConfig = new MetricConfig() + .samples(config.metricNumSamples) + .timeWindow(config.metricSampleWindowMs, TimeUnit.MILLISECONDS) + val brokerState: BrokerState = new BrokerState var apis: KafkaApis = null @@ -119,7 +121,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg var kafkaHealthcheck: KafkaHealthcheck = null val metadataCache: MetadataCache = new MetadataCache(config.brokerId) - var zkClient: ZkClient = null val correlationId: AtomicInteger = new AtomicInteger(0) val brokerMetaPropsFile = "meta.properties" @@ -166,63 +167,52 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg config.brokerId = getBrokerId this.logIdent = "[Kafka Server " + config.brokerId + "], " - socketServer = new SocketServer(config.brokerId, - config.listeners, - config.numNetworkThreads, - config.queuedMaxRequests, - config.socketSendBufferBytes, - config.socketReceiveBufferBytes, - config.socketRequestMaxBytes, - config.maxConnectionsPerIp, - config.connectionsMaxIdleMs, - config.maxConnectionsPerIpOverrides, - kafkaMetricsTime, - metrics) - socketServer.startup() - - /* start replica manager */ - replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown) - replicaManager.startup() - - /* start kafka controller */ - kafkaController = new KafkaController(config, zkClient, brokerState) - kafkaController.startup() - - /* start kafka coordinator */ - consumerCoordinator = ConsumerCoordinator.create(config, zkClient, replicaManager, kafkaScheduler) - consumerCoordinator.startup() - - /* start processing requests */ - apis = new KafkaApis(socketServer.requestChannel, replicaManager, consumerCoordinator, - kafkaController, zkClient, config.brokerId, config, metadataCache, metrics) - requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) - brokerState.newState(RunningAsBroker) - - Mx4jLoader.maybeLoad() - - /* start dynamic config manager */ - dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager), - ConfigType.Client -> new ClientIdConfigHandler) - dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers) - dynamicConfigManager.startup() - - /* tell everyone we are alive */ - val listeners = config.advertisedListeners.map {case(protocol, endpoint) => - if (endpoint.port == 0) - (protocol, EndPoint(endpoint.host, socketServer.boundPort(), endpoint.protocolType)) - else - (protocol, endpoint) - } - kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, config.zkSessionTimeoutMs, zkClient) - kafkaHealthcheck.startup() + socketServer = new SocketServer(config, metrics, kafkaMetricsTime) + socketServer.startup() + + /* start replica manager */ + replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown) + replicaManager.startup() + + /* start kafka controller */ + kafkaController = new KafkaController(config, zkClient, brokerState) + kafkaController.startup() + + /* start kafka coordinator */ + consumerCoordinator = ConsumerCoordinator.create(config, zkClient, replicaManager, kafkaScheduler) + consumerCoordinator.startup() + + /* start processing requests */ + apis = new KafkaApis(socketServer.requestChannel, replicaManager, consumerCoordinator, + kafkaController, zkClient, config.brokerId, config, metadataCache, metrics) + requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) + brokerState.newState(RunningAsBroker) + + Mx4jLoader.maybeLoad() + + /* start dynamic config manager */ + dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager), + ConfigType.Client -> new ClientIdConfigHandler) + dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers) + dynamicConfigManager.startup() + + /* tell everyone we are alive */ + val listeners = config.advertisedListeners.map {case(protocol, endpoint) => + if (endpoint.port == 0) + (protocol, EndPoint(endpoint.host, socketServer.boundPort(protocol), endpoint.protocolType)) + else + (protocol, endpoint) + } + kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, config.zkSessionTimeoutMs, zkClient) + kafkaHealthcheck.startup() - /* register broker metrics */ - registerStats() + /* register broker metrics */ + registerStats() - shutdownLatch = new CountDownLatch(1) - startupComplete.set(true) - isStartingUp.set(false) - info("started") + shutdownLatch = new CountDownLatch(1) + startupComplete.set(true) + isStartingUp.set(false) + info("started") } } catch { @@ -414,7 +404,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg def getLogManager(): LogManager = logManager - def boundPort(): Int = socketServer.boundPort() + def boundPort(protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): Int = socketServer.boundPort(protocol) private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = { val defaultProps = KafkaServer.copyKafkaConfigToLog(config) http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index 4b6358c..637d6f3 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -74,7 +74,7 @@ class ProducerSendTest extends KafkaServerTestHarness { def testSendOffset() { var producer = TestUtils.createNewProducer(brokerList) val partition = new Integer(0) - + object callback extends Callback { var offset = 0L def onCompletion(metadata: RecordMetadata, exception: Exception) { @@ -298,7 +298,7 @@ class ProducerSendTest extends KafkaServerTestHarness { } } } - + /** * Test that flush immediately sends all accumulated requests. */ http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala new file mode 100644 index 0000000..4281036 --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala @@ -0,0 +1,251 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package kafka.api + +import java.util.Properties +import java.io.File + +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.clients.consumer.ConsumerRebalanceCallback +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.consumer.CommitType +import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.clients.consumer.NoOffsetForPartitionException +import kafka.integration.KafkaServerTestHarness + +import kafka.utils.{TestUtils, Logging} +import kafka.server.KafkaConfig + +import java.util.ArrayList +import org.junit.{Test, Before, After} +import org.junit.Assert._ + +import scala.collection.mutable.Buffer +import scala.collection.JavaConversions._ +import kafka.coordinator.ConsumerCoordinator + + +/** + * Integration tests for the new consumer that cover basic usage as well as server failures + */ +class SSLConsumerTest extends KafkaServerTestHarness with Logging { + + val trustStoreFile = File.createTempFile("truststore", ".jks") + val numServers = 3 + val producerCount = 1 + val consumerCount = 2 + val producerConfig = new Properties + val consumerConfig = new Properties + + val overridingProps = new Properties() + overridingProps.put(KafkaConfig.NumPartitionsProp, 4.toString) + overridingProps.put(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown + overridingProps.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset + overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, "1") + overridingProps.put(KafkaConfig.ConsumerMinSessionTimeoutMsProp, "100") // set small enough session timeout + + val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]() + val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]() + + def generateConfigs() = + TestUtils.createBrokerConfigs(numServers, zkConnect, false, enableSSL=true, trustStoreFile=Some(trustStoreFile)).map(KafkaConfig.fromProps(_, overridingProps)) + + val topic = "topic" + val part = 0 + val tp = new TopicPartition(topic, part) + + // configure the servers and clients + this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all") + this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test") + this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString) + this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + + @Before + override def setUp() { + super.setUp() + producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getSSLBrokerListStrFromServers(servers)) + producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer]) + producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer]) + consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getSSLBrokerListStrFromServers(servers)) + consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer]) + consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer]) + consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "range") + + for (i <- 0 until producerCount) + producers += TestUtils.createNewProducer(TestUtils.getSSLBrokerListStrFromServers(servers), + acks = 1, + enableSSL=true, + trustStoreFile=Some(trustStoreFile)) + for (i <- 0 until consumerCount) + consumers += TestUtils.createNewConsumer(TestUtils.getSSLBrokerListStrFromServers(servers), + groupId = "my-test", + partitionAssignmentStrategy= "range", + enableSSL=true, + trustStoreFile=Some(trustStoreFile)) + + + // create the consumer offset topic + TestUtils.createTopic(zkClient, ConsumerCoordinator.OffsetsTopicName, + overridingProps.getProperty(KafkaConfig.OffsetsTopicPartitionsProp).toInt, + overridingProps.getProperty(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt, + servers, + servers(0).consumerCoordinator.offsetsTopicConfigs) + + // create the test topic with all the brokers as replicas + TestUtils.createTopic(this.zkClient, topic, 1, numServers, this.servers) + } + + @After + override def tearDown() { + producers.foreach(_.close()) + consumers.foreach(_.close()) + super.tearDown() + } + + @Test + def testSimpleConsumption() { + val numRecords = 10000 + sendRecords(numRecords) + assertEquals(0, this.consumers(0).subscriptions.size) + this.consumers(0).subscribe(tp) + assertEquals(1, this.consumers(0).subscriptions.size) + this.consumers(0).seek(tp, 0) + consumeRecords(this.consumers(0), numRecords = numRecords, startingOffset = 0) + } + + @Test + def testAutoOffsetReset() { + sendRecords(1) + this.consumers(0).subscribe(tp) + consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0) + } + + @Test + def testSeek() { + val consumer = this.consumers(0) + val totalRecords = 50L + sendRecords(totalRecords.toInt) + consumer.subscribe(tp) + + consumer.seekToEnd(tp) + assertEquals(totalRecords, consumer.position(tp)) + assertFalse(consumer.poll(totalRecords).iterator().hasNext) + + consumer.seekToBeginning(tp) + assertEquals(0, consumer.position(tp), 0) + consumeRecords(consumer, numRecords = 1, startingOffset = 0) + + val mid = totalRecords / 2 + consumer.seek(tp, mid) + assertEquals(mid, consumer.position(tp)) + consumeRecords(consumer, numRecords = 1, startingOffset = mid.toInt) + } + + @Test + def testGroupConsumption() { + sendRecords(10) + this.consumers(0).subscribe(topic) + consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0) + } + + @Test + def testPositionAndCommit() { + sendRecords(5) + + // committed() on a partition with no committed offset throws an exception + intercept[NoOffsetForPartitionException] { + this.consumers(0).committed(new TopicPartition(topic, 15)) + } + + // position() on a partition that we aren't subscribed to throws an exception + intercept[IllegalArgumentException] { + this.consumers(0).position(new TopicPartition(topic, 15)) + } + + this.consumers(0).subscribe(tp) + + assertEquals("position() on a partition that we are subscribed to should reset the offset", 0L, this.consumers(0).position(tp)) + this.consumers(0).commit(CommitType.SYNC) + assertEquals(0L, this.consumers(0).committed(tp)) + + consumeRecords(this.consumers(0), 5, 0) + assertEquals("After consuming 5 records, position should be 5", 5L, this.consumers(0).position(tp)) + this.consumers(0).commit(CommitType.SYNC) + assertEquals("Committed offset should be returned", 5L, this.consumers(0).committed(tp)) + + sendRecords(1) + + // another consumer in the same group should get the same position + this.consumers(1).subscribe(tp) + consumeRecords(this.consumers(1), 1, 5) + } + + @Test + def testPartitionsFor() { + val numParts = 2 + TestUtils.createTopic(this.zkClient, "part-test", numParts, 1, this.servers) + val parts = this.consumers(0).partitionsFor("part-test") + assertNotNull(parts) + assertEquals(2, parts.length) + assertNull(this.consumers(0).partitionsFor("non-exist-topic")) + } + + private class TestConsumerReassignmentCallback extends ConsumerRebalanceCallback { + var callsToAssigned = 0 + var callsToRevoked = 0 + def onPartitionsAssigned(consumer: Consumer[_,_], partitions: java.util.Collection[TopicPartition]) { + info("onPartitionsAssigned called.") + callsToAssigned += 1 + } + def onPartitionsRevoked(consumer: Consumer[_,_], partitions: java.util.Collection[TopicPartition]) { + info("onPartitionsRevoked called.") + callsToRevoked += 1 + } + } + + private def sendRecords(numRecords: Int) { + val futures = (0 until numRecords).map { i => + this.producers(0).send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes)) + } + futures.map(_.get) + } + + private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]], numRecords: Int, startingOffset: Int) { + val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]() + val maxIters = numRecords * 300 + var iters = 0 + while (records.size < numRecords) { + for (record <- consumer.poll(50)) { + records.add(record) + } + if (iters > maxIters) + throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.") + iters += 1 + } + for (i <- 0 until numRecords) { + val record = records.get(i) + val offset = startingOffset + i + assertEquals(topic, record.topic()) + assertEquals(part, record.partition()) + assertEquals(offset.toLong, record.offset()) + } + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala new file mode 100644 index 0000000..0f70624 --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import java.util.Properties +import java.util.concurrent.TimeUnit +import java.io.File + +import kafka.consumer.SimpleConsumer +import kafka.integration.KafkaServerTestHarness +import kafka.message.Message +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import org.apache.kafka.clients.producer._ +import org.apache.kafka.common.config.ConfigException +import org.apache.kafka.common.errors.SerializationException +import org.apache.kafka.common.serialization.ByteArraySerializer +import org.junit.Assert._ +import org.junit.{After, Before, Test} + + +class SSLProducerSendTest extends KafkaServerTestHarness { + val numServers = 2 + val trustStoreFile = File.createTempFile("truststore", ".jks") + + val overridingProps = new Properties() + overridingProps.put(KafkaConfig.NumPartitionsProp, 4.toString) + + def generateConfigs() = + TestUtils.createBrokerConfigs(numServers, zkConnect, false, enableSSL=true, trustStoreFile=Some(trustStoreFile)).map(KafkaConfig.fromProps(_, overridingProps)) + + private var consumer1: SimpleConsumer = null + private var consumer2: SimpleConsumer = null + + private val topic = "topic" + private val numRecords = 100 + + @Before + override def setUp() { + super.setUp() + + // TODO: we need to migrate to new consumers when 0.9 is final + consumer1 = new SimpleConsumer("localhost", servers(0).boundPort(), 100, 1024*1024, "") + consumer2 = new SimpleConsumer("localhost", servers(1).boundPort(), 100, 1024*1024, "") + + } + + @After + override def tearDown() { + consumer1.close() + consumer2.close() + super.tearDown() + } + + /** + * testSendOffset checks the basic send API behavior + * + * 1. Send with null key/value/partition-id should be accepted; send with null topic should be rejected. + * 2. Last message of the non-blocking send should return the correct offset metadata + */ + @Test + def testSendOffset() { + var sslProducer = TestUtils.createNewProducer(TestUtils.getSSLBrokerListStrFromServers(servers), enableSSL=true, trustStoreFile=Some(trustStoreFile)) + var producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers)) + val partition = new Integer(0) + + object callback extends Callback { + var offset = 0L + def onCompletion(metadata: RecordMetadata, exception: Exception) { + if (exception == null) { + assertEquals(offset, metadata.offset()) + assertEquals(topic, metadata.topic()) + assertEquals(partition, metadata.partition()) + offset += 1 + } else { + fail("Send callback returns the following exception", exception) + } + } + } + + try { + // create topic + TestUtils.createTopic(zkClient, topic, 1, 2, servers) + + // send a normal record + val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, "key".getBytes, "value".getBytes) + assertEquals("Should have offset 0", 0L, sslProducer.send(record0, callback).get.offset) + + // send a record with null value should be ok + val record1 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, "key".getBytes, null) + assertEquals("Should have offset 1", 1L, sslProducer.send(record1, callback).get.offset) + + // send a record with null key should be ok + val record2 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, "value".getBytes) + assertEquals("Should have offset 2", 2L, sslProducer.send(record2, callback).get.offset) + + // send a record with null part id should be ok + val record3 = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes) + assertEquals("Should have offset 3", 3L, sslProducer.send(record3, callback).get.offset) + + // send a record with null topic should fail + try { + val record4 = new ProducerRecord[Array[Byte],Array[Byte]](null, partition, "key".getBytes, "value".getBytes) + sslProducer.send(record4, callback) + fail("Should not allow sending a record without topic") + } catch { + case iae: IllegalArgumentException => // this is ok + case e: Throwable => fail("Only expecting IllegalArgumentException", e) + } + + // non-blocking send a list of records with sslProducer + for (i <- 1 to numRecords) + sslProducer.send(record0, callback) + // check that all messages have been acked via offset + assertEquals("Should have offset " + numRecords + 4L, numRecords + 4L, sslProducer.send(record0, callback).get.offset) + + //non-blocking send a list of records with plaintext producer + for (i <- 1 to numRecords) + producer.send(record0, callback) + + // check that all messages have been acked via offset + assertEquals("Should have offset " + (numRecords * 2 + 5L), numRecords * 2 + 5L, producer.send(record0, callback).get.offset) + + } finally { + if (sslProducer != null) { + sslProducer.close() + sslProducer = null + } + if (producer != null) { + producer.close() + producer = null + } + + } + } + + /** + * testClose checks the closing behavior + * + * After close() returns, all messages should be sent with correct returned offset metadata + */ + @Test + def testClose() { + var producer = TestUtils.createNewProducer(TestUtils.getSSLBrokerListStrFromServers(servers), enableSSL=true, trustStoreFile=Some(trustStoreFile)) + try { + // create topic + TestUtils.createTopic(zkClient, topic, 1, 2, servers) + + // non-blocking send a list of records + val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes) + for (i <- 1 to numRecords) + producer.send(record0) + val response0 = producer.send(record0) + + // close the producer + producer.close() + producer = null + + // check that all messages have been acked via offset, + // this also checks that messages with same key go to the same partition + assertTrue("The last message should be acked before producer is shutdown", response0.isDone) + assertEquals("Should have offset " + numRecords, numRecords.toLong, response0.get.offset) + + } finally { + if (producer != null) { + producer.close() + producer = null + } + } + } + + /** + * testSendToPartition checks the partitioning behavior + * + * The specified partition-id should be respected + */ + @Test + def testSendToPartition() { + var producer = TestUtils.createNewProducer(TestUtils.getSSLBrokerListStrFromServers(servers), enableSSL=true, trustStoreFile=Some(trustStoreFile)) + try { + // create topic + val leaders = TestUtils.createTopic(zkClient, topic, 2, 2, servers) + val partition = 1 + + // make sure leaders exist + val leader1 = leaders(partition) + assertTrue("Leader for topic \"topic\" partition 1 should exist", leader1.isDefined) + + val responses = + for (i <- 1 to numRecords) + yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, ("value" + i).getBytes)) + val futures = responses.toList + futures.map(_.get) + for (future <- futures) + assertTrue("Request should have completed", future.isDone) + + // make sure all of them end up in the same partition with increasing offset values + for ((future, offset) <- futures zip (0 until numRecords)) { + assertEquals(offset.toLong, future.get.offset) + assertEquals(topic, future.get.topic) + assertEquals(partition, future.get.partition) + } + + // make sure the fetched messages also respect the partitioning and ordering + val fetchResponse1 = if (leader1.get == configs(0).brokerId) { + consumer1.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build()) + } else { + consumer2.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build()) + } + val messageSet1 = fetchResponse1.messageSet(topic, partition).iterator.toBuffer + assertEquals("Should have fetched " + numRecords + " messages", numRecords, messageSet1.size) + + // TODO: also check topic and partition after they are added in the return messageSet + for (i <- 0 to numRecords - 1) { + assertEquals(new Message(bytes = ("value" + (i + 1)).getBytes), messageSet1(i).message) + assertEquals(i.toLong, messageSet1(i).offset) + } + } finally { + if (producer != null) { + producer.close() + producer = null + } + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala index 05b9a87..ed94039 100755 --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -46,7 +46,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness { configs = (0 until 4).map(i => KafkaConfig.fromProps(TestUtils.createBrokerConfig(i, zkConnect, enableControlledShutdown = false))) // start all the servers servers = configs.map(c => TestUtils.createServer(c)) - brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.boundPort)) + brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.boundPort())) // create topics first createTopic(zkClient, topic1, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers) http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala index 4dba7dc..e4f3576 100755 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -112,9 +112,9 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { @Test def testUncleanLeaderElectionDisabled { - // disable unclean leader election - configProps1.put("unclean.leader.election.enable", String.valueOf(false)) - configProps2.put("unclean.leader.election.enable", String.valueOf(false)) + // disable unclean leader election + configProps1.put("unclean.leader.election.enable", String.valueOf(false)) + configProps2.put("unclean.leader.election.enable", String.valueOf(false)) startBrokers(Seq(configProps1, configProps2)) // create topic with 1 partition, 2 replicas, one on each broker http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/test/scala/unit/kafka/network/SocketServerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index d08b8b8..1937943 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -19,6 +19,8 @@ package kafka.network; import java.io._ import java.net._ +import javax.net.ssl._ +import java.io._ import java.nio.ByteBuffer import java.util.Random @@ -33,24 +35,31 @@ import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.utils.SystemTime import org.junit.Assert._ import org.junit._ +import org.scalatest.junit.JUnitSuite +import java.util.Random +import kafka.producer.SyncProducerConfig +import kafka.api.ProducerRequest +import java.nio.ByteBuffer +import kafka.common.TopicAndPartition +import kafka.message.ByteBufferMessageSet +import kafka.server.KafkaConfig +import java.nio.channels.SelectionKey +import kafka.utils.TestUtils import scala.collection.Map -class SocketServerTest { - - val server: SocketServer = new SocketServer(0, - Map(SecurityProtocol.PLAINTEXT -> EndPoint(null, 0, SecurityProtocol.PLAINTEXT), - SecurityProtocol.TRACE -> EndPoint(null, 0, SecurityProtocol.TRACE)), - numProcessorThreads = 1, - maxQueuedRequests = 50, - sendBufferSize = 300000, - recvBufferSize = 300000, - maxRequestSize = 50, - maxConnectionsPerIp = 5, - connectionsMaxIdleMs = 60*1000, - maxConnectionsPerIpOverrides = Map.empty[String,Int], - new SystemTime(), - new Metrics()) +class SocketServerTest extends JUnitSuite { + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0) + props.put("listeners", "PLAINTEXT://localhost:0,TRACE://localhost:0") + props.put("num.network.threads", "1") + props.put("socket.send.buffer.bytes", "300000") + props.put("socket.receive.buffer.bytes", "300000") + props.put("queued.max.requests", "50") + props.put("socket.request.max.bytes", "50") + props.put("max.connections.per.ip", "5") + props.put("connections.max.idle.ms", "60000") + val config: KafkaConfig = KafkaConfig.fromProps(props) + val server: SocketServer = new SocketServer(config, new Metrics(), new SystemTime()) server.startup() def sendRequest(socket: Socket, id: Short, request: Array[Byte]) { @@ -177,18 +186,8 @@ class SocketServerTest { def testMaxConnectionsPerIPOverrides() { val overrideNum = 6 val overrides: Map[String, Int] = Map("localhost" -> overrideNum) - val overrideServer: SocketServer = new SocketServer(0, - Map(SecurityProtocol.PLAINTEXT -> EndPoint(null, 0, SecurityProtocol.PLAINTEXT)), - numProcessorThreads = 1, - maxQueuedRequests = 50, - sendBufferSize = 300000, - recvBufferSize = 300000, - maxRequestSize = 50, - maxConnectionsPerIp = 5, - connectionsMaxIdleMs = 60*1000, - maxConnectionsPerIpOverrides = overrides, - new SystemTime(), - new Metrics()) + val overrideprops = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0) + val overrideServer: SocketServer = new SocketServer(KafkaConfig.fromProps(overrideprops), new Metrics(), new SystemTime()) overrideServer.startup() // make the maximum allowable number of connections and then leak them val conns = ((0 until overrideNum).map(i => connect(overrideServer))) @@ -198,4 +197,37 @@ class SocketServerTest { assertEquals(-1, conn.getInputStream.read()) overrideServer.shutdown() } + + @Test + def testSSLSocketServer(): Unit = { + val trustStoreFile = File.createTempFile("truststore", ".jks") + val overrideprops = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0, enableSSL = true, trustStoreFile = Some(trustStoreFile)) + overrideprops.put("listeners", "SSL://localhost:0") + + val overrideServer: SocketServer = new SocketServer(KafkaConfig.fromProps(overrideprops), new Metrics(), new SystemTime()) + overrideServer.startup() + val sslContext = SSLContext.getInstance("TLSv1.2") + sslContext.init(null, Array(TestUtils.trustAllCerts), new java.security.SecureRandom()) + val socketFactory = sslContext.getSocketFactory + val sslSocket = socketFactory.createSocket("localhost", overrideServer.boundPort(SecurityProtocol.SSL)).asInstanceOf[SSLSocket] + sslSocket.setNeedClientAuth(false) + + val correlationId = -1 + val clientId = SyncProducerConfig.DefaultClientId + val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs + val ack = SyncProducerConfig.DefaultRequiredAcks + val emptyRequest = + new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]()) + + val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes) + emptyRequest.writeTo(byteBuffer) + byteBuffer.rewind() + val serializedBytes = new Array[Byte](byteBuffer.remaining) + byteBuffer.get(serializedBytes) + + sendRequest(sslSocket, 0, serializedBytes) + processRequest(overrideServer.requestChannel) + assertEquals(serializedBytes.toSeq, receiveResponse(sslSocket).toSeq) + overrideServer.shutdown() + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 9688b8c..3da666f 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -38,7 +38,7 @@ class KafkaConfigTest { val cfg = KafkaConfig.fromProps(props) assertEquals(60L * 60L * 1000L, cfg.logRetentionTimeMillis) } - + @Test def testLogRetentionTimeMinutesProvided() { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) @@ -47,7 +47,7 @@ class KafkaConfigTest { val cfg = KafkaConfig.fromProps(props) assertEquals(30 * 60L * 1000L, cfg.logRetentionTimeMillis) } - + @Test def testLogRetentionTimeMsProvided() { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) @@ -56,7 +56,7 @@ class KafkaConfigTest { val cfg = KafkaConfig.fromProps(props) assertEquals(30 * 60L * 1000L, cfg.logRetentionTimeMillis) } - + @Test def testLogRetentionTimeNoConfigProvided() { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) @@ -64,7 +64,7 @@ class KafkaConfigTest { val cfg = KafkaConfig.fromProps(props) assertEquals(24 * 7 * 60L * 60L * 1000L, cfg.logRetentionTimeMillis) } - + @Test def testLogRetentionTimeBothMinutesAndHoursProvided() { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) @@ -74,7 +74,7 @@ class KafkaConfigTest { val cfg = KafkaConfig.fromProps(props) assertEquals( 30 * 60L * 1000L, cfg.logRetentionTimeMillis) } - + @Test def testLogRetentionTimeBothMinutesAndMsProvided() { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) @@ -166,11 +166,11 @@ class KafkaConfigTest { val serverConfig = KafkaConfig.fromProps(props) val endpoints = serverConfig.advertisedListeners val endpoint = endpoints.get(SecurityProtocol.PLAINTEXT).get - + assertEquals(endpoint.host, advertisedHostName) assertEquals(endpoint.port, advertisedPort.toInt) } - + @Test def testAdvertisePortDefault() { val advertisedHostName = "routable-host" @@ -187,7 +187,7 @@ class KafkaConfigTest { assertEquals(endpoint.host, advertisedHostName) assertEquals(endpoint.port, port.toInt) } - + @Test def testAdvertiseHostNameDefault() { val hostName = "routable-host" @@ -203,8 +203,8 @@ class KafkaConfigTest { assertEquals(endpoint.host, hostName) assertEquals(endpoint.port, advertisedPort.toInt) - } - + } + @Test def testDuplicateListeners() { val props = new Properties() @@ -328,7 +328,7 @@ class KafkaConfigTest { KafkaConfig.fromProps(props) } } - + @Test def testLogRollTimeMsProvided() { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) @@ -337,7 +337,7 @@ class KafkaConfigTest { val cfg = KafkaConfig.fromProps(props) assertEquals(30 * 60L * 1000L, cfg.logRollTimeMillis) } - + @Test def testLogRollTimeBothMsAndHoursProvided() { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) @@ -347,7 +347,7 @@ class KafkaConfigTest { val cfg = KafkaConfig.fromProps(props) assertEquals( 30 * 60L * 1000L, cfg.logRollTimeMillis) } - + @Test def testLogRollTimeNoConfigProvided() { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) @@ -487,6 +487,22 @@ class KafkaConfigTest { case KafkaConfig.MetricSampleWindowMsProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0") case KafkaConfig.MetricReporterClassesProp => // ignore string + //SSL Configs + case KafkaConfig.PrincipalBuilderClassProp => + case KafkaConfig.SSLProtocolProp => // ignore string + case KafkaConfig.SSLProviderProp => // ignore string + case KafkaConfig.SSLEnabledProtocolsProp => + case KafkaConfig.SSLKeystoreTypeProp => // ignore string + case KafkaConfig.SSLKeystoreLocationProp => // ignore string + case KafkaConfig.SSLKeystorePasswordProp => // ignore string + case KafkaConfig.SSLKeyPasswordProp => // ignore string + case KafkaConfig.SSLTruststoreTypeProp => // ignore string + case KafkaConfig.SSLTruststorePasswordProp => // ignore string + case KafkaConfig.SSLTruststoreLocationProp => // ignore string + case KafkaConfig.SSLKeyManagerAlgorithmProp => + case KafkaConfig.SSLTrustManagerAlgorithmProp => + case KafkaConfig.SSLClientAuthProp => // ignore string + case nonNegativeIntProperty => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1") } }) http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index f00f00a..00fbb61 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -22,6 +22,8 @@ import java.nio._ import java.nio.channels._ import java.util.Random import java.util.Properties +import java.security.cert.X509Certificate +import javax.net.ssl.X509TrustManager import charset.Charset import org.apache.kafka.common.protocol.SecurityProtocol @@ -45,9 +47,16 @@ import kafka.log._ import org.junit.Assert._ import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.clients.consumer.ConsumerRebalanceCallback +import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.common.security.ssl.SSLFactory +import org.apache.kafka.common.config.SSLConfigs +import org.apache.kafka.test.TestSSLUtils import scala.collection.Map -import org.apache.kafka.clients.consumer.KafkaConsumer +import scala.collection.JavaConversions._ /** * Utility functions to help with testing @@ -132,24 +141,33 @@ object TestUtils extends Logging { def createBrokerConfigs(numConfigs: Int, zkConnect: String, enableControlledShutdown: Boolean = true, - enableDeleteTopic: Boolean = false): Seq[Properties] = { - (0 until numConfigs).map(node => createBrokerConfig(node, zkConnect, enableControlledShutdown, enableDeleteTopic)) + enableDeleteTopic: Boolean = false, + enableSSL: Boolean = false, + trustStoreFile: Option[File] = None): Seq[Properties] = { + (0 until numConfigs).map(node => createBrokerConfig(node, zkConnect, enableControlledShutdown, enableDeleteTopic, enableSSL = enableSSL, trustStoreFile = trustStoreFile)) } def getBrokerListStrFromServers(servers: Seq[KafkaServer]): String = { servers.map(s => formatAddress(s.config.hostName, s.boundPort())).mkString(",") } + def getSSLBrokerListStrFromServers(servers: Seq[KafkaServer]): String = { + servers.map(s => formatAddress(s.config.hostName, s.boundPort(SecurityProtocol.SSL))).mkString(",") + } + /** * Create a test config for the given node id */ def createBrokerConfig(nodeId: Int, zkConnect: String, enableControlledShutdown: Boolean = true, enableDeleteTopic: Boolean = false, - port: Int = RandomPort): Properties = { + port: Int = RandomPort, enableSSL: Boolean = false, sslPort: Int = RandomPort, trustStoreFile: Option[File] = None): Properties = { val props = new Properties + var listeners: String = "PLAINTEXT://localhost:"+port.toString if (nodeId >= 0) props.put("broker.id", nodeId.toString) - props.put("listeners", "PLAINTEXT://localhost:"+port.toString) + if (enableSSL) + listeners = listeners + "," + "SSL://localhost:"+sslPort.toString + props.put("listeners", listeners) props.put("log.dir", TestUtils.tempDir().getAbsolutePath) props.put("zookeeper.connect", zkConnect) props.put("replica.socket.timeout.ms", "1500") @@ -157,6 +175,9 @@ object TestUtils extends Logging { props.put("controlled.shutdown.enable", enableControlledShutdown.toString) props.put("delete.topic.enable", enableDeleteTopic.toString) props.put("controlled.shutdown.retry.backoff.ms", "100") + if (enableSSL) { + props.putAll(addSSLConfigs(SSLFactory.Mode.SERVER, true, trustStoreFile, "server"+nodeId)) + } props.put("port", port.toString) props } @@ -381,7 +402,9 @@ object TestUtils extends Logging { blockOnBufferFull: Boolean = true, bufferSize: Long = 1024L * 1024L, retries: Int = 0, - lingerMs: Long = 0) : KafkaProducer[Array[Byte],Array[Byte]] = { + lingerMs: Long = 0, + enableSSL: Boolean = false, + trustStoreFile: Option[File] = None) : KafkaProducer[Array[Byte],Array[Byte]] = { import org.apache.kafka.clients.producer.ProducerConfig val producerProps = new Properties() @@ -396,6 +419,10 @@ object TestUtils extends Logging { producerProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs.toString) producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") + if (enableSSL) { + producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL") + producerProps.putAll(addSSLConfigs(SSLFactory.Mode.CLIENT, false, trustStoreFile, "producer")) + } new KafkaProducer[Array[Byte],Array[Byte]](producerProps) } @@ -405,7 +432,12 @@ object TestUtils extends Logging { def createNewConsumer(brokerList: String, groupId: String, autoOffsetReset: String = "earliest", - partitionFetchSize: Long = 4096L) : KafkaConsumer[Array[Byte],Array[Byte]] = { + partitionFetchSize: Long = 4096L, + partitionAssignmentStrategy: String = "blah", + sessionTimeout: Int = 30000, + callback: Option[ConsumerRebalanceCallback] = None, + enableSSL: Boolean = false, + trustStoreFile: Option[File] = None) : KafkaConsumer[Array[Byte],Array[Byte]] = { import org.apache.kafka.clients.consumer.ConsumerConfig val consumerProps= new Properties() @@ -417,7 +449,17 @@ object TestUtils extends Logging { consumerProps.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200") consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer") consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer") - new KafkaConsumer[Array[Byte],Array[Byte]](consumerProps) + consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, partitionAssignmentStrategy) + consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout.toString) + if (enableSSL) { + consumerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL") + consumerProps.putAll(addSSLConfigs(SSLFactory.Mode.CLIENT, false, trustStoreFile, "consumer")) + } + if (callback.isDefined) { + new KafkaConsumer[Array[Byte],Array[Byte]](consumerProps, callback.get, new ByteArrayDeserializer(), new ByteArrayDeserializer()) + } else { + new KafkaConsumer[Array[Byte],Array[Byte]](consumerProps) + } } /** @@ -873,6 +915,37 @@ object TestUtils extends Logging { new String(bytes, encoding) } + def addSSLConfigs(mode: SSLFactory.Mode, clientCert: Boolean, trustStoreFile: Option[File], certAlias: String): Properties = { + var sslConfigs: java.util.Map[String, Object] = new java.util.HashMap[String, Object]() + if (!trustStoreFile.isDefined) { + throw new Exception("enableSSL set to true but no trustStoreFile provided") + } + if (mode == SSLFactory.Mode.SERVER) + sslConfigs = TestSSLUtils.createSSLConfig(true, true, mode, trustStoreFile.get, certAlias) + else + sslConfigs = TestSSLUtils.createSSLConfig(clientCert, false, mode, trustStoreFile.get, certAlias) + + val sslProps = new Properties() + sslConfigs.foreach(kv => + sslProps.put(kv._1, kv._2) + ) + sslProps + } + + // a X509TrustManager to trust self-signed certs for unit tests. + def trustAllCerts: X509TrustManager = { + val trustManager = new X509TrustManager() { + override def getAcceptedIssuers: Array[X509Certificate] = { + null + } + override def checkClientTrusted(certs: Array[X509Certificate], authType: String) { + } + override def checkServerTrusted(certs: Array[X509Certificate], authType: String) { + } + } + trustManager + } + } class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {