[2/4] kafka git commit: kafka-1690; Add SSL support to Kafka Broker, Producer and Consumer; patched by Sriharsha Chintalapani; reviewed Rajini Sivaram, Joel Koshy, Michael Herstine, Ismael Juma, Dong
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java -- diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java new file mode 100644 index 000..f79b65c --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.security.ssl; + +import java.util.Map; +import java.util.List; +import java.io.FileInputStream; +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.security.KeyStore; + +import javax.net.ssl.*; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Configurable; +import org.apache.kafka.common.config.SSLConfigs; + + +public class SSLFactory implements Configurable { + +public enum Mode { CLIENT, SERVER }; +private String protocol; +private String provider; +private String kmfAlgorithm; +private String tmfAlgorithm; +private SecurityStore keystore = null; +private String keyPassword; +private SecurityStore truststore; +private String[] cipherSuites; +private String[] enabledProtocols; +private String endpointIdentification; +private SSLContext sslContext; +private boolean needClientAuth; +private boolean wantClientAuth; +private final Mode mode; + + +public SSLFactory(Mode mode) { +this.mode = mode; +} + +@Override +public void configure(MapString, ? configs) throws KafkaException { +this.protocol = (String) configs.get(SSLConfigs.SSL_PROTOCOL_CONFIG); +this.provider = (String) configs.get(SSLConfigs.SSL_PROVIDER_CONFIG); + +if (configs.get(SSLConfigs.SSL_CIPHER_SUITES_CONFIG) != null) { +ListString cipherSuitesList = (ListString) configs.get(SSLConfigs.SSL_CIPHER_SUITES_CONFIG); +this.cipherSuites = (String[]) cipherSuitesList.toArray(new String[cipherSuitesList.size()]); +} + +if (configs.get(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG) != null) { +ListString enabledProtocolsList = (ListString) configs.get(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG); +this.enabledProtocols = (String[]) enabledProtocolsList.toArray(new String[enabledProtocolsList.size()]); +} + +if (configs.containsKey(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG)) { +this.endpointIdentification = (String) configs.get(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG); +} + +if (configs.containsKey(SSLConfigs.SSL_CLIENT_AUTH_CONFIG)) { +String clientAuthConfig = (String) configs.get(SSLConfigs.SSL_CLIENT_AUTH_CONFIG); +if (clientAuthConfig.equals(required)) +this.needClientAuth = true; +else if (clientAuthConfig.equals(requested)) +this.wantClientAuth = true; +} + +this.kmfAlgorithm = (String) configs.get(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG); +this.tmfAlgorithm = (String) configs.get(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG); + +if (checkKeyStoreConfigs(configs)) { +createKeystore((String) configs.get(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG), + (String) configs.get(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG), + (String) configs.get(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG), + (String) configs.get(SSLConfigs.SSL_KEY_PASSWORD_CONFIG)); +} + +createTruststore((String) configs.get(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG), + (String) configs.get(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG), + (String) configs.get(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)); +try { +this.sslContext = createSSLContext(); +} catch (Exception e) { +throw new KafkaException(e); +} +} + + +private SSLContext
[4/4] kafka git commit: kafka-1690; Add SSL support to Kafka Broker, Producer and Consumer; patched by Sriharsha Chintalapani; reviewed Rajini Sivaram, Joel Koshy, Michael Herstine, Ismael Juma, Dong
kafka-1690; Add SSL support to Kafka Broker, Producer and Consumer; patched by Sriharsha Chintalapani; reviewed Rajini Sivaram, Joel Koshy, Michael Herstine, Ismael Juma, Dong Lin, Jiangjie Qin and Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9e2c683f Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9e2c683f Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9e2c683f Branch: refs/heads/trunk Commit: 9e2c683f550b7ae58008d0bcb62238b7a2d89a65 Parents: 503bd36 Author: Sriharsha Chintalapani schintalap...@hortonworks.com Authored: Tue Aug 18 21:51:15 2015 -0700 Committer: Jun Rao jun...@gmail.com Committed: Tue Aug 18 21:51:15 2015 -0700 -- build.gradle| 6 +- checkstyle/import-control.xml | 52 +- .../org/apache/kafka/clients/ClientUtils.java | 37 +- .../kafka/clients/CommonClientConfigs.java | 18 +- .../org/apache/kafka/clients/NetworkClient.java | 41 +- .../kafka/clients/consumer/ConsumerConfig.java | 26 +- .../kafka/clients/consumer/KafkaConsumer.java | 92 +-- .../kafka/clients/producer/KafkaProducer.java | 7 +- .../kafka/clients/producer/ProducerConfig.java | 18 + .../kafka/common/config/AbstractConfig.java | 10 +- .../apache/kafka/common/config/SSLConfigs.java | 102 +++ .../kafka/common/network/Authenticator.java | 62 ++ .../kafka/common/network/ByteBufferSend.java| 13 +- .../kafka/common/network/ChannelBuilder.java| 44 ++ .../common/network/DefaultAuthenticator.java| 63 ++ .../kafka/common/network/KafkaChannel.java | 166 + .../kafka/common/network/NetworkReceive.java| 5 +- .../common/network/PlaintextChannelBuilder.java | 58 ++ .../common/network/PlaintextTransportLayer.java | 217 ++ .../kafka/common/network/SSLChannelBuilder.java | 68 ++ .../kafka/common/network/SSLTransportLayer.java | 690 +++ .../apache/kafka/common/network/Selectable.java | 12 +- .../apache/kafka/common/network/Selector.java | 316 + .../org/apache/kafka/common/network/Send.java | 6 +- .../kafka/common/network/TransportLayer.java| 86 +++ .../kafka/common/protocol/SecurityProtocol.java | 2 + .../security/auth/DefaultPrincipalBuilder.java | 43 ++ .../common/security/auth/KafkaPrincipal.java| 58 ++ .../common/security/auth/PrincipalBuilder.java | 51 ++ .../kafka/common/security/ssl/SSLFactory.java | 210 ++ .../org/apache/kafka/common/utils/Utils.java| 53 +- .../apache/kafka/clients/ClientUtilsTest.java | 2 +- .../clients/producer/KafkaProducerTest.java | 19 +- .../apache/kafka/common/network/EchoServer.java | 119 .../kafka/common/network/SSLSelectorTest.java | 276 .../kafka/common/network/SelectorTest.java | 110 ++- .../common/security/ssl/SSLFactoryTest.java | 60 ++ .../apache/kafka/common/utils/UtilsTest.java| 4 +- .../org/apache/kafka/test/MockSelector.java | 8 +- .../org/apache/kafka/test/TestSSLUtils.java | 243 +++ .../main/scala/kafka/api/FetchResponse.scala| 24 +- .../main/scala/kafka/network/SocketServer.scala | 175 +++-- .../main/scala/kafka/server/KafkaConfig.scala | 218 -- .../main/scala/kafka/server/KafkaServer.scala | 110 ++- .../kafka/api/ProducerSendTest.scala| 4 +- .../integration/kafka/api/SSLConsumerTest.scala | 251 +++ .../kafka/api/SSLProducerSendTest.scala | 240 +++ .../unit/kafka/admin/AddPartitionsTest.scala| 2 +- .../integration/UncleanLeaderElectionTest.scala | 6 +- .../unit/kafka/network/SocketServerTest.scala | 86 ++- .../unit/kafka/server/KafkaConfigTest.scala | 42 +- .../test/scala/unit/kafka/utils/TestUtils.scala | 89 ++- 52 files changed, 4121 insertions(+), 599 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/build.gradle -- diff --git a/build.gradle b/build.gradle index 983587f..17fc223 100644 --- a/build.gradle +++ b/build.gradle @@ -256,9 +256,10 @@ project(':core') { testCompile $junit testCompile $easymock testCompile 'org.objenesis:objenesis:1.2' +testCompile 'org.bouncycastle:bcpkix-jdk15on:1.52' testCompile org.scalatest:scalatest_$baseScalaVersion:2.2.5 -testCompile project(path: ':clients', configuration: 'archives') - +testCompile project(':clients') +testCompile project(':clients').sourceSets.test.output testRuntime $slf4jlog4j zinc 'com.typesafe.zinc:zinc:0.3.7' @@ -390,6 +391,7 @@ project(':clients') { compile 'org.xerial.snappy:snappy-java:1.1.1.7' compile 'net.jpountz.lz4:lz4:1.2.0' +testCompile 'org.bouncycastle:bcpkix-jdk15on:1.52' testCompile $junit
[1/4] kafka git commit: kafka-1690; Add SSL support to Kafka Broker, Producer and Consumer; patched by Sriharsha Chintalapani; reviewed Rajini Sivaram, Joel Koshy, Michael Herstine, Ismael Juma, Dong
Repository: kafka Updated Branches: refs/heads/trunk 503bd3664 - 9e2c683f5 http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/main/scala/kafka/server/KafkaConfig.scala -- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index c39402c..d547a01 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -1,19 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the License); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package kafka.server @@ -26,14 +26,17 @@ import kafka.consumer.ConsumerConfig import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet} import kafka.utils.CoreUtils import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.common.config.SSLConfigs import org.apache.kafka.common.config.ConfigDef.Importance._ import org.apache.kafka.common.config.ConfigDef.Range._ import org.apache.kafka.common.config.ConfigDef.Type._ + import org.apache.kafka.common.config.{ConfigException, AbstractConfig, ConfigDef} import org.apache.kafka.common.metrics.MetricsReporter import org.apache.kafka.common.protocol.SecurityProtocol +import org.apache.kafka.common.security.auth.PrincipalBuilder +import scala.collection.{mutable, immutable, JavaConversions, Map} -import scala.collection.{Map, immutable} object Defaults { /** * Zookeeper Configuration ***/ @@ -151,6 +154,25 @@ object Defaults { val MetricNumSamples = 2 val MetricSampleWindowMs = 3 val MetricReporterClasses = + + /** * SSL configuration ***/ + val PrincipalBuilderClass = SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS + val SSLProtocol = SSLConfigs.DEFAULT_SSL_PROTOCOL + val SSLEnabledProtocols = SSLConfigs.DEFAULT_ENABLED_PROTOCOLS + val SSLKeystoreType = SSLConfigs.DEFAULT_SSL_KEYSTORE_TYPE + val SSLKeystoreLocation = /tmp/ssl.keystore.jks + val SSLKeystorePassword = keystore_password + val SSLKeyPassword = key_password + val SSLTruststoreType = SSLConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE + val SSLTruststoreLocation = SSLConfigs.DEFAULT_TRUSTSTORE_LOCATION + val SSLTruststorePassword = SSLConfigs.DEFAULT_TRUSTSTORE_PASSWORD + val SSLKeyManagerAlgorithm = SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM + val SSLTrustManagerAlgorithm = SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM + val SSLClientAuthRequired = required + val SSLClientAuthRequested = requested + val SSLClientAuthNone = none + val SSLClientAuth = SSLClientAuthNone + } object KafkaConfig { @@ -278,6 +300,25 @@ object KafkaConfig { val MetricNumSamplesProp: String = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG val MetricReporterClassesProp: String = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG + /** * SSL Configuration / + val PrincipalBuilderClassProp = SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG + val SSLProtocolProp = SSLConfigs.SSL_PROTOCOL_CONFIG + val SSLProviderProp = SSLConfigs.SSL_PROVIDER_CONFIG + val SSLCipherSuitesProp = SSLConfigs.SSL_CIPHER_SUITES_CONFIG + val SSLEnabledProtocolsProp = SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG + val SSLKeystoreTypeProp = SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG + val SSLKeystoreLocationProp = SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG + val SSLKeystorePasswordProp =
kafka git commit: KAFKA-2436; log.retention.hours should be honored by LogManager
Repository: kafka Updated Branches: refs/heads/trunk 786867c2e - 503bd3664 KAFKA-2436; log.retention.hours should be honored by LogManager Author: Dong Lin lindon...@gmail.com Reviewers: Joel Koshy, Gwen Shapira Closes #142 from lindong28/KAFKA-2436 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/503bd366 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/503bd366 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/503bd366 Branch: refs/heads/trunk Commit: 503bd36647695e8cc91893ffb80346dd03eb0bc5 Parents: 786867c Author: Dong Lin lindon...@gmail.com Authored: Tue Aug 18 13:03:11 2015 -0700 Committer: Gwen Shapira csh...@gmail.com Committed: Tue Aug 18 13:03:11 2015 -0700 -- core/src/main/scala/kafka/log/LogConfig.scala | 5 +- .../main/scala/kafka/server/KafkaConfig.scala | 8 +-- .../main/scala/kafka/server/KafkaServer.scala | 65 ++-- .../scala/unit/kafka/log/LogConfigTest.scala| 19 ++ 4 files changed, 58 insertions(+), 39 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/503bd366/core/src/main/scala/kafka/log/LogConfig.scala -- diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index c969d16..7fc7e33 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -47,7 +47,10 @@ object Defaults { } case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props, false) { - + /** + * Important note: Any configuration parameter that is passed along from KafkaConfig to LogConfig + * should also go in copyKafkaConfigToLog. + */ val segmentSize = getInt(LogConfig.SegmentBytesProp) val segmentMs = getLong(LogConfig.SegmentMsProp) val segmentJitterMs = getLong(LogConfig.SegmentJitterMsProp) http://git-wip-us.apache.org/repos/asf/kafka/blob/503bd366/core/src/main/scala/kafka/server/KafkaConfig.scala -- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 394f21b..c39402c 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -335,7 +335,7 @@ object KafkaConfig { val LogRetentionTimeHoursDoc = The number of hours to keep a log file before deleting it (in hours), tertiary to + LogRetentionTimeMillisProp + property val LogRetentionBytesDoc = The maximum size of the log before deleting it - val LogCleanupIntervalMsDoc = The frequency in minutes that the log cleaner checks whether any log is eligible for deletion + val LogCleanupIntervalMsDoc = The frequency in milliseconds that the log cleaner checks whether any log is eligible for deletion val LogCleanupPolicyDoc = The default cleanup policy for segments beyond the retention window, must be either \delete\ or \compact\ val LogCleanerThreadsDoc = The number of background threads to use for log cleaning val LogCleanerIoMaxBytesPerSecondDoc = The log cleaner will be throttled so that the sum of its read and write i/o will be less than this value on average @@ -652,8 +652,9 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka val logRollTimeMillis: java.lang.Long = Option(getLong(KafkaConfig.LogRollTimeMillisProp)).getOrElse(60 * 60 * 1000L * getInt(KafkaConfig.LogRollTimeHoursProp)) val logRollTimeJitterMillis: java.lang.Long = Option(getLong(KafkaConfig.LogRollTimeJitterMillisProp)).getOrElse(60 * 60 * 1000L * getInt(KafkaConfig.LogRollTimeJitterHoursProp)) val logFlushIntervalMs: java.lang.Long = Option(getLong(KafkaConfig.LogFlushIntervalMsProp)).getOrElse(getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp)) + val logRetentionTimeMillis = getLogRetentionTimeMillis val minInSyncReplicas = getInt(KafkaConfig.MinInSyncReplicasProp) - val logPreAllocateEnable: Boolean = getBoolean(KafkaConfig.LogPreAllocateProp) + val logPreAllocateEnable: java.lang.Boolean = getBoolean(KafkaConfig.LogPreAllocateProp) /** * Replication configuration ***/ val controllerSocketTimeoutMs: Int = getInt(KafkaConfig.ControllerSocketTimeoutMsProp) @@ -672,7 +673,7 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka val autoLeaderRebalanceEnable = getBoolean(KafkaConfig.AutoLeaderRebalanceEnableProp) val leaderImbalancePerBrokerPercentage = getInt(KafkaConfig.LeaderImbalancePerBrokerPercentageProp) val leaderImbalanceCheckIntervalSeconds = getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp) - val uncleanLeaderElectionEnable =