[3/3] kafka git commit: KAFKA-5776; Add the Trogdor fault injection daemon
KAFKA-5776; Add the Trogdor fault injection daemon Author: Colin P. Mccabe Reviewers: Ismael Juma, Rajini Sivaram Closes #3699 from cmccabe/trogdor-review Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0772fde5 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0772fde5 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0772fde5 Branch: refs/heads/trunk Commit: 0772fde562ed361beec6ad745895a0ee0cf229e8 Parents: 607c3c2 Author: Colin P. Mccabe Authored: Fri Aug 25 12:29:40 2017 -0700 Committer: Rajini Sivaram Committed: Fri Aug 25 12:29:40 2017 -0700 -- bin/trogdor.sh | 50 +++ build.gradle| 7 + checkstyle/import-control.xml | 12 + checkstyle/suppressions.xml | 14 +- .../org/apache/kafka/common/utils/Utils.java| 38 +++ .../org/apache/kafka/trogdor/agent/Agent.java | 339 ++ .../apache/kafka/trogdor/agent/AgentClient.java | 158 + .../kafka/trogdor/agent/AgentRestResource.java | 80 + .../apache/kafka/trogdor/basic/BasicNode.java | 111 ++ .../kafka/trogdor/basic/BasicPlatform.java | 95 ++ .../kafka/trogdor/basic/BasicTopology.java | 58 .../apache/kafka/trogdor/common/JsonUtil.java | 52 +++ .../org/apache/kafka/trogdor/common/Node.java | 65 .../apache/kafka/trogdor/common/Platform.java | 80 + .../apache/kafka/trogdor/common/Topology.java | 35 ++ .../kafka/trogdor/coordinator/Coordinator.java | 341 +++ .../trogdor/coordinator/CoordinatorClient.java | 154 + .../coordinator/CoordinatorRestResource.java| 80 + .../kafka/trogdor/coordinator/NodeManager.java | 262 ++ .../kafka/trogdor/fault/AbstractFaultSpec.java | 53 +++ .../org/apache/kafka/trogdor/fault/Fault.java | 50 +++ .../apache/kafka/trogdor/fault/FaultSet.java| 146 .../apache/kafka/trogdor/fault/FaultSpec.java | 59 .../apache/kafka/trogdor/fault/FaultState.java | 27 ++ .../trogdor/fault/NetworkPartitionFault.java| 130 +++ .../fault/NetworkPartitionFaultSpec.java| 65 .../apache/kafka/trogdor/fault/NoOpFault.java | 92 + 
.../kafka/trogdor/fault/NoOpFaultSpec.java | 50 +++ .../kafka/trogdor/rest/AgentFaultsResponse.java | 52 +++ .../kafka/trogdor/rest/AgentStatusResponse.java | 59 .../trogdor/rest/CoordinatorFaultsResponse.java | 52 +++ .../trogdor/rest/CoordinatorStatusResponse.java | 59 .../trogdor/rest/CreateAgentFaultRequest.java | 69 .../rest/CreateCoordinatorFaultRequest.java | 69 .../org/apache/kafka/trogdor/rest/Empty.java| 49 +++ .../kafka/trogdor/rest/ErrorResponse.java | 68 .../apache/kafka/trogdor/rest/FaultDataMap.java | 98 ++ .../kafka/trogdor/rest/JsonRestServer.java | 220 .../kafka/trogdor/rest/RestExceptionMapper.java | 71 .../apache/kafka/trogdor/agent/AgentTest.java | 167 + .../kafka/trogdor/basic/BasicPlatformTest.java | 68 .../trogdor/common/CapturingCommandRunner.java | 60 .../kafka/trogdor/common/ExpectedFaults.java| 193 +++ .../trogdor/common/MiniTrogdorCluster.java | 238 + .../trogdor/coordinator/CoordinatorTest.java| 212 .../kafka/trogdor/fault/FaultSetTest.java | 126 +++ tools/src/test/resources/log4j.properties | 22 ++ 47 files changed, 4648 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/bin/trogdor.sh -- diff --git a/bin/trogdor.sh b/bin/trogdor.sh new file mode 100755 index 000..b211209 --- /dev/null +++ b/bin/trogdor.sh @@ -0,0 +1,50 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +usage() { +cat
[2/3] kafka git commit: KAFKA-5776; Add the Trogdor fault injection daemon
http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java -- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java new file mode 100644 index 000..9f1a19a --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.fault; + +import org.apache.kafka.trogdor.common.Platform; +import org.apache.kafka.trogdor.common.Topology; + +import java.util.Set; + +public interface Fault { +/** + * Get the ID of this fault. + */ +String id(); + +/** + * Get the specification for this Fault. + */ +FaultSpec spec(); + +/** + * Activate the fault. + */ +void activate(Platform platform) throws Exception; + +/** + * Deactivate the fault. + */ +void deactivate(Platform platform) throws Exception; + +/** + * Get the nodes which this fault is targetting. 
+ */ +Set targetNodes(Topology topology); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSet.java -- diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSet.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSet.java new file mode 100644 index 000..63e5ff4 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSet.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.fault; + +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.TreeMap; + +public class FaultSet { +private final static long NS_PER_MS = 100L; + +/** + * Maps fault start times in nanoseconds to faults. + */ +private final TreeMapbyStart = new TreeMap (); + +/** + * Maps fault end times in nanoseconds to faults. + */ +private final TreeMap byEnd = new TreeMap (); + +/** + * Return an iterator that iterates over the fault set in start time order. + */ +public FaultSetIterator iterateByStart() { +return new FaultSetIterator(byStart); +} + +/** + * Return an iterator that iterates over the fault set in end time order. 
+ */ +public FaultSetIterator iterateByEnd() { +return new FaultSetIterator(byEnd); +} + +/** + * Add a new fault to the FaultSet. + */ +public void add(Fault fault) { +insertUnique(byStart, fault.spec().startMs() * NS_PER_MS, fault); +long endMs = fault.spec().startMs() + fault.spec().durationMs(); +insertUnique(byEnd, endMs * NS_PER_MS, fault); +} + +/** + * Insert a new fault to a TreeMap. + * + * If there is already a fault with the given key, the fault will be stored + * with the next available key. + */ +private void insertUnique(TreeMap map, long key, Fault fault) { +while (true) { +Fault existing = map.get(key); +if (existing == null) { +map.put(key, fault); +return; +} else
[1/3] kafka git commit: KAFKA-5776; Add the Trogdor fault injection daemon
Repository: kafka Updated Branches: refs/heads/trunk 607c3c21f -> 0772fde56 http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java -- diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java b/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java new file mode 100644 index 000..1947b79 --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.trogdor.common; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.trogdor.agent.Agent; +import org.apache.kafka.trogdor.agent.AgentClient; +import org.apache.kafka.trogdor.agent.AgentRestResource; +import org.apache.kafka.trogdor.basic.BasicNode; +import org.apache.kafka.trogdor.basic.BasicPlatform; +import org.apache.kafka.trogdor.basic.BasicTopology; +import org.apache.kafka.trogdor.coordinator.Coordinator; + +import org.apache.kafka.trogdor.coordinator.CoordinatorClient; +import org.apache.kafka.trogdor.coordinator.CoordinatorRestResource; +import org.apache.kafka.trogdor.rest.JsonRestServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.TreeMap; +import java.util.TreeSet; + +/** + * MiniTrogdorCluster sets up a local cluster of Trogdor Agents and Coordinators. + */ +public class MiniTrogdorCluster implements AutoCloseable { +private static final Logger log = LoggerFactory.getLogger(MiniTrogdorCluster.class); + +/** + * The MiniTrogdorCluster#Builder is used to set up a new MiniTrogdorCluster. + */ +public static class Builder { +private final TreeSet agentNames = new TreeSet<>(); + +private String coordinatorName = null; + +private Time time = Time.SYSTEM; + +private BasicPlatform.CommandRunner commandRunner = +new BasicPlatform.ShellCommandRunner(); + +private static class NodeData { +String hostname; +AgentRestResource agentRestResource = null; +JsonRestServer agentRestServer = null; +int agentPort = 0; + +JsonRestServer coordinatorRestServer = null; +int coordinatorPort = 0; +CoordinatorRestResource coordinatorRestResource = null; + +Platform platform = null; +Agent agent = null; +Coordinator coordinator = null; + +BasicNode node = null; +} + +public Builder() { +} + +/** + * Set the timekeeper used by this MiniTrogdorCluster. 
+ */ +public Builder time(Time time) { +this.time = time; +return this; +} + +public Builder commandRunner(BasicPlatform.CommandRunner commandRunner) { +this.commandRunner = commandRunner; +return this; +} + +/** + * Add a new trogdor coordinator node to the cluster. + */ +public Builder addCoordinator(String nodeName) { +if (coordinatorName != null) { +throw new RuntimeException("At most one coordinator is allowed."); +} +coordinatorName = nodeName; +return this; +} + +/** + * Add a new trogdor agent node to the cluster. + */ +public Builder addAgent(String nodeName) { +if (agentNames.contains(nodeName)) { +throw new RuntimeException("There is already an agent on node " + nodeName); +} +agentNames.add(nodeName); +return this; +} + +private NodeData getOrCreate(String nodeName, TreeMapnodes) { +NodeData data = nodes.get(nodeName); +if (data != null) +return data; +data = new NodeData(); +data.hostname = "127.0.0.1"; +
kafka git commit: KAFKA-5755; KafkaProducer should be refactored to use LogContext
Repository: kafka Updated Branches: refs/heads/trunk c4d629a0b -> 607c3c21f KAFKA-5755; KafkaProducer should be refactored to use LogContext With LogContext, each producer log item is automatically prefixed with client id and transactional id. Author: huxihx Reviewers: Jason Gustafson Closes #3703 from huxihx/KAFKA-5755 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/607c3c21 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/607c3c21 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/607c3c21 Branch: refs/heads/trunk Commit: 607c3c21f652a8f911d1efb8374d2eec313a4e2d Parents: c4d629a Author: huxihx Authored: Fri Aug 25 10:38:15 2017 -0700 Committer: Jason Gustafson Committed: Fri Aug 25 10:42:40 2017 -0700 -- checkstyle/suppressions.xml | 2 + .../kafka/clients/producer/KafkaProducer.java | 38 + .../producer/internals/RecordAccumulator.java | 10 ++- .../clients/producer/internals/Sender.java | 8 +- .../internals/RecordAccumulatorTest.java| 86 .../clients/producer/internals/SenderTest.java | 20 +++-- .../internals/TransactionManagerTest.java | 6 +- 7 files changed, 103 insertions(+), 67 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/607c3c21/checkstyle/suppressions.xml -- diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 7f3c4b6..e4c244f 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -35,6 +35,8 @@ files="ConfigDef.java"/> + http://git-wip-us.apache.org/repos/asf/kafka/blob/607c3c21/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java -- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index ce2efba..ba0d848 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ 
-58,9 +58,9 @@ import org.apache.kafka.common.serialization.ExtendedSerializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.common.utils.KafkaThread; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; import java.util.Collections; @@ -224,7 +224,7 @@ import static org.apache.kafka.common.serialization.ExtendedSerializer.Wrapper.e */ public class KafkaProducer implements Producer { -private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class); +private final Logger log; private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1); private static final String JMX_PREFIX = "kafka.producer"; public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread"; @@ -305,13 +305,19 @@ public class KafkaProducer implements Producer { @SuppressWarnings("unchecked") private KafkaProducer(ProducerConfig config, Serializer keySerializer, Serializer valueSerializer) { try { -log.trace("Starting the Kafka producer"); Map userProvidedConfigs = config.originals(); this.producerConfig = config; this.time = Time.SYSTEM; clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG); if (clientId.length() <= 0) clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement(); + +String transactionalId = userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ? 
+(String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null; +LogContext logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s", clientId, transactionalId)); +log = logContext.logger(KafkaProducer.class); +log.trace("Starting the Kafka producer"); + Map metricTags = Collections.singletonMap("client-id", clientId); MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS) @@ -354,13 +360,14 @@
[2/3] kafka git commit: MINOR: Consolidate broker request/response handling
http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/main/scala/kafka/network/RequestChannel.scala -- diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index ec8e1eb..1d95fa0 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -19,19 +19,16 @@ package kafka.network import java.net.InetAddress import java.nio.ByteBuffer -import java.util.Collections import java.util.concurrent._ import com.yammer.metrics.core.Gauge import kafka.metrics.KafkaMetricsGroup +import kafka.network.RequestChannel.{ShutdownRequest, BaseRequest} import kafka.server.QuotaId import kafka.utils.{Logging, NotNothing} -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.InvalidRequestException import org.apache.kafka.common.memory.MemoryPool import org.apache.kafka.common.network.{ListenerName, Send} import org.apache.kafka.common.protocol.{ApiKeys, Protocol, SecurityProtocol} -import org.apache.kafka.common.record.{MemoryRecords, RecordBatch} import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.Time @@ -40,25 +37,20 @@ import org.apache.log4j.Logger import scala.reflect.ClassTag object RequestChannel extends Logging { - val AllDone = new Request(processor = 1, connectionId = "2", Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost), -startTimeNanos = 0, listenerName = new ListenerName(""), securityProtocol = SecurityProtocol.PLAINTEXT, -MemoryPool.NONE, shutdownReceive) private val requestLogger = Logger.getLogger("kafka.request.logger") - private def shutdownReceive: ByteBuffer = { -val emptyProduceRequest = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, 0, 0, - Collections.emptyMap[TopicPartition, MemoryRecords]).build() -val emptyRequestHeader = new RequestHeader(ApiKeys.PRODUCE.id, 
emptyProduceRequest.version, "", 0) -emptyProduceRequest.serialize(emptyRequestHeader) - } + sealed trait BaseRequest + case object ShutdownRequest extends BaseRequest case class Session(principal: KafkaPrincipal, clientAddress: InetAddress) { val sanitizedUser = QuotaId.sanitize(principal.getName) } - class Request(val processor: Int, val connectionId: String, val session: Session, startTimeNanos: Long, -val listenerName: ListenerName, val securityProtocol: SecurityProtocol, memoryPool: MemoryPool, -@volatile private var buffer: ByteBuffer) { + class Request(val processor: Int, +val context: RequestContext, +val startTimeNanos: Long, +memoryPool: MemoryPool, +@volatile private var buffer: ByteBuffer) extends BaseRequest { // These need to be volatile because the readers are in the network thread and the writers are in the request // handler threads or the purgatory threads @volatile var requestDequeueTimeNanos = -1L @@ -68,31 +60,17 @@ object RequestChannel extends Logging { @volatile var apiRemoteCompleteTimeNanos = -1L @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None -val header: RequestHeader = try { - RequestHeader.parse(buffer) -} catch { - case ex: Throwable => -throw new InvalidRequestException(s"Error parsing request header. 
Our best guess of the apiKey is: ${buffer.getShort(0)}", ex) -} +val session = Session(context.principal, context.clientAddress) +private val bodyAndSize: RequestAndSize = context.parseRequest(buffer) -val bodyAndSize: RequestAndSize = - try { -// For unsupported version of ApiVersionsRequest, create a dummy request to enable an error response to be returned later -if (header.apiKey == ApiKeys.API_VERSIONS.id && !Protocol.apiVersionSupported(header.apiKey, header.apiVersion)) { - new RequestAndSize(new ApiVersionsRequest.Builder().build(), 0) -} -else - AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer) - } catch { -case ex: Throwable => - throw new InvalidRequestException(s"Error getting request for apiKey: ${header.apiKey} and apiVersion: ${header.apiVersion}", ex) - } +def header: RequestHeader = context.header +def sizeOfBodyInBytes: Int = bodyAndSize.size //most request types are parsed entirely into objects at this point. for those we can release the underlying buffer. //some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference //to the buffer. for those requests we cannot release the buffer early, but only when request processing is done. -if
[3/3] kafka git commit: MINOR: Consolidate broker request/response handling
MINOR: Consolidate broker request/response handling This patch contains a few small improvements to make request/response handling more consistent. Primarily it consolidates request/response serialization logic so that `SaslServerAuthenticator` and `KafkaApis` follow the same path. It also reduces the amount of custom logic needed to handle unsupported versions of the ApiVersions requests. Author: Jason Gustafson Reviewers: Ismael Juma Closes #3673 from hachikuji/consolidate-response-handling Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c4d629a0 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c4d629a0 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c4d629a0 Branch: refs/heads/trunk Commit: c4d629a0b3cbd11c174cb8b09a50bc8de77825e9 Parents: 05e3850 Author: Jason Gustafson Authored: Fri Aug 25 10:23:11 2017 -0700 Committer: Jason Gustafson Committed: Fri Aug 25 10:23:11 2017 -0700 -- .../org/apache/kafka/clients/ClientRequest.java | 2 +- .../org/apache/kafka/clients/NetworkClient.java | 24 +- .../internals/ConsumerNetworkClient.java| 4 +- .../clients/producer/internals/Sender.java | 5 +- .../kafka/common/network/ChannelBuilders.java | 2 +- .../common/network/SaslChannelBuilder.java | 17 +- .../org/apache/kafka/common/network/Send.java | 4 +- .../kafka/common/requests/AbstractRequest.java | 112 +++-- .../kafka/common/requests/AbstractResponse.java | 19 +- .../common/requests/ApiVersionsRequest.java | 27 ++- .../common/requests/ApiVersionsResponse.java| 12 +- .../kafka/common/requests/FetchResponse.java| 21 +- .../kafka/common/requests/RequestContext.java | 92 .../kafka/common/requests/RequestHeader.java| 48 ++-- .../authenticator/SaslClientAuthenticator.java | 8 +- .../authenticator/SaslServerAuthenticator.java | 103 + .../common/network/SaslChannelBuilderTest.java | 4 +- .../common/requests/RequestContextTest.java | 75 ++ .../common/requests/RequestHeaderTest.java | 6 +- 
.../common/requests/RequestResponseTest.java| 12 +- .../authenticator/SaslAuthenticatorTest.java| 14 +- .../SaslServerAuthenticatorTest.java| 8 +- .../controller/ControllerChannelManager.scala | 2 +- ...nsactionMarkerRequestCompletionHandler.scala | 4 +- .../scala/kafka/network/RequestChannel.scala| 111 - .../main/scala/kafka/network/SocketServer.scala | 24 +- .../server/ClientRequestQuotaManager.scala | 25 +- .../src/main/scala/kafka/server/KafkaApis.scala | 229 +-- .../kafka/server/KafkaRequestHandler.scala | 59 ++--- core/src/main/scala/kafka/utils/Logging.scala | 2 + .../scala/other/kafka/TestOffsetManager.scala | 3 +- .../unit/kafka/admin/AdminRackAwareTest.scala | 5 +- .../admin/ResetConsumerGroupOffsetTest.scala| 9 +- .../TransactionMarkerChannelManagerTest.scala | 11 +- ...tionMarkerRequestCompletionHandlerTest.scala | 30 ++- .../integration/KafkaServerTestHarness.scala| 2 +- .../kafka/integration/PrimitiveApiTest.scala| 3 +- .../kafka/integration/TopicMetadataTest.scala | 2 +- .../ZookeeperConsumerConnectorTest.scala| 1 - .../kafka/log/ProducerStateManagerTest.scala| 2 +- .../unit/kafka/network/SocketServerTest.scala | 64 +++--- .../unit/kafka/server/BaseRequestTest.scala | 2 +- .../unit/kafka/server/FetchRequestTest.scala| 4 +- .../scala/unit/kafka/server/KafkaApisTest.scala | 21 +- .../scala/unit/kafka/server/LogOffsetTest.scala | 2 +- .../unit/kafka/server/LogRecoveryTest.scala | 4 +- .../unit/kafka/server/OffsetCommitTest.scala| 2 +- .../unit/kafka/server/ReplicaFetchTest.scala| 3 - .../kafka/server/ReplicationQuotasTest.scala| 2 +- .../unit/kafka/server/RequestQuotaTest.scala| 4 +- .../unit/kafka/utils/IteratorTemplateTest.scala | 2 +- .../unit/kafka/utils/timer/TimerTest.scala | 3 +- 52 files changed, 685 insertions(+), 571 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java -- diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java index 964..9b62946 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java @@ -79,7 +79,7 @@ public final class ClientRequest { } public RequestHeader makeHeader(short version) { -return
[1/3] kafka git commit: MINOR: Consolidate broker request/response handling
Repository: kafka Updated Branches: refs/heads/trunk 05e3850b2 -> c4d629a0b http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala -- diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 4b33dcb..aadb4d2 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -298,7 +298,7 @@ class RequestQuotaTest extends BaseRequestTest { private def requestResponse(socket: Socket, clientId: String, correlationId: Int, requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]): Struct = { val apiKey = requestBuilder.apiKey val request = requestBuilder.build() -val header = new RequestHeader(apiKey.id, request.version, clientId, correlationId) +val header = new RequestHeader(apiKey, request.version, clientId, correlationId) val response = requestAndReceive(socket, request.serialize(header).array) val responseBuffer = skipResponseHeader(response) apiKey.parseResponse(request.version, responseBuffer) @@ -326,7 +326,7 @@ class RequestQuotaTest extends BaseRequestTest { override def toString: String = { val requestTime = requestTimeMetricValue(clientId) val throttleTime = throttleTimeMetricValue(clientId) - s"Client $clientId apiKey ${apiKey} requests $correlationId requestTime $requestTime throttleTime $throttleTime" + s"Client $clientId apiKey $apiKey requests $correlationId requestTime $requestTime throttleTime $throttleTime" } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala -- diff --git a/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala b/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala index 6a40510..67e64d5 100644 --- a/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala +++ 
b/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala @@ -18,7 +18,7 @@ package kafka.utils import org.junit.Assert._ import org.scalatest.Assertions -import org.junit.{Test, After, Before} +import org.junit.Test class IteratorTemplateTest extends Assertions { http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala -- diff --git a/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala b/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala index 54b73b8..e25a5cb 100644 --- a/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala +++ b/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala @@ -16,14 +16,13 @@ */ package kafka.utils.timer -import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, TimeUnit} +import java.util.concurrent.{CountDownLatch, TimeUnit} import org.junit.Assert._ import java.util.concurrent.atomic._ import org.junit.{Test, After, Before} import scala.collection.mutable.ArrayBuffer -import scala.util.Random class TimerTest {
kafka git commit: KAFKA-5771; org.apache.kafka.streams.state.internals.Segments#segments method returns incorrect results when segments were added out of order
Repository: kafka Updated Branches: refs/heads/0.11.0 6e5f8850d -> 142575891 KAFKA-5771; org.apache.kafka.streams.state.internals.Segments#segments method returns incorrect results when segments were added out of order Suggested fix for the bug Author: radzish Reviewers: Damian Guy Closes #3737 from radzish/KAFKA-5771 (cherry picked from commit 05e3850b2ea24f372ea1cc6ca7457e0ff99445f6) Signed-off-by: Damian Guy Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/14257589 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/14257589 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/14257589 Branch: refs/heads/0.11.0 Commit: 1425758915adbc858d5f5c5b3da9cabc10215aa6 Parents: 6e5f885 Author: radzish Authored: Fri Aug 25 13:59:10 2017 +0100 Committer: Damian Guy Committed: Fri Aug 25 13:59:25 2017 +0100 -- .../kafka/streams/state/internals/Segments.java | 4 +--- .../kafka/streams/state/internals/SegmentsTest.java | 15 +++ 2 files changed, 16 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/14257589/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java -- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java index 77b92a5..9c8653a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java @@ -85,9 +85,7 @@ class Segments { if (previousSegment == null) { newSegment.openDB(context); maxSegmentId = segmentId > maxSegmentId ? segmentId : maxSegmentId; -if (minSegmentId == Long.MAX_VALUE) { -minSegmentId = maxSegmentId; -} +minSegmentId = segmentId < minSegmentId ? segmentId : minSegmentId; } return previousSegment == null ? 
newSegment : previousSegment; } else { http://git-wip-us.apache.org/repos/asf/kafka/blob/14257589/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java -- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java index aedf74b..9d367eb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java @@ -167,6 +167,21 @@ public class SegmentsTest { } @Test +public void shouldGetSegmentsWithinTimeRangeOutOfOrder() throws Exception { +segments.getOrCreateSegment(4, context); +segments.getOrCreateSegment(2, context); +segments.getOrCreateSegment(0, context); +segments.getOrCreateSegment(1, context); +segments.getOrCreateSegment(3, context); + +final List segments = this.segments.segments(0, 2 * 60 * 1000); +assertEquals(3, segments.size()); +assertEquals(0, segments.get(0).id); +assertEquals(1, segments.get(1).id); +assertEquals(2, segments.get(2).id); +} + +@Test public void shouldRollSegments() throws Exception { segments.getOrCreateSegment(0, context); verifyCorrectSegments(0, 1);
kafka git commit: KAFKA-5771; org.apache.kafka.streams.state.internals.Segments#segments method returns incorrect results when segments were added out of order
Repository: kafka Updated Branches: refs/heads/trunk 14f6ecd91 -> 05e3850b2 KAFKA-5771; org.apache.kafka.streams.state.internals.Segments#segments method returns incorrect results when segments were added out of order Suggested fix for the bug Author: radzish Reviewers: Damian Guy Closes #3737 from radzish/KAFKA-5771 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/05e3850b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/05e3850b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/05e3850b Branch: refs/heads/trunk Commit: 05e3850b2ea24f372ea1cc6ca7457e0ff99445f6 Parents: 14f6ecd Author: radzish Authored: Fri Aug 25 13:59:10 2017 +0100 Committer: Damian Guy Committed: Fri Aug 25 13:59:10 2017 +0100 -- .../kafka/streams/state/internals/Segments.java | 4 +--- .../kafka/streams/state/internals/SegmentsTest.java | 15 +++ 2 files changed, 16 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/05e3850b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java -- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java index 77b92a5..9c8653a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java @@ -85,9 +85,7 @@ class Segments { if (previousSegment == null) { newSegment.openDB(context); maxSegmentId = segmentId > maxSegmentId ? segmentId : maxSegmentId; -if (minSegmentId == Long.MAX_VALUE) { -minSegmentId = maxSegmentId; -} +minSegmentId = segmentId < minSegmentId ? segmentId : minSegmentId; } return previousSegment == null ? 
newSegment : previousSegment; } else { http://git-wip-us.apache.org/repos/asf/kafka/blob/05e3850b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java -- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java index aedf74b..9d367eb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java @@ -167,6 +167,21 @@ public class SegmentsTest { } @Test +public void shouldGetSegmentsWithinTimeRangeOutOfOrder() throws Exception { +segments.getOrCreateSegment(4, context); +segments.getOrCreateSegment(2, context); +segments.getOrCreateSegment(0, context); +segments.getOrCreateSegment(1, context); +segments.getOrCreateSegment(3, context); + +final List segments = this.segments.segments(0, 2 * 60 * 1000); +assertEquals(3, segments.size()); +assertEquals(0, segments.get(0).id); +assertEquals(1, segments.get(1).id); +assertEquals(2, segments.get(2).id); +} + +@Test public void shouldRollSegments() throws Exception { segments.getOrCreateSegment(0, context); verifyCorrectSegments(0, 1);
kafka git commit: KAFKA-4869; Update 0.10.2.0 upgrade notes
Repository: kafka Updated Branches: refs/heads/0.10.2 ff50cf9ea -> a00823805 KAFKA-4869; Update 0.10.2.0 upgrade notes Author: Manikumar Reddy Reviewers: Ismael Juma Closes #3735 from omkreddy/cleanup10 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a0082380 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a0082380 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a0082380 Branch: refs/heads/0.10.2 Commit: a008238051b3dbb1a85c0c04b31486cd7fac Parents: ff50cf9 Author: Manikumar Reddy Authored: Fri Aug 25 11:22:16 2017 +0100 Committer: Ismael Juma Committed: Fri Aug 25 11:22:16 2017 +0100 -- docs/upgrade.html | 1 - 1 file changed, 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/a0082380/docs/upgrade.html -- diff --git a/docs/upgrade.html b/docs/upgrade.html index 0221f63..d7581fa 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -87,7 +87,6 @@ Kafka cluster before upgrading your clients. Version 0.10.2 brokers support 0.8. should not be set in the Streams app any more. If the Kafka cluster is secured, Streams apps must have the required security privileges to create new topics. Several new fields including "security.protocol", "connections.max.idle.ms", "retry.backoff.ms", "reconnect.backoff.ms" and "request.timeout.ms" were added to StreamsConfig class. User should pay attention to the default values and set these if needed. For more details please refer to 3.5 Kafka Streams Configs. -The offsets.topic.replication.factor broker config is now enforced upon auto topic creation. Internal auto topic creation will fail with a GROUP_COORDINATOR_NOT_AVAILABLE error until the cluster size meets this replication factor requirement. New Protocol Versions
kafka git commit: MINOR: KafkaService should print node hostname on failure
Repository: kafka Updated Branches: refs/heads/trunk e6164ec61 -> 14f6ecd91 MINOR: KafkaService should print node hostname on failure Author: Colin P. Mccabe Reviewers: Apurva Mehta, Ismael Juma Closes #3715 from cmccabe/kafka_service_print_node_hostname_on_failure Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/14f6ecd9 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/14f6ecd9 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/14f6ecd9 Branch: refs/heads/trunk Commit: 14f6ecd9151fa985fad0a30347efab1397af2491 Parents: e6164ec Author: Colin P. Mccabe Authored: Fri Aug 25 07:44:31 2017 +0100 Committer: Ismael Juma Committed: Fri Aug 25 07:44:42 2017 +0100 -- tests/kafkatest/services/kafka/kafka.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/14f6ecd9/tests/kafkatest/services/kafka/kafka.py -- diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index 743f63a..8c21d68 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -239,7 +239,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): self.start_jmx_tool(self.idx(node), node) if len(self.pids(node)) == 0: -raise Exception("No process ids recorded on node %s" % str(node)) +raise Exception("No process ids recorded on node %s" % node.account.hostname) def pids(self, node): """Return process ids associated with running processes on the given node."""
kafka-site git commit: KAFKA-4869: Update 0.10.2.0 upgrade notes
Repository: kafka-site Updated Branches: refs/heads/asf-site a1b78e7a3 -> aad5bb53c KAFKA-4869: Update 0.10.2.0 upgrade notes Author: Manikumar Reddy Reviewers: Ismael Juma Closes #71 from omkreddy/cleanup Project: http://git-wip-us.apache.org/repos/asf/kafka-site/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka-site/commit/aad5bb53 Tree: http://git-wip-us.apache.org/repos/asf/kafka-site/tree/aad5bb53 Diff: http://git-wip-us.apache.org/repos/asf/kafka-site/diff/aad5bb53 Branch: refs/heads/asf-site Commit: aad5bb53c92221ed2e6336ccb344a389d57f0208 Parents: a1b78e7 Author: Manikumar Reddy Authored: Thu Aug 24 17:16:31 2017 +0530 Committer: Ismael Juma Committed: Fri Aug 25 07:41:42 2017 +0100 -- 0102/upgrade.html | 1 - 1 file changed, 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/kafka-site/blob/aad5bb53/0102/upgrade.html -- diff --git a/0102/upgrade.html b/0102/upgrade.html index 0221f63..d7581fa 100644 --- a/0102/upgrade.html +++ b/0102/upgrade.html @@ -87,7 +87,6 @@ Kafka cluster before upgrading your clients. Version 0.10.2 brokers support 0.8. should not be set in the Streams app any more. If the Kafka cluster is secured, Streams apps must have the required security privileges to create new topics. Several new fields including "security.protocol", "connections.max.idle.ms", "retry.backoff.ms", "reconnect.backoff.ms" and "request.timeout.ms" were added to StreamsConfig class. User should pay attention to the default values and set these if needed. For more details please refer to 3.5 Kafka Streams Configs. -The offsets.topic.replication.factor broker config is now enforced upon auto topic creation. Internal auto topic creation will fail with a GROUP_COORDINATOR_NOT_AVAILABLE error until the cluster size meets this replication factor requirement. New Protocol Versions