[1/3] kafka git commit: KAFKA-3267; Describe and Alter Configs Admin APIs (KIP-133)

2017-05-17 Thread ijuma
Repository: kafka
Updated Branches:
  refs/heads/trunk e1abf1770 -> 972b75453


http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
--
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 9eb1275..a362577 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, 
ProducerConfig, Produce
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
-import org.apache.kafka.common.requests._
+import org.apache.kafka.common.requests.{Resource => RResource, ResourceType 
=> RResourceType, _}
 import CreateTopicsRequest.TopicDetails
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.{Node, TopicPartition, requests}
@@ -40,6 +40,7 @@ import scala.collection.mutable
 import scala.collection.mutable.Buffer
 import org.apache.kafka.common.KafkaException
 import kafka.admin.AdminUtils
+import kafka.log.LogConfig
 import kafka.network.SocketServer
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, 
RecordBatch, SimpleRecord}
@@ -65,13 +66,15 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   val deleteTopicResource = new Resource(Topic, deleteTopic)
   val producerTransactionalIdResource = new Resource(ProducerTransactionalId, 
transactionalId)
 
-  val GroupReadAcl = Map(groupResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
-  val ClusterAcl = Map(Resource.ClusterResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)))
-  val ClusterCreateAcl = Map(Resource.ClusterResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)))
-  val TopicReadAcl = Map(topicResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
-  val TopicWriteAcl = Map(topicResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)))
-  val TopicDescribeAcl = Map(topicResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
-  val TopicDeleteAcl = Map(deleteTopicResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)))
+  val groupReadAcl = Map(groupResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
+  val clusterAcl = Map(Resource.ClusterResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)))
+  val clusterCreateAcl = Map(Resource.ClusterResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)))
+  val topicReadAcl = Map(topicResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
+  val topicWriteAcl = Map(topicResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)))
+  val topicDescribeAcl = Map(topicResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
+  val topicDeleteAcl = Map(deleteTopicResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)))
+  val topicDescribeConfigsAcl = Map(topicResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, DescribeConfigs)))
+  val topicAlterConfigsAcl = Map(topicResource -> Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, AlterConfigs)))
   val producerTransactionalIdWriteAcl = Map(producerTransactionalIdResource -> 
Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)))
 
   val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
@@ -92,7 +95,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1")
   }
 
-  val RequestKeyToResponseDeserializer: Map[ApiKeys, Class[_ <: Any]] =
+  val requestKeyToResponseDeserializer: Map[ApiKeys, Class[_ <: Any]] =
 Map(ApiKeys.METADATA -> classOf[requests.MetadataResponse],
   ApiKeys.PRODUCE -> classOf[requests.ProduceResponse],
   ApiKeys.FETCH -> classOf[requests.FetchResponse],
@@ -110,15 +113,17 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   ApiKeys.CONTROLLED_SHUTDOWN_KEY -> 
classOf[requests.ControlledShutdownResponse],
   ApiKeys.CREATE_TOPICS -> classOf[CreateTopicsResponse],
   ApiKeys.DELETE_TOPICS -> classOf[requests.DeleteTopicsResponse],
-  ApiKeys.OFFSET_FOR_LEADER_EPOCH -> classOf[OffsetsForLeaderEpochResponse]
+  ApiKeys.OFFSET_FOR_LEADER_EPOCH -> 

[2/3] kafka git commit: KAFKA-3267; Describe and Alter Configs Admin APIs (KIP-133)

2017-05-17 Thread ijuma
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java 
b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
new file mode 100644
index 000..26034eb
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+/**
+ * Encapsulates an error code (via the Errors enum) and an optional message. 
Generally, the optional message is only
+ * defined if it adds information over the default message associated with the 
error code.
+ *
+ * This is an internal class (like every class in the requests package).
+ */
+public class ApiError {
+
+private static final String CODE_KEY_NAME = "error_code";
+private static final String MESSAGE_KEY_NAME = "error_message";
+
+private final Errors error;
+private final String message;
+
+public static ApiError fromThrowable(Throwable t) {
+// Avoid populating the error message if it's a generic one
+Errors error = Errors.forException(t);
+String message = error.message().equals(t.getMessage()) ? null : 
t.getMessage();
+return new ApiError(error, message);
+}
+
+public ApiError(Struct struct) {
+error = Errors.forCode(struct.getShort(CODE_KEY_NAME));
+// In some cases, the error message field was introduced in newer 
version
+if (struct.hasField(MESSAGE_KEY_NAME))
+message = struct.getString(MESSAGE_KEY_NAME);
+else
+message = null;
+}
+
+public ApiError(Errors error, String message) {
+this.error = error;
+this.message = message;
+}
+
+public void write(Struct struct) {
+struct.set(CODE_KEY_NAME, error.code());
+// In some cases, the error message field was introduced in a newer 
protocol API version
+if (struct.hasField(MESSAGE_KEY_NAME) && message != null && error != 
Errors.NONE)
+struct.set(MESSAGE_KEY_NAME, message);
+}
+
+public boolean is(Errors error) {
+return this.error == error;
+}
+
+public Errors error() {
+return error;
+}
+
+/**
+ * Return the optional error message or null. Consider using {@link 
#messageWithFallback()} instead.
+ */
+public String message() {
+return message;
+}
+
+/**
+ * If `message` is defined, return it. Otherwise fallback to the default 
error message associated with the error
+ * code.
+ */
+public String messageWithFallback() {
+if (message == null)
+return error.message();
+return message;
+}
+
+public ApiException exception() {
+return error.exception(message);
+}
+
+@Override
+public String toString() {
+return "ApiError(error=" + error + ", message=" + message + ")";
+}
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
index a0626cc..def4c85 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -42,9 +41,9 @@ public class CreateTopicsRequest 

[3/3] kafka git commit: KAFKA-3267; Describe and Alter Configs Admin APIs (KIP-133)

2017-05-17 Thread ijuma
KAFKA-3267; Describe and Alter Configs Admin APIs (KIP-133)

Author: Ismael Juma 

Reviewers: Jun Rao 

Closes #3076 from ijuma/kafka-3267-describe-alter-configs-protocol


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/972b7545
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/972b7545
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/972b7545

Branch: refs/heads/trunk
Commit: 972b7545363ae85a55f94cf7ea83614be8840b75
Parents: e1abf17
Author: Ismael Juma 
Authored: Thu May 18 06:51:02 2017 +0100
Committer: Ismael Juma 
Committed: Thu May 18 06:51:02 2017 +0100

--
 checkstyle/suppressions.xml |   4 +-
 .../kafka/clients/admin/AclOperation.java   |  12 +-
 .../apache/kafka/clients/admin/AdminClient.java |  60 +
 .../clients/admin/AlterConfigsOptions.java  |  45 
 .../clients/admin/AlterConfigsResults.java  |  42 
 .../org/apache/kafka/clients/admin/Config.java  |  63 ++
 .../apache/kafka/clients/admin/ConfigEntry.java |  59 +
 .../kafka/clients/admin/ConfigResource.java |  65 ++
 .../clients/admin/DescribeConfigsOptions.java   |  37 +++
 .../clients/admin/DescribeConfigsResults.java   |  59 +
 .../kafka/clients/admin/KafkaAdminClient.java   | 184 ++-
 .../kafka/clients/admin/ResourceType.java   |   7 +-
 .../kafka/common/config/AbstractConfig.java |   7 +
 .../errors/BrokerAuthorizationException.java|  23 ++
 .../apache/kafka/common/protocol/ApiKeys.java   |   4 +-
 .../apache/kafka/common/protocol/Errors.java|  20 +-
 .../apache/kafka/common/protocol/Protocol.java  | 182 ++-
 .../kafka/common/protocol/types/Schema.java |  16 +-
 .../kafka/common/requests/AbstractRequest.java  |   6 +
 .../kafka/common/requests/AbstractResponse.java |   4 +
 .../common/requests/AlterConfigsRequest.java| 179 +++
 .../common/requests/AlterConfigsResponse.java   |  88 
 .../apache/kafka/common/requests/ApiError.java  |  99 
 .../common/requests/CreateTopicsRequest.java|  12 +-
 .../common/requests/CreateTopicsResponse.java   |  68 +-
 .../common/requests/DescribeConfigsRequest.java | 142 
 .../requests/DescribeConfigsResponse.java   | 186 +++
 .../apache/kafka/common/requests/Resource.java  |  60 +
 .../kafka/common/requests/ResourceType.java |  42 
 .../kafka/server/policy/CreateTopicPolicy.java  |   6 +-
 .../kafka/clients/admin/AclOperationTest.java   |   4 +-
 .../clients/admin/KafkaAdminClientTest.java |  20 +-
 .../kafka/clients/admin/ResourceTypeTest.java   |   3 +-
 .../common/requests/RequestResponseTest.java|  60 -
 .../src/main/scala/kafka/admin/AclCommand.scala |  14 +-
 .../src/main/scala/kafka/admin/AdminUtils.scala |  33 ++-
 .../main/scala/kafka/admin/ConfigCommand.scala  |  17 +-
 core/src/main/scala/kafka/log/LogConfig.scala   |   2 +-
 core/src/main/scala/kafka/log/LogManager.scala  |   2 +-
 .../scala/kafka/security/auth/Operation.scala   |  20 +-
 .../kafka/security/auth/ResourceType.scala  |   7 +-
 .../security/auth/SimpleAclAuthorizer.scala |   2 +-
 .../main/scala/kafka/server/AdminManager.scala  | 109 -
 .../kafka/server/DelayedCreateTopics.scala  |   8 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  82 ++-
 .../main/scala/kafka/server/KafkaServer.scala   |   6 +-
 .../kafka/api/AuthorizerIntegrationTest.scala   | 132 ++-
 .../api/KafkaAdminClientIntegrationTest.scala   | 224 ++-
 .../scala/unit/kafka/admin/AclCommandTest.scala |  11 +-
 .../AbstractCreateTopicsRequestTest.scala   |   8 +-
 .../unit/kafka/server/RequestQuotaTest.scala|  18 +-
 .../processor/internals/StreamsKafkaClient.java |   3 +-
 52 files changed, 2271 insertions(+), 295 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/checkstyle/suppressions.xml
--
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 66548d9..dc00bee 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -19,7 +19,7 @@
   files=".*/requests/AbstractResponse.java"/>
 
 
+  files="KerberosLogin.java|RequestResponseTest.java"/>
 
 
@@ -46,7 +46,7 @@
   
files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|SsLTransportLayer|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslTransportLayer).java"/>
 
 
+  files="AbstractRequest.java|KerberosLogin.java"/>
 
 


[2/2] kafka git commit: KAFKA-2273; Sticky partition assignment strategy (KIP-54)

2017-05-17 Thread jgus
KAFKA-2273; Sticky partition assignment strategy (KIP-54)

This PR implements a new partition assignment strategy called "sticky", and 
its purpose is to balance partitions across consumers in a way that minimizes 
moving partitions around, or, in other words, preserves existing partition 
assignments as much as possible.

This patch is co-authored with rajinisivaram and edoardocomar.

Author: Vahid Hashemian 

Reviewers: Jason Gustafson 

Closes #1020 from vahidhashemian/KAFKA-2273


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e1abf177
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e1abf177
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e1abf177

Branch: refs/heads/trunk
Commit: e1abf17708918b82d3974ea028a4d74e3892fa0f
Parents: 9815e18
Author: Vahid Hashemian 
Authored: Wed May 17 20:13:19 2017 -0700
Committer: Jason Gustafson 
Committed: Wed May 17 20:15:17 2017 -0700

--
 .../kafka/clients/consumer/StickyAssignor.java  | 933 +++
 .../internals/AbstractPartitionAssignor.java|   8 +-
 .../consumer/internals/ConsumerProtocol.java|   1 -
 .../consumer/internals/PartitionAssignor.java   |   2 +-
 .../org/apache/kafka/common/TopicPartition.java |   1 -
 .../clients/consumer/StickyAssignorTest.java| 689 ++
 .../kafka/api/PlaintextConsumerTest.scala   |  58 +-
 7 files changed, 1685 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/e1abf177/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
new file mode 100644
index 000..58e5915
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
@@ -0,0 +1,933 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+import org.apache.kafka.common.utils.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The sticky assignor serves two purposes. First, it guarantees an assignment 
that is as balanced as possible, meaning either:
+ * - the numbers of topic partitions assigned to consumers differ by at most 
one; or
+ * - each consumer that has 2+ fewer topic partitions than some other consumer 
cannot get any of those topic partitions transferred to it.
+ * Second, it preserves as many existing assignments as possible when a 
reassignment occurs. This helps in saving some of the
+ * overhead processing when topic partitions move from one consumer to another.
+ *
+ * Starting fresh it would work by distributing the partitions over consumers 
as evenly as possible. Even though this may sound similar to
+ * how round robin assignor works, the second example below shows that it is 
not.
+ * During a reassignment it would perform the reassignment in such a way that 
in the new assignment
+ * 1. topic partitions are still distributed as evenly as possible, and
+ * 2. topic 

[2/3] kafka git commit: KAFKA-3266; Describe, Create and Delete ACLs Admin APIs (KIP-140)

2017-05-17 Thread ijuma
http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
new file mode 100644
index 000..f792bbd
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.clients.admin.AccessControlEntry;
+import org.apache.kafka.clients.admin.AclBinding;
+import org.apache.kafka.clients.admin.Resource;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.Utils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class CreateAclsRequest extends AbstractRequest {
+private final static String CREATIONS = "creations";
+
+public static class AclCreation {
+private final AclBinding acl;
+
+public AclCreation(AclBinding acl) {
+this.acl = acl;
+}
+
+static AclCreation fromStruct(Struct struct) {
+Resource resource = RequestUtils.resourceFromStructFields(struct);
+AccessControlEntry entry = 
RequestUtils.aceFromStructFields(struct);
+return new AclCreation(new AclBinding(resource, entry));
+}
+
+public AclBinding acl() {
+return acl;
+}
+
+void setStructFields(Struct struct) {
+RequestUtils.resourceSetStructFields(acl.resource(), struct);
+RequestUtils.aceSetStructFields(acl.entry(), struct);
+}
+
+@Override
+public String toString() {
+return "(acl=" + acl + ")";
+}
+}
+
+public static class Builder extends 
AbstractRequest.Builder {
+private final List creations;
+
+public Builder(List creations) {
+super(ApiKeys.CREATE_ACLS);
+this.creations = creations;
+}
+
+@Override
+public CreateAclsRequest build(short version) {
+return new CreateAclsRequest(version, creations);
+}
+
+@Override
+public String toString() {
+return "(type=CreateAclsRequest, creations=" + 
Utils.join(creations, ", ") + ")";
+}
+}
+
+private final List aclCreations;
+
+CreateAclsRequest(short version, List aclCreations) {
+super(version);
+this.aclCreations = aclCreations;
+}
+
+public CreateAclsRequest(Struct struct, short version) {
+super(version);
+this.aclCreations = new ArrayList<>();
+for (Object creationStructObj : struct.getArray(CREATIONS)) {
+Struct creationStruct = (Struct) creationStructObj;
+aclCreations.add(AclCreation.fromStruct(creationStruct));
+}
+}
+
+@Override
+protected Struct toStruct() {
+Struct struct = new 
Struct(ApiKeys.CREATE_ACLS.requestSchema(version()));
+List requests = new ArrayList<>();
+for (AclCreation creation : aclCreations) {
+Struct creationStruct = struct.instance(CREATIONS);
+creation.setStructFields(creationStruct);
+requests.add(creationStruct);
+}
+struct.set(CREATIONS, requests.toArray());
+return struct;
+}
+
+public List aclCreations() {
+return aclCreations;
+}
+
+@Override
+public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable 
throwable) {
+short versionId = version();
+switch (versionId) {
+case 0:
+List responses = new 
ArrayList<>();
+for (int i = 0; i < aclCreations.size(); i++) {
+responses.add(new 
CreateAclsResponse.AclCreationResponse(throwable));
+}
+return new CreateAclsResponse(throttleTimeMs, responses);
+default:
+throw new 

[1/3] kafka git commit: KAFKA-3266; Describe, Create and Delete ACLs Admin APIs (KIP-140)

2017-05-17 Thread ijuma
Repository: kafka
Updated Branches:
  refs/heads/trunk 249152062 -> 9815e18fe


http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/core/src/main/scala/kafka/server/KafkaApis.scala
--
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index c3c37c1..bf7d4c1 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -43,15 +43,20 @@ import 
org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANS
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol}
-import org.apache.kafka.common.record.{ControlRecordType, 
EndTransactionMarker, MemoryRecords, RecordBatch}
+import org.apache.kafka.common.record._
+import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse
+import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, 
AclFilterResponse}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.requests.SaslHandshakeResponse
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.clients.admin.{AccessControlEntry, AclBinding, 
AclBindingFilter, AclOperation, AclPermissionType, Resource => AdminResource, 
ResourceType => AdminResourceType}
 
 import scala.collection._
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
 import scala.util.{Failure, Success, Try}
 
 /**
@@ -118,6 +123,9 @@ class KafkaApis(val requestChannel: RequestChannel,
 case ApiKeys.END_TXN => handleEndTxnRequest(request)
 case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request)
 case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request)
+case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
+case ApiKeys.CREATE_ACLS => handleCreateAcls(request)
+case ApiKeys.DELETE_ACLS => handleDeleteAcls(request)
   }
 } catch {
   case e: FatalExitError => throw e
@@ -1655,6 +1663,217 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  def handleDescribeAcls(request: RequestChannel.Request): Unit = {
+authorizeClusterAction(request)
+val describeAclsRequest = request.body[DescribeAclsRequest]
+authorizer match {
+  case None =>
+def createResponse(throttleTimeMs: Int): AbstractResponse =
+  new DescribeAclsResponse(throttleTimeMs, new 
SecurityDisabledException(
+"No Authorizer is configured on the broker."), 
Collections.emptySet[AclBinding]);
+sendResponseMaybeThrottle(request, createResponse)
+  case Some(auth) =>
+val filter = describeAclsRequest.filter()
+var returnedAcls = new util.ArrayList[AclBinding]
+val aclMap : Map[Resource, Set[Acl]] = auth.getAcls()
+aclMap.foreach {
+  case (resource, acls) => {
+acls.foreach {
+  case (acl) => {
+val fixture = new AclBinding(new 
AdminResource(AdminResourceType.fromString(resource.resourceType.toString), 
resource.name),
+new AccessControlEntry(acl.principal.toString(), 
acl.host.toString(), acl.operation.toJava, acl.permissionType.toJava))
+if (filter.matches(fixture))
+  returnedAcls.add(fixture)
+  }
+}
+  }
+}
+def createResponse(throttleTimeMs: Int): AbstractResponse =
+  new DescribeAclsResponse(throttleTimeMs, null, returnedAcls)
+sendResponseMaybeThrottle(request, createResponse)
+}
+  }
+
+  /**
+* Convert an ACL binding filter to a Scala object.
+* All ACL and resource fields must be specified (no UNKNOWN, ANY, or null 
fields are allowed.)
+*
+* @param filter The binding filter as a Java object.
+* @return   The binding filter as a scala object, or an exception 
if there was an error
+*   converting the Java object.
+*/
+  def toScala(filter: AclBindingFilter) : Try[(Resource, Acl)] = {
+filter.resourceFilter().resourceType() match {
+  case AdminResourceType.UNKNOWN => return Failure(new 
InvalidRequestException("Invalid UNKNOWN resource type"))
+  case AdminResourceType.ANY => return Failure(new 
InvalidRequestException("Invalid ANY resource type"))
+  case _ => {}
+}
+var resourceType: ResourceType = null
+try {
+  resourceType = 
ResourceType.fromString(filter.resourceFilter().resourceType().toString)
+} catch {
+  case throwable: Throwable => return Failure(new 
InvalidRequestException("Invalid resource type"))

[3/3] kafka git commit: KAFKA-3266; Describe, Create and Delete ACLs Admin APIs (KIP-140)

2017-05-17 Thread ijuma
KAFKA-3266; Describe, Create and Delete ACLs Admin APIs (KIP-140)

Includes server-side code, protocol and AdminClient.

Author: Colin P. Mccabe 

Reviewers: Ismael Juma 

Closes #2941 from cmccabe/KAFKA-3266


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9815e18f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9815e18f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9815e18f

Branch: refs/heads/trunk
Commit: 9815e18fefa27d8d901356ec1d994a41b4db4622
Parents: 2491520
Author: Colin P. Mccabe 
Authored: Thu May 18 03:20:23 2017 +0100
Committer: Ismael Juma 
Committed: Thu May 18 03:20:30 2017 +0100

--
 checkstyle/import-control.xml   |   2 +
 checkstyle/suppressions.xml |   9 +-
 .../kafka/clients/admin/AccessControlEntry.java |  86 
 .../clients/admin/AccessControlEntryData.java   | 105 +
 .../clients/admin/AccessControlEntryFilter.java | 117 ++
 .../apache/kafka/clients/admin/AclBinding.java  |  74 +++
 .../kafka/clients/admin/AclBindingFilter.java   |  89 
 .../kafka/clients/admin/AclOperation.java   | 122 ++
 .../kafka/clients/admin/AclPermissionType.java  |  92 
 .../apache/kafka/clients/admin/AdminClient.java |  66 ++
 .../kafka/clients/admin/CreateAclsOptions.java  |  34 +++
 .../kafka/clients/admin/CreateAclsResults.java  |  48 
 .../kafka/clients/admin/DeleteAclsOptions.java  |  34 +++
 .../kafka/clients/admin/DeleteAclsResults.java  | 107 +
 .../clients/admin/DescribeAclsOptions.java  |  34 +++
 .../clients/admin/DescribeAclsResults.java  |  37 
 .../kafka/clients/admin/KafkaAdminClient.java   | 173 ++-
 .../apache/kafka/clients/admin/Resource.java|  74 +++
 .../kafka/clients/admin/ResourceFilter.java |  90 
 .../kafka/clients/admin/ResourceType.java   |  97 
 .../errors/SecurityDisabledException.java   |  32 +++
 .../apache/kafka/common/protocol/ApiKeys.java   |   5 +-
 .../apache/kafka/common/protocol/Errors.java|  12 +-
 .../apache/kafka/common/protocol/Protocol.java  |  90 
 .../kafka/common/requests/AbstractRequest.java  |   9 +
 .../kafka/common/requests/AbstractResponse.java |   6 +
 .../common/requests/CreateAclsRequest.java  | 133 +++
 .../common/requests/CreateAclsResponse.java | 106 +
 .../common/requests/DeleteAclsRequest.java  | 111 ++
 .../common/requests/DeleteAclsResponse.java | 182 +++
 .../common/requests/DescribeAclsRequest.java|  90 
 .../common/requests/DescribeAclsResponse.java   | 128 +++
 .../kafka/common/requests/RequestUtils.java |  82 +++
 .../kafka/clients/admin/AclBindingTest.java | 110 +
 .../kafka/clients/admin/AclOperationTest.java   |  86 
 .../clients/admin/AclPermissionTypeTest.java|  80 +++
 .../clients/admin/KafkaAdminClientTest.java | 163 ++
 .../kafka/clients/admin/ResourceTypeTest.java   |  81 +++
 .../common/requests/RequestResponseTest.java|  80 +++
 .../scala/kafka/security/auth/Operation.scala   |  56 -
 .../kafka/security/auth/PermissionType.scala|  17 +-
 .../src/main/scala/kafka/server/KafkaApis.scala | 221 ++-
 .../api/KafkaAdminClientIntegrationTest.scala   |  23 +-
 .../api/SaslSslAdminClientIntegrationTest.scala |  87 +++-
 .../unit/kafka/server/RequestQuotaTest.scala|  24 +-
 45 files changed, 3472 insertions(+), 32 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/checkstyle/import-control.xml
--
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index d40c4d4..21d9d3c 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -108,8 +108,10 @@
 
 
 
+  
   
   
+  
   
   
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/checkstyle/suppressions.xml
--
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 9729ee5..66548d9 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -40,7 +40,7 @@
   files=".*/protocol/Errors.java"/>
 
 
+  files="(Utils|KafkaLZ4BlockOutputStream|AclData).java"/>
 
 
@@ -48,12 +48,15 @@
 
 
+
+
 
 
 
 
+  
files="(Sender|Fetcher|KafkaConsumer|Metrics|ConsumerCoordinator|RequestResponse|TransactionManager|KafkaAdminClient)Test.java"/>
 
 
@@ -66,7 +69,7 @@
   files="DistributedHerder.java"/>
 

kafka git commit: KAFKA-5036; hold onto the leader lock in Partition while serving an O…

2017-05-17 Thread junrao
Repository: kafka
Updated Branches:
  refs/heads/trunk c64cfd2e2 -> 249152062


KAFKA-5036; hold onto the leader lock in Partition while serving an
OffsetForLeaderEpoch request

Author: Jun Rao 

Reviewers: Ismael Juma , Ben Stopford 

Closes #3074 from junrao/kafka-5036


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/24915206
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/24915206
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/24915206

Branch: refs/heads/trunk
Commit: 24915206260c33cd7118db5359f3927d3be1ff60
Parents: c64cfd2
Author: Jun Rao 
Authored: Wed May 17 18:54:46 2017 -0700
Committer: Jun Rao 
Committed: Wed May 17 18:54:46 2017 -0700

--
 .../main/scala/kafka/cluster/Partition.scala| 45 +++
 core/src/main/scala/kafka/log/Log.scala |  2 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  6 +-
 .../scala/kafka/server/ReplicaManager.scala | 48 +--
 .../server/epoch/LeaderEpochFileCache.scala |  2 +-
 .../epoch/OffsetsForLeaderEpochTest.scala   | 83 +++-
 6 files changed, 100 insertions(+), 86 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/24915206/core/src/main/scala/kafka/cluster/Partition.scala
--
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index 1d13689..f123a16 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -16,28 +16,30 @@
  */
 package kafka.cluster
 
-import kafka.common._
-import kafka.utils._
-import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
-import kafka.admin.AdminUtils
-import kafka.api.LeaderAndIsr
-import kafka.log.LogConfig
-import kafka.server._
-import kafka.metrics.KafkaMetricsGroup
-import kafka.controller.KafkaController
 import java.io.IOException
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
-import org.apache.kafka.common.errors.{PolicyViolationException, 
NotEnoughReplicasException, NotLeaderForPartitionException}
-import org.apache.kafka.common.protocol.Errors
-
-import scala.collection.JavaConverters._
 import com.yammer.metrics.core.Gauge
+import kafka.admin.AdminUtils
+import kafka.api.LeaderAndIsr
+import kafka.common.NotAssignedReplicaException
+import kafka.controller.KafkaController
+import kafka.log.LogConfig
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server._
+import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
+import kafka.utils._
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.{NotEnoughReplicasException, 
NotLeaderForPartitionException, PolicyViolationException}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.Errors._
 import org.apache.kafka.common.record.MemoryRecords
-import org.apache.kafka.common.requests.PartitionState
+import org.apache.kafka.common.requests.EpochEndOffset._
+import org.apache.kafka.common.requests.{EpochEndOffset, PartitionState}
 import org.apache.kafka.common.utils.Time
 
+import scala.collection.JavaConverters._
+
 /**
  * Data structure that represents a topic partition. The leader maintains the 
AR, ISR, CUR, RAR
  */
@@ -510,6 +512,21 @@ class Partition(val topic: String,
 }
   }
 
+  /**
+* @param leaderEpoch Requested leader epoch
+* @return The last offset of messages published under this leader epoch.
+*/
+  def lastOffsetForLeaderEpoch(leaderEpoch: Int): EpochEndOffset = {
+inReadLock(leaderIsrUpdateLock) {
+  leaderReplicaIfLocal match {
+case Some(leaderReplica) =>
+  new EpochEndOffset(NONE, 
leaderReplica.epochs.get.endOffsetFor(leaderEpoch))
+case None =>
+  new EpochEndOffset(NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH_OFFSET)
+  }
+}
+  }
+
   private def updateIsr(newIsr: Set[Replica]) {
 val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
newIsr.map(r => r.brokerId).toList, zkVersion)
 val (updateSucceeded,newVersion) = 
ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partitionId,

http://git-wip-us.apache.org/repos/asf/kafka/blob/24915206/core/src/main/scala/kafka/log/Log.scala
--
diff --git a/core/src/main/scala/kafka/log/Log.scala 
b/core/src/main/scala/kafka/log/Log.scala
index dd22a26..7a47657 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -513,7 +513,7 @@ class Log(@volatile var dir: File,
* @throws KafkaStorageException If the append fails due to an I/O error.
* 

kafka git commit: KAFKA-5231: Bump up producer epoch when sending abort txn markers on InitPid

2017-05-17 Thread guozhang
Repository: kafka
Updated Branches:
  refs/heads/trunk b3a33ce4b -> c64cfd2e2


KAFKA-5231: Bump up producer epoch when sending abort txn markers on InitPid

Author: Guozhang Wang 

Reviewers: Jason Gustafson, Jun Rao

Closes #3066 from guozhangwang/K5231-bump-up-epoch-when-abort-txn


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c64cfd2e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c64cfd2e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c64cfd2e

Branch: refs/heads/trunk
Commit: c64cfd2e2bffd0a8fcfa515d3bbbe76416a89ee7
Parents: b3a33ce
Author: Guozhang Wang 
Authored: Wed May 17 18:05:12 2017 -0700
Committer: Guozhang Wang 
Committed: Wed May 17 18:05:12 2017 -0700

--
 checkstyle/checkstyle.xml   |   2 +-
 .../clients/producer/internals/Sender.java  |   8 +-
 .../transaction/DelayedTxnMarker.scala  |   1 +
 .../transaction/TransactionCoordinator.scala|  10 +-
 .../TransactionMarkerChannelManager.scala   |   6 +-
 ...nsactionMarkerRequestCompletionHandler.scala |  59 +++---
 .../transaction/TransactionMetadata.scala   |  18 ++-
 .../transaction/TransactionStateManager.scala   | 113 +--
 .../scala/kafka/server/DelayedOperation.scala   |   2 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |   2 +-
 .../kafka/api/TransactionsTest.scala|   2 +-
 .../TransactionCoordinatorTest.scala|   6 +-
 .../TransactionStateManagerTest.scala   |   2 +-
 13 files changed, 157 insertions(+), 74 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/c64cfd2e/checkstyle/checkstyle.xml
--
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index 6a263cc..ed846cd 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -111,7 +111,7 @@
 
 
   
-  
+  
 
 
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/c64cfd2e/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 7180171..8dea9c6 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -40,12 +40,14 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.requests.InitProducerIdRequest;
 import org.apache.kafka.common.requests.InitProducerIdResponse;
 import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
@@ -440,9 +442,11 @@ public class Sender implements Runnable {
  * Handle a produce response
  */
 private void handleProduceResponse(ClientResponse response, 
Map batches, long now) {
-int correlationId = response.requestHeader().correlationId();
+RequestHeader requestHeader = response.requestHeader();
+int correlationId = requestHeader.correlationId();
 if (response.wasDisconnected()) {
-log.trace("Cancelled request {} due to node {} being 
disconnected", response, response.destination());
+ApiKeys api = ApiKeys.forId(requestHeader.apiKey());
+log.trace("Cancelled {} request {} with correlation id {}  due to 
node {} being disconnected", api, requestHeader, correlationId, 
response.destination());
 for (ProducerBatch batch : batches.values())
 completeBatch(batch, new 
ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, 
now);
 } else if (response.versionMismatch() != null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c64cfd2e/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
--
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala 
b/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala

kafka git commit: KAFKA-4772: Exploit #peek to implement #print() and other methods

2017-05-17 Thread guozhang
Repository: kafka
Updated Branches:
  refs/heads/trunk 8c7e66313 -> 6910baf54


KAFKA-4772: Exploit #peek to implement #print() and other methods

I removed the two classes `KeyValuePrinter` and `KStreamForeach`, and implemented
their behavior with `KStreamPeek`.
So now `KStreamPeek` can do the job of both `KeyValuePrinter` and `KStreamForeach`.

Author: jameschien 
Author: JamesChien 

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #2955 from jedichien/trunk


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6910baf5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6910baf5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6910baf5

Branch: refs/heads/trunk
Commit: 6910baf548ebad4c1530432e51be40793b4a4f10
Parents: 8c7e663
Author: James Chien 
Authored: Wed May 17 11:15:31 2017 -0700
Committer: Guozhang Wang 
Committed: Wed May 17 11:15:31 2017 -0700

--
 .../streams/kstream/PrintForeachAction.java |  59 ++
 .../kstream/internals/KStreamForeach.java   |  43 
 .../streams/kstream/internals/KStreamImpl.java  |   9 +-
 .../streams/kstream/internals/KStreamPeek.java  |   9 +-
 .../streams/kstream/internals/KStreamPrint.java |  89 
 .../streams/kstream/internals/KTableImpl.java   |   9 +-
 .../kstream/internals/KeyValuePrinter.java  | 120 ---
 .../kstream/internals/KStreamPeekTest.java  |   9 +-
 .../kstream/internals/KStreamPrintTest.java |  91 
 .../internals/KeyValuePrinterProcessorTest.java | 207 ---
 10 files changed, 262 insertions(+), 383 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/6910baf5/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java
--
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java
new file mode 100644
index 000..3eb6d80
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import java.io.PrintWriter;
+
+public class PrintForeachAction implements ForeachAction {
+
+private final String streamName;
+private final PrintWriter printWriter;
+
+/**
+ * Prints each data message with the given writer. The PrintWriter may be
+ * {@code null} in order to distinguish {@code System.out} from other streams:
+ * if a {@code PrintWriter(System.out)} were passed in, closing this action
+ * would also close the {@code System.out} output stream.
+ * 
+ * Therefore, pass in {@code null} rather than {@code PrintWriter(System.out)}.
+ *
+ * @param printWriter Use {@code System.out.println} if {@code null}.
+ * @param streamName The given name will be printed.
+ */
+public PrintForeachAction(final PrintWriter printWriter, final String 
streamName) {
+this.printWriter = printWriter;
+this.streamName = streamName;
+}
+
+@Override
+public void apply(final K key, final V value) {
+final String data = String.format("[%s]: %s, %s", streamName, key, 
value);
+if (printWriter == null) {
+System.out.println(data);
+} else {
+printWriter.println(data);
+}
+}
+
+public void close() {
+if (printWriter == null) {
+System.out.flush();
+} else {
+printWriter.close();
+}
+}
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/6910baf5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamForeach.java
--
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamForeach.java
 

kafka git commit: KAFKA-5182: Reduce rebalance timeouts in request quota test

2017-05-17 Thread rsivaram
Repository: kafka
Updated Branches:
  refs/heads/trunk c36b5b7f6 -> 8c7e66313


KAFKA-5182: Reduce rebalance timeouts in request quota test

Reduce rebalance and session timeouts for join requests to trigger throttling 
in the request quota test.

Author: Rajini Sivaram 

Reviewers: Ismael Juma , Damian Guy 

Closes #3057 from rajinisivaram/KAFKA-5182-quotatest


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8c7e6631
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8c7e6631
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8c7e6631

Branch: refs/heads/trunk
Commit: 8c7e6631308ae986bd60df0b0761f68c777fadff
Parents: c36b5b7
Author: Rajini Sivaram 
Authored: Wed May 17 09:41:19 2017 -0400
Committer: Rajini Sivaram 
Committed: Wed May 17 09:41:19 2017 -0400

--
 .../unit/kafka/server/RequestQuotaTest.scala| 78 +---
 1 file changed, 33 insertions(+), 45 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/8c7e6631/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
--
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala 
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 4cf3e7d..1c496cd 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -57,13 +57,14 @@ class RequestQuotaTest extends BaseRequestTest {
 properties.put(KafkaConfig.ControlledShutdownEnableProp, "false")
 properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
 properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+properties.put(KafkaConfig.GroupMinSessionTimeoutMsProp, "100")
+properties.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
 properties.put(KafkaConfig.AuthorizerClassNameProp, 
classOf[RequestQuotaTest.TestAuthorizer].getName)
 properties.put(KafkaConfig.PrincipalBuilderClassProp, 
classOf[RequestQuotaTest.TestPrincipalBuilder].getName)
   }
 
   @Before
   override def setUp() {
-
 RequestQuotaTest.principal = KafkaPrincipal.ANONYMOUS
 super.setUp()
 
@@ -86,54 +87,41 @@ class RequestQuotaTest extends BaseRequestTest {
 
   @After
   override def tearDown() {
-try {
-  executor.shutdownNow()
-} finally {
-  super.tearDown()
-}
+try executor.shutdownNow()
+finally super.tearDown()
   }
 
   @Test
   def testResponseThrottleTime() {
+for (apiKey <- RequestQuotaTest.ClientActions)
+  submitTest(apiKey, () => checkRequestThrottleTime(apiKey))
 
-for (apiKey <- RequestQuotaTest.ClientActions) {
-  val builder = requestBuilder(apiKey)
-  submitTest(apiKey, () => {
-checkRequestThrottleTime(apiKey)
-  })
-}
 waitAndCheckResults()
   }
 
   @Test
   def testUnthrottledClient() {
+for (apiKey <- RequestQuotaTest.ClientActions)
+  submitTest(apiKey, () => checkUnthrottledClient(apiKey))
 
-for (apiKey <- RequestQuotaTest.ClientActions) {
-  val builder = requestBuilder(apiKey)
-  submitTest(apiKey, () => {
-checkUnthrottledClient(apiKey)
-  })
-}
 waitAndCheckResults()
   }
 
   @Test
   def testExemptRequestTime() {
-
-for (apiKey <- RequestQuotaTest.ClusterActions) {
+for (apiKey <- RequestQuotaTest.ClusterActions)
   submitTest(apiKey, () => checkExemptRequestMetric(apiKey))
-}
+
 waitAndCheckResults()
   }
 
   @Test
   def testUnauthorizedThrottle() {
-
 RequestQuotaTest.principal = RequestQuotaTest.UnauthorizedPrincipal
 
-for (apiKey <- ApiKeys.values) {
+for (apiKey <- ApiKeys.values)
   submitTest(apiKey, () => checkUnauthorizedRequestThrottle(apiKey))
-}
+
 waitAndCheckResults()
   }
 
@@ -171,19 +159,19 @@ class RequestQuotaTest extends BaseRequestTest {
   private def requestBuilder(apiKey: ApiKeys): AbstractRequest.Builder[_ <: 
AbstractRequest] = {
 apiKey match {
 case ApiKeys.PRODUCE =>
-  new requests.ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, 
1, 5000,
+  new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, 1, 5000,
 collection.mutable.Map(tp -> 
MemoryRecords.withRecords(CompressionType.NONE, new 
SimpleRecord("test".getBytes))).asJava)
 
 case ApiKeys.FETCH =>
-  val partitionMap = new LinkedHashMap[TopicPartition, 
requests.FetchRequest.PartitionData]
-  partitionMap.put(tp, new requests.FetchRequest.PartitionData(0, 0, 
100))
-  requests.FetchRequest.Builder.forConsumer(0, 0, partitionMap)
+  val partitionMap = new 

kafka git commit: MINOR: Make some constructors in admin package public

2017-05-17 Thread ijuma
Repository: kafka
Updated Branches:
  refs/heads/trunk 62d5aac5d -> c36b5b7f6


MINOR: Make some constructors in admin package public

Add a public create API that takes a Properties instance.

Make the constructors for TopicDescription, TopicListing
and TopicPartitionInfo public to enable AdminClient
users to write better tests.

Author: Colin P. Mccabe 

Reviewers: Ismael Juma 

Closes #3070 from cmccabe/publicapi


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c36b5b7f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c36b5b7f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c36b5b7f

Branch: refs/heads/trunk
Commit: c36b5b7f6ef5767c9455cca093ce05bf2a54d5db
Parents: 62d5aac
Author: Colin P. Mccabe 
Authored: Wed May 17 12:41:21 2017 +0100
Committer: Ismael Juma 
Committed: Wed May 17 12:41:28 2017 +0100

--
 .../org/apache/kafka/clients/admin/AdminClient.java| 13 -
 .../apache/kafka/clients/admin/KafkaAdminClient.java   |  2 +-
 .../apache/kafka/clients/admin/TopicDescription.java   |  2 +-
 .../org/apache/kafka/clients/admin/TopicListing.java   |  2 +-
 .../apache/kafka/clients/admin/TopicPartitionInfo.java |  2 +-
 5 files changed, 16 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/c36b5b7f/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
index 7db5e6e..a976ca4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 
 import java.util.Collection;
 import java.util.Map;
+import java.util.Properties;
 
 /**
  * The public interface for the {@link KafkaAdminClient}, which supports 
managing and inspecting topics,
@@ -34,11 +35,21 @@ public abstract class AdminClient implements AutoCloseable {
 /**
  * Create a new AdminClient with the given configuration.
  *
+ * @param props The configuration.
+ * @return  The new KafkaAdminClient.
+ */
+public static AdminClient create(Properties props) {
+return KafkaAdminClient.createInternal(new AdminClientConfig(props));
+}
+
+/**
+ * Create a new AdminClient with the given configuration.
+ *
  * @param conf  The configuration.
  * @return  The new KafkaAdminClient.
  */
 public static AdminClient create(Map conf) {
-return KafkaAdminClient.create(new AdminClientConfig(conf));
+return KafkaAdminClient.createInternal(new AdminClientConfig(conf));
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/c36b5b7f/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index ec10232..7dde027 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -238,7 +238,7 @@ public class KafkaAdminClient extends AdminClient {
 return throwable.getClass().getSimpleName();
 }
 
-static KafkaAdminClient create(AdminClientConfig config) {
+static KafkaAdminClient createInternal(AdminClientConfig config) {
 Metadata metadata = null;
 Metrics metrics = null;
 NetworkClient networkClient = null;

http://git-wip-us.apache.org/repos/asf/kafka/blob/c36b5b7f/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
index 2fc4442..f13dfff 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
@@ -29,7 +29,7 @@ public class TopicDescription {
 private final boolean internal;
 private final NavigableMap partitions;
 
-TopicDescription(String name, boolean internal,
+public TopicDescription(String name, boolean internal,
 

kafka git commit: MINOR: Use new-consumer config in MirrorMaker doc

2017-05-17 Thread ijuma
Repository: kafka
Updated Branches:
  refs/heads/trunk 1cea4d8f5 -> 62d5aac5d


MINOR: Use new-consumer config in MirrorMaker doc

Mirrormaker was updated to default to the new consumer in 
3db752a565071c78e4b11eaafa739844fa785b04

Old consumer calls the param `auto.commit.enable`, new consumer calls it 
`enable.auto.commit`. So I updated the docstring to use the new consumer name 
for the param.

Author: Jeff Widman 

Reviewers: Ismael Juma 

Closes #2393 from jeffwidman/patch-1


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/62d5aac5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/62d5aac5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/62d5aac5

Branch: refs/heads/trunk
Commit: 62d5aac5dda5ad7108aeb41ac3006221e2dab74b
Parents: 1cea4d8
Author: Jeff Widman 
Authored: Wed May 17 12:36:38 2017 +0100
Committer: Ismael Juma 
Committed: Wed May 17 12:36:38 2017 +0100

--
 core/src/main/scala/kafka/tools/MirrorMaker.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/62d5aac5/core/src/main/scala/kafka/tools/MirrorMaker.scala
--
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala 
b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index b3a7978..d61b355 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -59,7 +59,7 @@ import org.apache.kafka.common.record.RecordBatch
  *max.block.ms=max long
  *max.in.flight.requests.per.connection=1
  *   2. Consumer Settings
- *auto.commit.enable=false
+ *enable.auto.commit=false
  *   3. Mirror Maker Setting:
  *abort.on.send.failure=true
  */



kafka git commit: KAFKA-4714; Flatten and Cast single message transforms (KIP-66)

2017-05-17 Thread jgus
Repository: kafka
Updated Branches:
  refs/heads/trunk ebc7f7caa -> 1cea4d8f5


KAFKA-4714; Flatten and Cast single message transforms (KIP-66)

Author: Ewen Cheslack-Postava 

Reviewers: Konstantine Karantasis , Shikhar Bhushan 
, Jason Gustafson 

Closes #2458 from ewencp/kafka-3209-even-more-transforms


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1cea4d8f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1cea4d8f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1cea4d8f

Branch: refs/heads/trunk
Commit: 1cea4d8f5a51cc5795ddd3af2ea015b9e14d937d
Parents: ebc7f7c
Author: Ewen Cheslack-Postava 
Authored: Tue May 16 23:05:35 2017 -0700
Committer: Jason Gustafson 
Committed: Tue May 16 23:05:35 2017 -0700

--
 .../kafka/connect/data/SchemaBuilder.java   |   2 +-
 .../kafka/connect/tools/TransformationDoc.java  |   6 +-
 .../apache/kafka/connect/transforms/Cast.java   | 417 +++
 .../kafka/connect/transforms/Flatten.java   | 281 +
 .../connect/transforms/util/Requirements.java   |   1 +
 .../connect/transforms/util/SchemaUtil.java |   4 +
 .../kafka/connect/transforms/CastTest.java  | 384 +
 .../kafka/connect/transforms/FlattenTest.java   | 257 
 8 files changed, 1350 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/1cea4d8f/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java
--
diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java 
b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java
index f0c4586..058660e 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java
@@ -75,7 +75,7 @@ public class SchemaBuilder implements Schema {
 // Additional parameters for logical types.
 private Map parameters;
 
-private SchemaBuilder(Type type) {
+public SchemaBuilder(Type type) {
 this.type = type;
 if (type == Type.STRUCT) {
 fields = new LinkedHashMap<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/1cea4d8f/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
--
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
index 5616613..1a8f0a8 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
@@ -17,7 +17,9 @@
 package org.apache.kafka.connect.tools;
 
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.transforms.Cast;
 import org.apache.kafka.connect.transforms.ExtractField;
+import org.apache.kafka.connect.transforms.Flatten;
 import org.apache.kafka.connect.transforms.HoistField;
 import org.apache.kafka.connect.transforms.InsertField;
 import org.apache.kafka.connect.transforms.MaskField;
@@ -54,7 +56,9 @@ public class TransformationDoc {
 new DocInfo(ExtractField.class.getName(), 
ExtractField.OVERVIEW_DOC, ExtractField.CONFIG_DEF),
 new DocInfo(SetSchemaMetadata.class.getName(), 
SetSchemaMetadata.OVERVIEW_DOC, SetSchemaMetadata.CONFIG_DEF),
 new DocInfo(TimestampRouter.class.getName(), 
TimestampRouter.OVERVIEW_DOC, TimestampRouter.CONFIG_DEF),
-new DocInfo(RegexRouter.class.getName(), RegexRouter.OVERVIEW_DOC, 
RegexRouter.CONFIG_DEF)
+new DocInfo(RegexRouter.class.getName(), RegexRouter.OVERVIEW_DOC, 
RegexRouter.CONFIG_DEF),
+new DocInfo(Flatten.class.getName(), Flatten.OVERVIEW_DOC, 
Flatten.CONFIG_DEF),
+new DocInfo(Cast.class.getName(), Cast.OVERVIEW_DOC, 
Cast.CONFIG_DEF)
 );
 
 private static void printTransformationHtml(PrintStream out, DocInfo 
docInfo) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1cea4d8f/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
--
diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
new file mode 100644
index 000..17be48c
--- /dev/null
+++