[2/4] kafka git commit: kafka-1690; Add SSL support to Kafka Broker, Producer and Consumer; patched by Sriharsha Chintalapani; reviewed Rajini Sivaram, Joel Koshy, Michael Herstine, Ismael Juma, Dong Lin, Jiangjie Qin and Jun Rao

2015-08-18 Thread junrao
http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java 
b/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java
new file mode 100644
index 000..f79b65c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.ssl;
+
+import java.util.Map;
+import java.util.List;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+
+import javax.net.ssl.*;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.config.SSLConfigs;
+
+
+public class SSLFactory implements Configurable {
+
+public enum Mode { CLIENT, SERVER };
+private String protocol;
+private String provider;
+private String kmfAlgorithm;
+private String tmfAlgorithm;
+private SecurityStore keystore = null;
+private String keyPassword;
+private SecurityStore truststore;
+private String[] cipherSuites;
+private String[] enabledProtocols;
+private String endpointIdentification;
+private SSLContext sslContext;
+private boolean needClientAuth;
+private boolean wantClientAuth;
+private final Mode mode;
+
+
+public SSLFactory(Mode mode) {
+this.mode = mode;
+}
+
+@Override
+public void configure(MapString, ? configs) throws KafkaException {
+this.protocol =  (String) configs.get(SSLConfigs.SSL_PROTOCOL_CONFIG);
+this.provider = (String) configs.get(SSLConfigs.SSL_PROVIDER_CONFIG);
+
+if (configs.get(SSLConfigs.SSL_CIPHER_SUITES_CONFIG) != null) {
+ListString cipherSuitesList = (ListString) 
configs.get(SSLConfigs.SSL_CIPHER_SUITES_CONFIG);
+this.cipherSuites = (String[]) cipherSuitesList.toArray(new 
String[cipherSuitesList.size()]);
+}
+
+if (configs.get(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG) != null) {
+ListString enabledProtocolsList = (ListString) 
configs.get(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG);
+this.enabledProtocols =  (String[]) 
enabledProtocolsList.toArray(new String[enabledProtocolsList.size()]);
+}
+
+if 
(configs.containsKey(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG)) {
+this.endpointIdentification = (String) 
configs.get(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
+}
+
+if (configs.containsKey(SSLConfigs.SSL_CLIENT_AUTH_CONFIG)) {
+String clientAuthConfig = (String) 
configs.get(SSLConfigs.SSL_CLIENT_AUTH_CONFIG);
+if (clientAuthConfig.equals(required))
+this.needClientAuth = true;
+else if (clientAuthConfig.equals(requested))
+this.wantClientAuth = true;
+}
+
+this.kmfAlgorithm = (String) 
configs.get(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG);
+this.tmfAlgorithm = (String) 
configs.get(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG);
+
+if (checkKeyStoreConfigs(configs)) {
+createKeystore((String) 
configs.get(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG),
+   (String) 
configs.get(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG),
+   (String) 
configs.get(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG),
+   (String) 
configs.get(SSLConfigs.SSL_KEY_PASSWORD_CONFIG));
+}
+
+createTruststore((String) 
configs.get(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG),
+ (String) 
configs.get(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG),
+ (String) 
configs.get(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
+try {
+this.sslContext = createSSLContext();
+} catch (Exception e) {
+throw new KafkaException(e);
+}
+}
+
+
+private SSLContext 

[4/4] kafka git commit: kafka-1690; Add SSL support to Kafka Broker, Producer and Consumer; patched by Sriharsha Chintalapani; reviewed Rajini Sivaram, Joel Koshy, Michael Herstine, Ismael Juma, Dong Lin, Jiangjie Qin and Jun Rao

2015-08-18 Thread junrao
kafka-1690; Add SSL support to Kafka Broker, Producer and Consumer; patched by 
Sriharsha Chintalapani; reviewed Rajini Sivaram, Joel Koshy, Michael Herstine, 
Ismael Juma, Dong Lin, Jiangjie Qin and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9e2c683f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9e2c683f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9e2c683f

Branch: refs/heads/trunk
Commit: 9e2c683f550b7ae58008d0bcb62238b7a2d89a65
Parents: 503bd36
Author: Sriharsha Chintalapani <schintalap...@hortonworks.com>
Authored: Tue Aug 18 21:51:15 2015 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Tue Aug 18 21:51:15 2015 -0700

--
 build.gradle|   6 +-
 checkstyle/import-control.xml   |  52 +-
 .../org/apache/kafka/clients/ClientUtils.java   |  37 +-
 .../kafka/clients/CommonClientConfigs.java  |  18 +-
 .../org/apache/kafka/clients/NetworkClient.java |  41 +-
 .../kafka/clients/consumer/ConsumerConfig.java  |  26 +-
 .../kafka/clients/consumer/KafkaConsumer.java   |  92 +--
 .../kafka/clients/producer/KafkaProducer.java   |   7 +-
 .../kafka/clients/producer/ProducerConfig.java  |  18 +
 .../kafka/common/config/AbstractConfig.java |  10 +-
 .../apache/kafka/common/config/SSLConfigs.java  | 102 +++
 .../kafka/common/network/Authenticator.java |  62 ++
 .../kafka/common/network/ByteBufferSend.java|  13 +-
 .../kafka/common/network/ChannelBuilder.java|  44 ++
 .../common/network/DefaultAuthenticator.java|  63 ++
 .../kafka/common/network/KafkaChannel.java  | 166 +
 .../kafka/common/network/NetworkReceive.java|   5 +-
 .../common/network/PlaintextChannelBuilder.java |  58 ++
 .../common/network/PlaintextTransportLayer.java | 217 ++
 .../kafka/common/network/SSLChannelBuilder.java |  68 ++
 .../kafka/common/network/SSLTransportLayer.java | 690 +++
 .../apache/kafka/common/network/Selectable.java |  12 +-
 .../apache/kafka/common/network/Selector.java   | 316 +
 .../org/apache/kafka/common/network/Send.java   |   6 +-
 .../kafka/common/network/TransportLayer.java|  86 +++
 .../kafka/common/protocol/SecurityProtocol.java |   2 +
 .../security/auth/DefaultPrincipalBuilder.java  |  43 ++
 .../common/security/auth/KafkaPrincipal.java|  58 ++
 .../common/security/auth/PrincipalBuilder.java  |  51 ++
 .../kafka/common/security/ssl/SSLFactory.java   | 210 ++
 .../org/apache/kafka/common/utils/Utils.java|  53 +-
 .../apache/kafka/clients/ClientUtilsTest.java   |   2 +-
 .../clients/producer/KafkaProducerTest.java |  19 +-
 .../apache/kafka/common/network/EchoServer.java | 119 
 .../kafka/common/network/SSLSelectorTest.java   | 276 
 .../kafka/common/network/SelectorTest.java  | 110 ++-
 .../common/security/ssl/SSLFactoryTest.java |  60 ++
 .../apache/kafka/common/utils/UtilsTest.java|   4 +-
 .../org/apache/kafka/test/MockSelector.java |   8 +-
 .../org/apache/kafka/test/TestSSLUtils.java | 243 +++
 .../main/scala/kafka/api/FetchResponse.scala|  24 +-
 .../main/scala/kafka/network/SocketServer.scala | 175 +++--
 .../main/scala/kafka/server/KafkaConfig.scala   | 218 --
 .../main/scala/kafka/server/KafkaServer.scala   | 110 ++-
 .../kafka/api/ProducerSendTest.scala|   4 +-
 .../integration/kafka/api/SSLConsumerTest.scala | 251 +++
 .../kafka/api/SSLProducerSendTest.scala | 240 +++
 .../unit/kafka/admin/AddPartitionsTest.scala|   2 +-
 .../integration/UncleanLeaderElectionTest.scala |   6 +-
 .../unit/kafka/network/SocketServerTest.scala   |  86 ++-
 .../unit/kafka/server/KafkaConfigTest.scala |  42 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  89 ++-
 52 files changed, 4121 insertions(+), 599 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/build.gradle
--
diff --git a/build.gradle b/build.gradle
index 983587f..17fc223 100644
--- a/build.gradle
+++ b/build.gradle
@@ -256,9 +256,10 @@ project(':core') {
 testCompile $junit
 testCompile $easymock
 testCompile 'org.objenesis:objenesis:1.2'
+testCompile 'org.bouncycastle:bcpkix-jdk15on:1.52'
 testCompile org.scalatest:scalatest_$baseScalaVersion:2.2.5
-testCompile project(path: ':clients', configuration: 'archives')
-
+testCompile project(':clients')
+testCompile project(':clients').sourceSets.test.output
 testRuntime $slf4jlog4j
 
 zinc 'com.typesafe.zinc:zinc:0.3.7'
@@ -390,6 +391,7 @@ project(':clients') {
 compile 'org.xerial.snappy:snappy-java:1.1.1.7'
 compile 'net.jpountz.lz4:lz4:1.2.0'
 
+testCompile 'org.bouncycastle:bcpkix-jdk15on:1.52'
 testCompile $junit
  

[1/4] kafka git commit: kafka-1690; Add SSL support to Kafka Broker, Producer and Consumer; patched by Sriharsha Chintalapani; reviewed Rajini Sivaram, Joel Koshy, Michael Herstine, Ismael Juma, Dong Lin, Jiangjie Qin and Jun Rao

2015-08-18 Thread junrao
Repository: kafka
Updated Branches:
  refs/heads/trunk 503bd3664 -> 9e2c683f5


http://git-wip-us.apache.org/repos/asf/kafka/blob/9e2c683f/core/src/main/scala/kafka/server/KafkaConfig.scala
--
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index c39402c..d547a01 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1,19 +1,19 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
 
 package kafka.server
 
@@ -26,14 +26,17 @@ import kafka.consumer.ConsumerConfig
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, 
MessageSet}
 import kafka.utils.CoreUtils
 import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.config.SSLConfigs
 import org.apache.kafka.common.config.ConfigDef.Importance._
 import org.apache.kafka.common.config.ConfigDef.Range._
 import org.apache.kafka.common.config.ConfigDef.Type._
+
 import org.apache.kafka.common.config.{ConfigException, AbstractConfig, 
ConfigDef}
 import org.apache.kafka.common.metrics.MetricsReporter
 import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.security.auth.PrincipalBuilder
+import scala.collection.{mutable, immutable, JavaConversions, Map}
 
-import scala.collection.{Map, immutable}
 
 object Defaults {
   /** * Zookeeper Configuration ***/
@@ -151,6 +154,25 @@ object Defaults {
   val MetricNumSamples = 2
   val MetricSampleWindowMs = 3
   val MetricReporterClasses = 
+
+  /** * SSL configuration ***/
+  val PrincipalBuilderClass = SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS
+  val SSLProtocol = SSLConfigs.DEFAULT_SSL_PROTOCOL
+  val SSLEnabledProtocols = SSLConfigs.DEFAULT_ENABLED_PROTOCOLS
+  val SSLKeystoreType = SSLConfigs.DEFAULT_SSL_KEYSTORE_TYPE
+  val SSLKeystoreLocation = /tmp/ssl.keystore.jks
+  val SSLKeystorePassword = keystore_password
+  val SSLKeyPassword = key_password
+  val SSLTruststoreType = SSLConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
+  val SSLTruststoreLocation = SSLConfigs.DEFAULT_TRUSTSTORE_LOCATION
+  val SSLTruststorePassword = SSLConfigs.DEFAULT_TRUSTSTORE_PASSWORD
+  val SSLKeyManagerAlgorithm = SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM
+  val SSLTrustManagerAlgorithm = SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM
+  val SSLClientAuthRequired = required
+  val SSLClientAuthRequested = requested
+  val SSLClientAuthNone = none
+  val SSLClientAuth = SSLClientAuthNone
+
 }
 
 object KafkaConfig {
@@ -278,6 +300,25 @@ object KafkaConfig {
   val MetricNumSamplesProp: String = 
CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG
   val MetricReporterClassesProp: String = 
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG
 
+  /** ********* SSL Configuration ****************/
+  val PrincipalBuilderClassProp = SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG
+  val SSLProtocolProp = SSLConfigs.SSL_PROTOCOL_CONFIG
+  val SSLProviderProp = SSLConfigs.SSL_PROVIDER_CONFIG
+  val SSLCipherSuitesProp = SSLConfigs.SSL_CIPHER_SUITES_CONFIG
+  val SSLEnabledProtocolsProp = SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG
+  val SSLKeystoreTypeProp = SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG
+  val SSLKeystoreLocationProp = SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG
+  val SSLKeystorePasswordProp = 

kafka git commit: KAFKA-2436; log.retention.hours should be honored by LogManager

2015-08-18 Thread gwenshap
Repository: kafka
Updated Branches:
  refs/heads/trunk 786867c2e -> 503bd3664


KAFKA-2436; log.retention.hours should be honored by LogManager

Author: Dong Lin <lindon...@gmail.com>

Reviewers: Joel Koshy, Gwen Shapira

Closes #142 from lindong28/KAFKA-2436


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/503bd366
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/503bd366
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/503bd366

Branch: refs/heads/trunk
Commit: 503bd36647695e8cc91893ffb80346dd03eb0bc5
Parents: 786867c
Author: Dong Lin <lindon...@gmail.com>
Authored: Tue Aug 18 13:03:11 2015 -0700
Committer: Gwen Shapira <csh...@gmail.com>
Committed: Tue Aug 18 13:03:11 2015 -0700

--
 core/src/main/scala/kafka/log/LogConfig.scala   |  5 +-
 .../main/scala/kafka/server/KafkaConfig.scala   |  8 +--
 .../main/scala/kafka/server/KafkaServer.scala   | 65 ++--
 .../scala/unit/kafka/log/LogConfigTest.scala| 19 ++
 4 files changed, 58 insertions(+), 39 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/503bd366/core/src/main/scala/kafka/log/LogConfig.scala
--
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala 
b/core/src/main/scala/kafka/log/LogConfig.scala
index c969d16..7fc7e33 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -47,7 +47,10 @@ object Defaults {
 }
 
 case class LogConfig(props: java.util.Map[_, _]) extends 
AbstractConfig(LogConfig.configDef, props, false) {
-
+  /**
+   * Important note: Any configuration parameter that is passed along from 
KafkaConfig to LogConfig
+   * should also go in copyKafkaConfigToLog.
+   */
   val segmentSize = getInt(LogConfig.SegmentBytesProp)
   val segmentMs = getLong(LogConfig.SegmentMsProp)
   val segmentJitterMs = getLong(LogConfig.SegmentJitterMsProp)

http://git-wip-us.apache.org/repos/asf/kafka/blob/503bd366/core/src/main/scala/kafka/server/KafkaConfig.scala
--
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 394f21b..c39402c 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -335,7 +335,7 @@ object KafkaConfig {
   val LogRetentionTimeHoursDoc = The number of hours to keep a log file 
before deleting it (in hours), tertiary to  + LogRetentionTimeMillisProp +  
property
 
   val LogRetentionBytesDoc = The maximum size of the log before deleting it
-  val LogCleanupIntervalMsDoc = The frequency in minutes that the log cleaner 
checks whether any log is eligible for deletion
+  val LogCleanupIntervalMsDoc = The frequency in milliseconds that the log 
cleaner checks whether any log is eligible for deletion
   val LogCleanupPolicyDoc = The default cleanup policy for segments beyond 
the retention window, must be either \delete\ or \compact\
   val LogCleanerThreadsDoc = The number of background threads to use for log 
cleaning
   val LogCleanerIoMaxBytesPerSecondDoc = The log cleaner will be throttled so 
that the sum of its read and write i/o will be less than this value on average
@@ -652,8 +652,9 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends 
AbstractConfig(Kafka
   val logRollTimeMillis: java.lang.Long = 
Option(getLong(KafkaConfig.LogRollTimeMillisProp)).getOrElse(60 * 60 * 1000L * 
getInt(KafkaConfig.LogRollTimeHoursProp))
   val logRollTimeJitterMillis: java.lang.Long = 
Option(getLong(KafkaConfig.LogRollTimeJitterMillisProp)).getOrElse(60 * 60 * 
1000L * getInt(KafkaConfig.LogRollTimeJitterHoursProp))
   val logFlushIntervalMs: java.lang.Long = 
Option(getLong(KafkaConfig.LogFlushIntervalMsProp)).getOrElse(getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp))
+  val logRetentionTimeMillis = getLogRetentionTimeMillis
   val minInSyncReplicas = getInt(KafkaConfig.MinInSyncReplicasProp)
-  val logPreAllocateEnable: Boolean = 
getBoolean(KafkaConfig.LogPreAllocateProp)
+  val logPreAllocateEnable: java.lang.Boolean = 
getBoolean(KafkaConfig.LogPreAllocateProp)
 
   /** * Replication configuration ***/
   val controllerSocketTimeoutMs: Int = 
getInt(KafkaConfig.ControllerSocketTimeoutMsProp)
@@ -672,7 +673,7 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends 
AbstractConfig(Kafka
   val autoLeaderRebalanceEnable = 
getBoolean(KafkaConfig.AutoLeaderRebalanceEnableProp)
   val leaderImbalancePerBrokerPercentage = 
getInt(KafkaConfig.LeaderImbalancePerBrokerPercentageProp)
   val leaderImbalanceCheckIntervalSeconds = 
getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp)
-  val uncleanLeaderElectionEnable =