[1/3] kafka git commit: KAFKA-3267; Describe and Alter Configs Admin APIs (KIP-133)
Repository: kafka Updated Branches: refs/heads/trunk e1abf1770 -> 972b75453 http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala -- diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 9eb1275..a362577 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -28,7 +28,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produce import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol} -import org.apache.kafka.common.requests._ +import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _} import CreateTopicsRequest.TopicDetails import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.{Node, TopicPartition, requests} @@ -40,6 +40,7 @@ import scala.collection.mutable import scala.collection.mutable.Buffer import org.apache.kafka.common.KafkaException import kafka.admin.AdminUtils +import kafka.log.LogConfig import kafka.network.SocketServer import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord} @@ -65,13 +66,15 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val deleteTopicResource = new Resource(Topic, deleteTopic) val producerTransactionalIdResource = new Resource(ProducerTransactionalId, transactionalId) - val GroupReadAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read))) - val ClusterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction))) - val ClusterCreateAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create))) - val TopicReadAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read))) - val TopicWriteAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write))) - val TopicDescribeAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe))) - val TopicDeleteAcl = Map(deleteTopicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete))) + val groupReadAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read))) + val clusterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction))) + val clusterCreateAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create))) + val topicReadAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read))) + val topicWriteAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write))) + val topicDescribeAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe))) + val topicDeleteAcl = Map(deleteTopicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete))) + val topicDescribeConfigsAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, DescribeConfigs))) + val topicAlterConfigsAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, AlterConfigs))) val producerTransactionalIdWriteAcl = Map(producerTransactionalIdResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write))) val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]() @@ -92,7 +95,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1") } - val RequestKeyToResponseDeserializer: Map[ApiKeys, Class[_ <: Any]] = + val requestKeyToResponseDeserializer: Map[ApiKeys, Class[_ <: Any]] = Map(ApiKeys.METADATA -> classOf[requests.MetadataResponse], ApiKeys.PRODUCE -> classOf[requests.ProduceResponse], ApiKeys.FETCH -> classOf[requests.FetchResponse], @@ -110,15 +113,17 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.CONTROLLED_SHUTDOWN_KEY -> classOf[requests.ControlledShutdownResponse], ApiKeys.CREATE_TOPICS -> classOf[CreateTopicsResponse], ApiKeys.DELETE_TOPICS -> classOf[requests.DeleteTopicsResponse], - ApiKeys.OFFSET_FOR_LEADER_EPOCH -> classOf[OffsetsForLeaderEpochResponse] + ApiKeys.OFFSET_FOR_LEADER_EPOCH ->
[2/3] kafka git commit: KAFKA-3267; Describe and Alter Configs Admin APIs (KIP-133)
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java -- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java new file mode 100644 index 000..26034eb --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Struct; + +/** + * Encapsulates an error code (via the Errors enum) and an optional message. Generally, the optional message is only + * defined if it adds information over the default message associated with the error code. + * + * This is an internal class (like every class in the requests package). + */ +public class ApiError { + +private static final String CODE_KEY_NAME = "error_code"; +private static final String MESSAGE_KEY_NAME = "error_message"; + +private final Errors error; +private final String message; + +public static ApiError fromThrowable(Throwable t) { +// Avoid populating the error message if it's a generic one +Errors error = Errors.forException(t); +String message = error.message().equals(t.getMessage()) ? null : t.getMessage(); +return new ApiError(error, message); +} + +public ApiError(Struct struct) { +error = Errors.forCode(struct.getShort(CODE_KEY_NAME)); +// In some cases, the error message field was introduced in newer version +if (struct.hasField(MESSAGE_KEY_NAME)) +message = struct.getString(MESSAGE_KEY_NAME); +else +message = null; +} + +public ApiError(Errors error, String message) { +this.error = error; +this.message = message; +} + +public void write(Struct struct) { +struct.set(CODE_KEY_NAME, error.code()); +// In some cases, the error message field was introduced in a newer protocol API version +if (struct.hasField(MESSAGE_KEY_NAME) && message != null && error != Errors.NONE) +struct.set(MESSAGE_KEY_NAME, message); +} + +public boolean is(Errors error) { +return this.error == error; +} + +public Errors error() { +return error; +} + +/** + * Return the optional error message or null. Consider using {@link #messageWithFallback()} instead. + */ +public String message() { +return message; +} + +/** + * If `message` is defined, return it. Otherwise fallback to the default error message associated with the error + * code. + */ +public String messageWithFallback() { +if (message == null) +return error.message(); +return message; +} + +public ApiException exception() { +return error.exception(message); +} + +@Override +public String toString() { +return "ApiError(error=" + error + ", message=" + message + ")"; +} +} http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java -- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java index a0626cc..def4c85 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java @@ -18,7 +18,6 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; @@ -42,9 +41,9 @@ public class CreateTopicsRequest
[3/3] kafka git commit: KAFKA-3267; Describe and Alter Configs Admin APIs (KIP-133)
KAFKA-3267; Describe and Alter Configs Admin APIs (KIP-133) Author: Ismael JumaReviewers: Jun Rao Closes #3076 from ijuma/kafka-3267-describe-alter-configs-protocol Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/972b7545 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/972b7545 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/972b7545 Branch: refs/heads/trunk Commit: 972b7545363ae85a55f94cf7ea83614be8840b75 Parents: e1abf17 Author: Ismael Juma Authored: Thu May 18 06:51:02 2017 +0100 Committer: Ismael Juma Committed: Thu May 18 06:51:02 2017 +0100 -- checkstyle/suppressions.xml | 4 +- .../kafka/clients/admin/AclOperation.java | 12 +- .../apache/kafka/clients/admin/AdminClient.java | 60 + .../clients/admin/AlterConfigsOptions.java | 45 .../clients/admin/AlterConfigsResults.java | 42 .../org/apache/kafka/clients/admin/Config.java | 63 ++ .../apache/kafka/clients/admin/ConfigEntry.java | 59 + .../kafka/clients/admin/ConfigResource.java | 65 ++ .../clients/admin/DescribeConfigsOptions.java | 37 +++ .../clients/admin/DescribeConfigsResults.java | 59 + .../kafka/clients/admin/KafkaAdminClient.java | 184 ++- .../kafka/clients/admin/ResourceType.java | 7 +- .../kafka/common/config/AbstractConfig.java | 7 + .../errors/BrokerAuthorizationException.java| 23 ++ .../apache/kafka/common/protocol/ApiKeys.java | 4 +- .../apache/kafka/common/protocol/Errors.java| 20 +- .../apache/kafka/common/protocol/Protocol.java | 182 ++- .../kafka/common/protocol/types/Schema.java | 16 +- .../kafka/common/requests/AbstractRequest.java | 6 + .../kafka/common/requests/AbstractResponse.java | 4 + .../common/requests/AlterConfigsRequest.java| 179 +++ .../common/requests/AlterConfigsResponse.java | 88 .../apache/kafka/common/requests/ApiError.java | 99 .../common/requests/CreateTopicsRequest.java| 12 +- .../common/requests/CreateTopicsResponse.java | 68 +- .../common/requests/DescribeConfigsRequest.java | 142 .../requests/DescribeConfigsResponse.java | 186 +++ .../apache/kafka/common/requests/Resource.java | 60 + .../kafka/common/requests/ResourceType.java | 42 .../kafka/server/policy/CreateTopicPolicy.java | 6 +- .../kafka/clients/admin/AclOperationTest.java | 4 +- .../clients/admin/KafkaAdminClientTest.java | 20 +- .../kafka/clients/admin/ResourceTypeTest.java | 3 +- .../common/requests/RequestResponseTest.java| 60 - .../src/main/scala/kafka/admin/AclCommand.scala | 14 +- .../src/main/scala/kafka/admin/AdminUtils.scala | 33 ++- .../main/scala/kafka/admin/ConfigCommand.scala | 17 +- core/src/main/scala/kafka/log/LogConfig.scala | 2 +- core/src/main/scala/kafka/log/LogManager.scala | 2 +- .../scala/kafka/security/auth/Operation.scala | 20 +- .../kafka/security/auth/ResourceType.scala | 7 +- .../security/auth/SimpleAclAuthorizer.scala | 2 +- .../main/scala/kafka/server/AdminManager.scala | 109 - .../kafka/server/DelayedCreateTopics.scala | 8 +- .../src/main/scala/kafka/server/KafkaApis.scala | 82 ++- .../main/scala/kafka/server/KafkaServer.scala | 6 +- .../kafka/api/AuthorizerIntegrationTest.scala | 132 ++- .../api/KafkaAdminClientIntegrationTest.scala | 224 ++- .../scala/unit/kafka/admin/AclCommandTest.scala | 11 +- .../AbstractCreateTopicsRequestTest.scala | 8 +- .../unit/kafka/server/RequestQuotaTest.scala| 18 +- .../processor/internals/StreamsKafkaClient.java | 3 +- 52 files changed, 2271 insertions(+), 295 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/checkstyle/suppressions.xml -- diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 66548d9..dc00bee 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -19,7 +19,7 @@ files=".*/requests/AbstractResponse.java"/> + files="KerberosLogin.java|RequestResponseTest.java"/> @@ -46,7 +46,7 @@ files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|SsLTransportLayer|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslTransportLayer).java"/> + files="AbstractRequest.java|KerberosLogin.java"/>
[2/2] kafka git commit: KAFKA-2273; Sticky partition assignment strategy (KIP-54)
KAFKA-2273; Sticky partition assignment strategy (KIP-54) This PR implements a new partition assignment strategy called "sticky", and it's purpose is to balance partitions across consumers in a way that minimizes moving partitions around, or, in other words, preserves existing partition assignments as much as possible. This patch is co-authored with rajinisivaram and edoardocomar. Author: Vahid HashemianReviewers: Jason Gustafson Closes #1020 from vahidhashemian/KAFKA-2273 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e1abf177 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e1abf177 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e1abf177 Branch: refs/heads/trunk Commit: e1abf17708918b82d3974ea028a4d74e3892fa0f Parents: 9815e18 Author: Vahid Hashemian Authored: Wed May 17 20:13:19 2017 -0700 Committer: Jason Gustafson Committed: Wed May 17 20:15:17 2017 -0700 -- .../kafka/clients/consumer/StickyAssignor.java | 933 +++ .../internals/AbstractPartitionAssignor.java| 8 +- .../consumer/internals/ConsumerProtocol.java| 1 - .../consumer/internals/PartitionAssignor.java | 2 +- .../org/apache/kafka/common/TopicPartition.java | 1 - .../clients/consumer/StickyAssignorTest.java| 689 ++ .../kafka/api/PlaintextConsumerTest.scala | 58 +- 7 files changed, 1685 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/e1abf177/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java -- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java new file mode 100644 index 000..58e5915 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java @@ -0,0 +1,933 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.protocol.types.Type; +import org.apache.kafka.common.utils.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The sticky assignor serves two purposes. First, it guarantees an assignment that is as balanced as possible, meaning either: + * - the numbers of topic partitions assigned to consumers differ by at most one; or + * - each consumer that has 2+ fewer topic partitions than some other consumer cannot get any of those topic partitions transferred to it. + * Second, it preserved as many existing assignment as possible when a reassignment occurs. This helps in saving some of the + * overhead processing when topic partitions move from one consumer to another. + * + * Starting fresh it would work by distributing the partitions over consumers as evenly as possible. Even though this may sound similar to + * how round robin assignor works, the second example below shows that it is not. + * During a reassignment it would perform the reassignment in such a way that in the new assignment + * 1. topic partitions are still distributed as evenly as possible, and + * 2. topic
[2/3] kafka git commit: KAFKA-3266; Describe, Create and Delete ACLs Admin APIs (KIP-140)
http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java -- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java new file mode 100644 index 000..f792bbd --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.clients.admin.AccessControlEntry; +import org.apache.kafka.clients.admin.AclBinding; +import org.apache.kafka.clients.admin.Resource; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.utils.Utils; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +public class CreateAclsRequest extends AbstractRequest { +private final static String CREATIONS = "creations"; + +public static class AclCreation { +private final AclBinding acl; + +public AclCreation(AclBinding acl) { +this.acl = acl; +} + +static AclCreation fromStruct(Struct struct) { +Resource resource = RequestUtils.resourceFromStructFields(struct); +AccessControlEntry entry = RequestUtils.aceFromStructFields(struct); +return new AclCreation(new AclBinding(resource, entry)); +} + +public AclBinding acl() { +return acl; +} + +void setStructFields(Struct struct) { +RequestUtils.resourceSetStructFields(acl.resource(), struct); +RequestUtils.aceSetStructFields(acl.entry(), struct); +} + +@Override +public String toString() { +return "(acl=" + acl + ")"; +} +} + +public static class Builder extends AbstractRequest.Builder { +private final List creations; + +public Builder(List creations) { +super(ApiKeys.CREATE_ACLS); +this.creations = creations; +} + +@Override +public CreateAclsRequest build(short version) { +return new CreateAclsRequest(version, creations); +} + +@Override +public String toString() { +return "(type=CreateAclsRequest, creations=" + Utils.join(creations, ", ") + ")"; +} +} + +private final List aclCreations; + +CreateAclsRequest(short version, List aclCreations) { +super(version); +this.aclCreations = aclCreations; +} + +public CreateAclsRequest(Struct struct, short version) { +super(version); +this.aclCreations = new ArrayList<>(); +for (Object creationStructObj : struct.getArray(CREATIONS)) { +Struct creationStruct = (Struct) creationStructObj; +aclCreations.add(AclCreation.fromStruct(creationStruct)); +} +} + +@Override +protected Struct toStruct() { +Struct struct = new Struct(ApiKeys.CREATE_ACLS.requestSchema(version())); +List requests = new ArrayList<>(); +for (AclCreation creation : aclCreations) { +Struct creationStruct = struct.instance(CREATIONS); +creation.setStructFields(creationStruct); +requests.add(creationStruct); +} +struct.set(CREATIONS, requests.toArray()); +return struct; +} + +public List aclCreations() { +return aclCreations; +} + +@Override +public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable throwable) { +short versionId = version(); +switch (versionId) { +case 0: +List responses = new ArrayList<>(); +for (int i = 0; i < aclCreations.size(); i++) { +responses.add(new CreateAclsResponse.AclCreationResponse(throwable)); +} +return new CreateAclsResponse(throttleTimeMs, responses); +default: +throw new
[1/3] kafka git commit: KAFKA-3266; Describe, Create and Delete ACLs Admin APIs (KIP-140)
Repository: kafka Updated Branches: refs/heads/trunk 249152062 -> 9815e18fe http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/core/src/main/scala/kafka/server/KafkaApis.scala -- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index c3c37c1..bf7d4c1 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -43,15 +43,20 @@ import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANS import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol} -import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, MemoryRecords, RecordBatch} +import org.apache.kafka.common.record._ +import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse +import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, AclFilterResponse} import org.apache.kafka.common.requests._ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{Node, TopicPartition} import org.apache.kafka.common.requests.SaslHandshakeResponse +import org.apache.kafka.common.security.auth.KafkaPrincipal +import org.apache.kafka.clients.admin.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType, Resource => AdminResource, ResourceType => AdminResourceType} import scala.collection._ import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer import scala.util.{Failure, Success, Try} /** @@ -118,6 +123,9 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.END_TXN => handleEndTxnRequest(request) case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request) case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request) +case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request) +case ApiKeys.CREATE_ACLS => handleCreateAcls(request) +case ApiKeys.DELETE_ACLS => handleDeleteAcls(request) } } catch { case e: FatalExitError => throw e @@ -1655,6 +1663,217 @@ class KafkaApis(val requestChannel: RequestChannel, } } + def handleDescribeAcls(request: RequestChannel.Request): Unit = { +authorizeClusterAction(request) +val describeAclsRequest = request.body[DescribeAclsRequest] +authorizer match { + case None => +def createResponse(throttleTimeMs: Int): AbstractResponse = + new DescribeAclsResponse(throttleTimeMs, new SecurityDisabledException( +"No Authorizer is configured on the broker."), Collections.emptySet[AclBinding]); +sendResponseMaybeThrottle(request, createResponse) + case Some(auth) => +val filter = describeAclsRequest.filter() +var returnedAcls = new util.ArrayList[AclBinding] +val aclMap : Map[Resource, Set[Acl]] = auth.getAcls() +aclMap.foreach { + case (resource, acls) => { +acls.foreach { + case (acl) => { +val fixture = new AclBinding(new AdminResource(AdminResourceType.fromString(resource.resourceType.toString), resource.name), +new AccessControlEntry(acl.principal.toString(), acl.host.toString(), acl.operation.toJava, acl.permissionType.toJava)) +if (filter.matches(fixture)) + returnedAcls.add(fixture) + } +} + } +} +def createResponse(throttleTimeMs: Int): AbstractResponse = + new DescribeAclsResponse(throttleTimeMs, null, returnedAcls) +sendResponseMaybeThrottle(request, createResponse) +} + } + + /** +* Convert an ACL binding filter to a Scala object. +* All ACL and resource fields must be specified (no UNKNOWN, ANY, or null fields are allowed.) +* +* @param filter The binding filter as a Java object. +* @return The binding filter as a scala object, or an exception if there was an error +* converting the Java object. +*/ + def toScala(filter: AclBindingFilter) : Try[(Resource, Acl)] = { +filter.resourceFilter().resourceType() match { + case AdminResourceType.UNKNOWN => return Failure(new InvalidRequestException("Invalid UNKNOWN resource type")) + case AdminResourceType.ANY => return Failure(new InvalidRequestException("Invalid ANY resource type")) + case _ => {} +} +var resourceType: ResourceType = null +try { + resourceType = ResourceType.fromString(filter.resourceFilter().resourceType().toString) +} catch { + case throwable: Throwable => return Failure(new InvalidRequestException("Invalid resource type"))
[3/3] kafka git commit: KAFKA-3266; Describe, Create and Delete ACLs Admin APIs (KIP-140)
KAFKA-3266; Describe, Create and Delete ACLs Admin APIs (KIP-140) Includes server-side code, protocol and AdminClient. Author: Colin P. MccabeReviewers: Ismael Juma Closes #2941 from cmccabe/KAFKA-3266 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9815e18f Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9815e18f Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9815e18f Branch: refs/heads/trunk Commit: 9815e18fefa27d8d901356ec1d994a41b4db4622 Parents: 2491520 Author: Colin P. Mccabe Authored: Thu May 18 03:20:23 2017 +0100 Committer: Ismael Juma Committed: Thu May 18 03:20:30 2017 +0100 -- checkstyle/import-control.xml | 2 + checkstyle/suppressions.xml | 9 +- .../kafka/clients/admin/AccessControlEntry.java | 86 .../clients/admin/AccessControlEntryData.java | 105 + .../clients/admin/AccessControlEntryFilter.java | 117 ++ .../apache/kafka/clients/admin/AclBinding.java | 74 +++ .../kafka/clients/admin/AclBindingFilter.java | 89 .../kafka/clients/admin/AclOperation.java | 122 ++ .../kafka/clients/admin/AclPermissionType.java | 92 .../apache/kafka/clients/admin/AdminClient.java | 66 ++ .../kafka/clients/admin/CreateAclsOptions.java | 34 +++ .../kafka/clients/admin/CreateAclsResults.java | 48 .../kafka/clients/admin/DeleteAclsOptions.java | 34 +++ .../kafka/clients/admin/DeleteAclsResults.java | 107 + .../clients/admin/DescribeAclsOptions.java | 34 +++ .../clients/admin/DescribeAclsResults.java | 37 .../kafka/clients/admin/KafkaAdminClient.java | 173 ++- .../apache/kafka/clients/admin/Resource.java| 74 +++ .../kafka/clients/admin/ResourceFilter.java | 90 .../kafka/clients/admin/ResourceType.java | 97 .../errors/SecurityDisabledException.java | 32 +++ .../apache/kafka/common/protocol/ApiKeys.java | 5 +- .../apache/kafka/common/protocol/Errors.java| 12 +- .../apache/kafka/common/protocol/Protocol.java | 90 .../kafka/common/requests/AbstractRequest.java | 9 + .../kafka/common/requests/AbstractResponse.java | 6 + .../common/requests/CreateAclsRequest.java | 133 +++ .../common/requests/CreateAclsResponse.java | 106 + .../common/requests/DeleteAclsRequest.java | 111 ++ .../common/requests/DeleteAclsResponse.java | 182 +++ .../common/requests/DescribeAclsRequest.java| 90 .../common/requests/DescribeAclsResponse.java | 128 +++ .../kafka/common/requests/RequestUtils.java | 82 +++ .../kafka/clients/admin/AclBindingTest.java | 110 + .../kafka/clients/admin/AclOperationTest.java | 86 .../clients/admin/AclPermissionTypeTest.java| 80 +++ .../clients/admin/KafkaAdminClientTest.java | 163 ++ .../kafka/clients/admin/ResourceTypeTest.java | 81 +++ .../common/requests/RequestResponseTest.java| 80 +++ .../scala/kafka/security/auth/Operation.scala | 56 - .../kafka/security/auth/PermissionType.scala| 17 +- .../src/main/scala/kafka/server/KafkaApis.scala | 221 ++- .../api/KafkaAdminClientIntegrationTest.scala | 23 +- .../api/SaslSslAdminClientIntegrationTest.scala | 87 +++- .../unit/kafka/server/RequestQuotaTest.scala| 24 +- 45 files changed, 3472 insertions(+), 32 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/checkstyle/import-control.xml -- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index d40c4d4..21d9d3c 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -108,8 +108,10 @@ + + http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/checkstyle/suppressions.xml -- diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 9729ee5..66548d9 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -40,7 +40,7 @@ files=".*/protocol/Errors.java"/> + files="(Utils|KafkaLZ4BlockOutputStream|AclData).java"/> @@ -48,12 +48,15 @@ + + + files="(Sender|Fetcher|KafkaConsumer|Metrics|ConsumerCoordinator|RequestResponse|TransactionManager|KafkaAdminClient)Test.java"/> @@ -66,7 +69,7 @@ files="DistributedHerder.java"/>
kafka git commit: KAFKA-5036; hold onto the leader lock in Partition while serving an O…
Repository: kafka Updated Branches: refs/heads/trunk c64cfd2e2 -> 249152062 KAFKA-5036; hold onto the leader lock in Partition while serving an O⦠â¦ffsetForLeaderEpoch request Author: Jun RaoReviewers: Ismael Juma , Ben Stopford Closes #3074 from junrao/kafka-5036 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/24915206 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/24915206 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/24915206 Branch: refs/heads/trunk Commit: 24915206260c33cd7118db5359f3927d3be1ff60 Parents: c64cfd2 Author: Jun Rao Authored: Wed May 17 18:54:46 2017 -0700 Committer: Jun Rao Committed: Wed May 17 18:54:46 2017 -0700 -- .../main/scala/kafka/cluster/Partition.scala| 45 +++ core/src/main/scala/kafka/log/Log.scala | 2 +- .../src/main/scala/kafka/server/KafkaApis.scala | 6 +- .../scala/kafka/server/ReplicaManager.scala | 48 +-- .../server/epoch/LeaderEpochFileCache.scala | 2 +- .../epoch/OffsetsForLeaderEpochTest.scala | 83 +++- 6 files changed, 100 insertions(+), 86 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/24915206/core/src/main/scala/kafka/cluster/Partition.scala -- diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 1d13689..f123a16 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -16,28 +16,30 @@ */ package kafka.cluster -import kafka.common._ -import kafka.utils._ -import kafka.utils.CoreUtils.{inReadLock, inWriteLock} -import kafka.admin.AdminUtils -import kafka.api.LeaderAndIsr -import kafka.log.LogConfig -import kafka.server._ -import kafka.metrics.KafkaMetricsGroup -import kafka.controller.KafkaController import java.io.IOException import java.util.concurrent.locks.ReentrantReadWriteLock -import org.apache.kafka.common.errors.{PolicyViolationException, NotEnoughReplicasException, NotLeaderForPartitionException} -import org.apache.kafka.common.protocol.Errors - -import scala.collection.JavaConverters._ import com.yammer.metrics.core.Gauge +import kafka.admin.AdminUtils +import kafka.api.LeaderAndIsr +import kafka.common.NotAssignedReplicaException +import kafka.controller.KafkaController +import kafka.log.LogConfig +import kafka.metrics.KafkaMetricsGroup +import kafka.server._ +import kafka.utils.CoreUtils.{inReadLock, inWriteLock} +import kafka.utils._ import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.{NotEnoughReplicasException, NotLeaderForPartitionException, PolicyViolationException} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.protocol.Errors._ import org.apache.kafka.common.record.MemoryRecords -import org.apache.kafka.common.requests.PartitionState +import org.apache.kafka.common.requests.EpochEndOffset._ +import org.apache.kafka.common.requests.{EpochEndOffset, PartitionState} import org.apache.kafka.common.utils.Time +import scala.collection.JavaConverters._ + /** * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR */ @@ -510,6 +512,21 @@ class Partition(val topic: String, } } + /** +* @param leaderEpoch Requested leader epoch +* @return The last offset of messages published under this leader epoch. +*/ + def lastOffsetForLeaderEpoch(leaderEpoch: Int): EpochEndOffset = { +inReadLock(leaderIsrUpdateLock) { + leaderReplicaIfLocal match { +case Some(leaderReplica) => + new EpochEndOffset(NONE, leaderReplica.epochs.get.endOffsetFor(leaderEpoch)) +case None => + new EpochEndOffset(NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH_OFFSET) + } +} + } + private def updateIsr(newIsr: Set[Replica]) { val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion) val (updateSucceeded,newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partitionId, http://git-wip-us.apache.org/repos/asf/kafka/blob/24915206/core/src/main/scala/kafka/log/Log.scala -- diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index dd22a26..7a47657 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -513,7 +513,7 @@ class Log(@volatile var dir: File, * @throws KafkaStorageException If the append fails due to an I/O error. *
kafka git commit: KAFKA-5231: Bump up producer epoch when sending abort txn markers on InitPid
Repository: kafka Updated Branches: refs/heads/trunk b3a33ce4b -> c64cfd2e2 KAFKA-5231: Bump up producer epoch when sending abort txn markers on InitPid Author: Guozhang WangReviewers: Jason Gustafson, Jun Rao Closes #3066 from guozhangwang/K5231-bump-up-epoch-when-abort-txn Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c64cfd2e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c64cfd2e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c64cfd2e Branch: refs/heads/trunk Commit: c64cfd2e2bffd0a8fcfa515d3bbbe76416a89ee7 Parents: b3a33ce Author: Guozhang Wang Authored: Wed May 17 18:05:12 2017 -0700 Committer: Guozhang Wang Committed: Wed May 17 18:05:12 2017 -0700 -- checkstyle/checkstyle.xml | 2 +- .../clients/producer/internals/Sender.java | 8 +- .../transaction/DelayedTxnMarker.scala | 1 + .../transaction/TransactionCoordinator.scala| 10 +- .../TransactionMarkerChannelManager.scala | 6 +- ...nsactionMarkerRequestCompletionHandler.scala | 59 +++--- .../transaction/TransactionMetadata.scala | 18 ++- .../transaction/TransactionStateManager.scala | 113 +-- .../scala/kafka/server/DelayedOperation.scala | 2 +- .../src/main/scala/kafka/server/KafkaApis.scala | 2 +- .../kafka/api/TransactionsTest.scala| 2 +- .../TransactionCoordinatorTest.scala| 6 +- .../TransactionStateManagerTest.scala | 2 +- 13 files changed, 157 insertions(+), 74 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/c64cfd2e/checkstyle/checkstyle.xml -- diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml index 6a263cc..ed846cd 100644 --- a/checkstyle/checkstyle.xml +++ b/checkstyle/checkstyle.xml @@ -111,7 +111,7 @@ - + http://git-wip-us.apache.org/repos/asf/kafka/blob/c64cfd2e/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java -- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 7180171..8dea9c6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -40,12 +40,14 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.requests.InitProducerIdRequest; import org.apache.kafka.common.requests.InitProducerIdResponse; import org.apache.kafka.common.requests.ProduceRequest; import org.apache.kafka.common.requests.ProduceResponse; +import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; @@ -440,9 +442,11 @@ public class Sender implements Runnable { * Handle a produce response */ private void handleProduceResponse(ClientResponse response, Map batches, long now) { -int correlationId = response.requestHeader().correlationId(); +RequestHeader requestHeader = response.requestHeader(); +int correlationId = requestHeader.correlationId(); if (response.wasDisconnected()) { -log.trace("Cancelled request {} due to node {} being disconnected", response, response.destination()); +ApiKeys api = ApiKeys.forId(requestHeader.apiKey()); +log.trace("Cancelled {} request {} with correlation id {} due to node {} being disconnected", api, requestHeader, correlationId, response.destination()); for (ProducerBatch batch : batches.values()) completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, now); } else if (response.versionMismatch() != null) { http://git-wip-us.apache.org/repos/asf/kafka/blob/c64cfd2e/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala -- diff --git a/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala b/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
kafka git commit: KAFKA-4772: Exploit #peek to implement #print() and other methods
Repository: kafka Updated Branches: refs/heads/trunk 8c7e66313 -> 6910baf54 KAFKA-4772: Exploit #peek to implement #print() and other methods I remove `KeyValuePrinter` and `KStreamForeach` two class, then implements them by `KStreamPeek`. So, now `KStreamPeek` can do `KeyValuePrinter` and `KStreamForeach` job. Author: jameschienAuthor: JamesChien Reviewers: Matthias J. Sax, Guozhang Wang Closes #2955 from jedichien/trunk Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6910baf5 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6910baf5 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6910baf5 Branch: refs/heads/trunk Commit: 6910baf548ebad4c1530432e51be40793b4a4f10 Parents: 8c7e663 Author: James Chien Authored: Wed May 17 11:15:31 2017 -0700 Committer: Guozhang Wang Committed: Wed May 17 11:15:31 2017 -0700 -- .../streams/kstream/PrintForeachAction.java | 59 ++ .../kstream/internals/KStreamForeach.java | 43 .../streams/kstream/internals/KStreamImpl.java | 9 +- .../streams/kstream/internals/KStreamPeek.java | 9 +- .../streams/kstream/internals/KStreamPrint.java | 89 .../streams/kstream/internals/KTableImpl.java | 9 +- .../kstream/internals/KeyValuePrinter.java | 120 --- .../kstream/internals/KStreamPeekTest.java | 9 +- .../kstream/internals/KStreamPrintTest.java | 91 .../internals/KeyValuePrinterProcessorTest.java | 207 --- 10 files changed, 262 insertions(+), 383 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/6910baf5/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java -- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java b/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java new file mode 100644 index 000..3eb6d80 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream; + +import java.io.PrintWriter; + +public class PrintForeachAction implements ForeachAction { + +private final String streamName; +private final PrintWriter printWriter; + +/** + * Print data message with given writer. The PrintWriter can be null in order to + * distinguish between {@code System.out} and the others. If the PrintWriter is {@code PrintWriter(System.out)}, + * then it would close {@code System.out} output stream. + * + * Afterall, not to pass in {@code PrintWriter(System.out)} but {@code null} instead. + * + * @param printWriter Use {@code System.out.println} if {@code null}. + * @param streamName The given name will be printed. + */ +public PrintForeachAction(final PrintWriter printWriter, final String streamName) { +this.printWriter = printWriter; +this.streamName = streamName; +} + +@Override +public void apply(final K key, final V value) { +final String data = String.format("[%s]: %s, %s", streamName, key, value); +if (printWriter == null) { +System.out.println(data); +} else { +printWriter.println(data); +} +} + +public void close() { +if (printWriter == null) { +System.out.flush(); +} else { +printWriter.close(); +} +} + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/6910baf5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamForeach.java -- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamForeach.java
kafka git commit: KAFKA-5182: Reduce rebalance timeouts in request quota test
Repository: kafka Updated Branches: refs/heads/trunk c36b5b7f6 -> 8c7e66313 KAFKA-5182: Reduce rebalance timeouts in request quota test Reduce rebalance and session timeouts for join requests to trigger throttling in the request quota test. Author: Rajini SivaramReviewers: Ismael Juma , Damian Guy Closes #3057 from rajinisivaram/KAFKA-5182-quotatest Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8c7e6631 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8c7e6631 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8c7e6631 Branch: refs/heads/trunk Commit: 8c7e6631308ae986bd60df0b0761f68c777fadff Parents: c36b5b7 Author: Rajini Sivaram Authored: Wed May 17 09:41:19 2017 -0400 Committer: Rajini Sivaram Committed: Wed May 17 09:41:19 2017 -0400 -- .../unit/kafka/server/RequestQuotaTest.scala| 78 +--- 1 file changed, 33 insertions(+), 45 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/8c7e6631/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala -- diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 4cf3e7d..1c496cd 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -57,13 +57,14 @@ class RequestQuotaTest extends BaseRequestTest { properties.put(KafkaConfig.ControlledShutdownEnableProp, "false") properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1") properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1") +properties.put(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") +properties.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0") properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[RequestQuotaTest.TestAuthorizer].getName) properties.put(KafkaConfig.PrincipalBuilderClassProp, classOf[RequestQuotaTest.TestPrincipalBuilder].getName) } @Before override def setUp() { - RequestQuotaTest.principal = KafkaPrincipal.ANONYMOUS super.setUp() @@ -86,54 +87,41 @@ class RequestQuotaTest extends BaseRequestTest { @After override def tearDown() { -try { - executor.shutdownNow() -} finally { - super.tearDown() -} +try executor.shutdownNow() +finally super.tearDown() } @Test def testResponseThrottleTime() { +for (apiKey <- RequestQuotaTest.ClientActions) + submitTest(apiKey, () => checkRequestThrottleTime(apiKey)) -for (apiKey <- RequestQuotaTest.ClientActions) { - val builder = requestBuilder(apiKey) - submitTest(apiKey, () => { -checkRequestThrottleTime(apiKey) - }) -} waitAndCheckResults() } @Test def testUnthrottledClient() { +for (apiKey <- RequestQuotaTest.ClientActions) + submitTest(apiKey, () => checkUnthrottledClient(apiKey)) -for (apiKey <- RequestQuotaTest.ClientActions) { - val builder = requestBuilder(apiKey) - submitTest(apiKey, () => { -checkUnthrottledClient(apiKey) - }) -} waitAndCheckResults() } @Test def testExemptRequestTime() { - -for (apiKey <- RequestQuotaTest.ClusterActions) { +for (apiKey <- RequestQuotaTest.ClusterActions) submitTest(apiKey, () => checkExemptRequestMetric(apiKey)) -} + waitAndCheckResults() } @Test def testUnauthorizedThrottle() { - RequestQuotaTest.principal = RequestQuotaTest.UnauthorizedPrincipal -for (apiKey <- ApiKeys.values) { +for (apiKey <- ApiKeys.values) submitTest(apiKey, () => checkUnauthorizedRequestThrottle(apiKey)) -} + waitAndCheckResults() } @@ -171,19 +159,19 @@ class RequestQuotaTest extends BaseRequestTest { private def requestBuilder(apiKey: ApiKeys): AbstractRequest.Builder[_ <: AbstractRequest] = { apiKey match { case ApiKeys.PRODUCE => - new requests.ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, 1, 5000, + new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, 1, 5000, collection.mutable.Map(tp -> MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("test".getBytes))).asJava) case ApiKeys.FETCH => - val partitionMap = new LinkedHashMap[TopicPartition, requests.FetchRequest.PartitionData] - partitionMap.put(tp, new requests.FetchRequest.PartitionData(0, 0, 100)) - requests.FetchRequest.Builder.forConsumer(0, 0, partitionMap) + val partitionMap = new
kafka git commit: MINOR: Make some constructors in admin package public
Repository: kafka Updated Branches: refs/heads/trunk 62d5aac5d -> c36b5b7f6 MINOR: Make some constructors in admin package public Add a public create API that takes a Properties instance. Make the constructors for TopicDescription, TopicListing and TopicPartitionInfo public to enable AdminClient users to write better tests. Author: Colin P. MccabeReviewers: Ismael Juma Closes #3070 from cmccabe/publicapi Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c36b5b7f Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c36b5b7f Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c36b5b7f Branch: refs/heads/trunk Commit: c36b5b7f6ef5767c9455cca093ce05bf2a54d5db Parents: 62d5aac Author: Colin P. Mccabe Authored: Wed May 17 12:41:21 2017 +0100 Committer: Ismael Juma Committed: Wed May 17 12:41:28 2017 +0100 -- .../org/apache/kafka/clients/admin/AdminClient.java| 13 - .../apache/kafka/clients/admin/KafkaAdminClient.java | 2 +- .../apache/kafka/clients/admin/TopicDescription.java | 2 +- .../org/apache/kafka/clients/admin/TopicListing.java | 2 +- .../apache/kafka/clients/admin/TopicPartitionInfo.java | 2 +- 5 files changed, 16 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/c36b5b7f/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java -- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java index 7db5e6e..a976ca4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.annotation.InterfaceStability; import java.util.Collection; import java.util.Map; +import java.util.Properties; /** * The public interface for the {@link KafkaAdminClient}, which supports managing and inspecting topics, @@ -34,11 +35,21 @@ public abstract class AdminClient implements AutoCloseable { /** * Create a new AdminClient with the given configuration. * + * @param props The configuration. + * @return The new KafkaAdminClient. + */ +public static AdminClient create(Properties props) { +return KafkaAdminClient.createInternal(new AdminClientConfig(props)); +} + +/** + * Create a new AdminClient with the given configuration. + * * @param conf The configuration. * @return The new KafkaAdminClient. */ public static AdminClient create(Map conf) { -return KafkaAdminClient.create(new AdminClientConfig(conf)); +return KafkaAdminClient.createInternal(new AdminClientConfig(conf)); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/c36b5b7f/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java -- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index ec10232..7dde027 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -238,7 +238,7 @@ public class KafkaAdminClient extends AdminClient { return throwable.getClass().getSimpleName(); } -static KafkaAdminClient create(AdminClientConfig config) { +static KafkaAdminClient createInternal(AdminClientConfig config) { Metadata metadata = null; Metrics metrics = null; NetworkClient networkClient = null; http://git-wip-us.apache.org/repos/asf/kafka/blob/c36b5b7f/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java -- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java index 2fc4442..f13dfff 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java @@ -29,7 +29,7 @@ public class TopicDescription { private final boolean internal; private final NavigableMap partitions; -TopicDescription(String name, boolean internal, +public TopicDescription(String name, boolean internal,
kafka git commit: MINOR: Use new-consumer config in MirrorMaker doc
Repository: kafka Updated Branches: refs/heads/trunk 1cea4d8f5 -> 62d5aac5d MINOR: Use new-consumer config in MirrorMaker doc Mirrormaker was updated to default to the new consumer in 3db752a565071c78e4b11eaafa739844fa785b04 Old consumer calls the param `auto.commit.enable`, new consumer calls it `enable.auto.commit`. So I updated the docstring to use the new consumer name for the param. Author: Jeff WidmanReviewers: Ismael Juma Closes #2393 from jeffwidman/patch-1 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/62d5aac5 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/62d5aac5 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/62d5aac5 Branch: refs/heads/trunk Commit: 62d5aac5dda5ad7108aeb41ac3006221e2dab74b Parents: 1cea4d8 Author: Jeff Widman Authored: Wed May 17 12:36:38 2017 +0100 Committer: Ismael Juma Committed: Wed May 17 12:36:38 2017 +0100 -- core/src/main/scala/kafka/tools/MirrorMaker.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/62d5aac5/core/src/main/scala/kafka/tools/MirrorMaker.scala -- diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index b3a7978..d61b355 100755 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -59,7 +59,7 @@ import org.apache.kafka.common.record.RecordBatch *max.block.ms=max long *max.in.flight.requests.per.connection=1 * 2. Consumer Settings - *auto.commit.enable=false + *enable.auto.commit=false * 3. Mirror Maker Setting: *abort.on.send.failure=true */
kafka git commit: KAFKA-4714; Flatten and Cast single message transforms (KIP-66)
Repository: kafka Updated Branches: refs/heads/trunk ebc7f7caa -> 1cea4d8f5 KAFKA-4714; Flatten and Cast single message transforms (KIP-66) Author: Ewen Cheslack-PostavaReviewers: Konstantine Karantasis , Shikhar Bhushan , Jason Gustafson Closes #2458 from ewencp/kafka-3209-even-more-transforms Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1cea4d8f Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1cea4d8f Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1cea4d8f Branch: refs/heads/trunk Commit: 1cea4d8f5a51cc5795ddd3af2ea015b9e14d937d Parents: ebc7f7c Author: Ewen Cheslack-Postava Authored: Tue May 16 23:05:35 2017 -0700 Committer: Jason Gustafson Committed: Tue May 16 23:05:35 2017 -0700 -- .../kafka/connect/data/SchemaBuilder.java | 2 +- .../kafka/connect/tools/TransformationDoc.java | 6 +- .../apache/kafka/connect/transforms/Cast.java | 417 +++ .../kafka/connect/transforms/Flatten.java | 281 + .../connect/transforms/util/Requirements.java | 1 + .../connect/transforms/util/SchemaUtil.java | 4 + .../kafka/connect/transforms/CastTest.java | 384 + .../kafka/connect/transforms/FlattenTest.java | 257 8 files changed, 1350 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/1cea4d8f/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java -- diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java index f0c4586..058660e 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java @@ -75,7 +75,7 @@ public class SchemaBuilder implements Schema { // Additional parameters for logical types. private Map parameters; -private SchemaBuilder(Type type) { +public SchemaBuilder(Type type) { this.type = type; if (type == Type.STRUCT) { fields = new LinkedHashMap<>(); http://git-wip-us.apache.org/repos/asf/kafka/blob/1cea4d8f/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java -- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java index 5616613..1a8f0a8 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java @@ -17,7 +17,9 @@ package org.apache.kafka.connect.tools; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.transforms.Cast; import org.apache.kafka.connect.transforms.ExtractField; +import org.apache.kafka.connect.transforms.Flatten; import org.apache.kafka.connect.transforms.HoistField; import org.apache.kafka.connect.transforms.InsertField; import org.apache.kafka.connect.transforms.MaskField; @@ -54,7 +56,9 @@ public class TransformationDoc { new DocInfo(ExtractField.class.getName(), ExtractField.OVERVIEW_DOC, ExtractField.CONFIG_DEF), new DocInfo(SetSchemaMetadata.class.getName(), SetSchemaMetadata.OVERVIEW_DOC, SetSchemaMetadata.CONFIG_DEF), new DocInfo(TimestampRouter.class.getName(), TimestampRouter.OVERVIEW_DOC, TimestampRouter.CONFIG_DEF), -new DocInfo(RegexRouter.class.getName(), RegexRouter.OVERVIEW_DOC, RegexRouter.CONFIG_DEF) +new DocInfo(RegexRouter.class.getName(), RegexRouter.OVERVIEW_DOC, RegexRouter.CONFIG_DEF), +new DocInfo(Flatten.class.getName(), Flatten.OVERVIEW_DOC, Flatten.CONFIG_DEF), +new DocInfo(Cast.class.getName(), Cast.OVERVIEW_DOC, Cast.CONFIG_DEF) ); private static void printTransformationHtml(PrintStream out, DocInfo docInfo) { http://git-wip-us.apache.org/repos/asf/kafka/blob/1cea4d8f/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java -- diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java new file mode 100644 index 000..17be48c --- /dev/null +++