[3/3] kafka git commit: KAFKA-5776; Add the Trogdor fault injection daemon

2017-08-25 Thread rsivaram
KAFKA-5776; Add the Trogdor fault injection daemon

Author: Colin P. Mccabe 

Reviewers: Ismael Juma , Rajini Sivaram 


Closes #3699 from cmccabe/trogdor-review


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0772fde5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0772fde5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0772fde5

Branch: refs/heads/trunk
Commit: 0772fde562ed361beec6ad745895a0ee0cf229e8
Parents: 607c3c2
Author: Colin P. Mccabe 
Authored: Fri Aug 25 12:29:40 2017 -0700
Committer: Rajini Sivaram 
Committed: Fri Aug 25 12:29:40 2017 -0700

--
 bin/trogdor.sh  |  50 +++
 build.gradle|   7 +
 checkstyle/import-control.xml   |  12 +
 checkstyle/suppressions.xml |  14 +-
 .../org/apache/kafka/common/utils/Utils.java|  38 +++
 .../org/apache/kafka/trogdor/agent/Agent.java   | 339 ++
 .../apache/kafka/trogdor/agent/AgentClient.java | 158 +
 .../kafka/trogdor/agent/AgentRestResource.java  |  80 +
 .../apache/kafka/trogdor/basic/BasicNode.java   | 111 ++
 .../kafka/trogdor/basic/BasicPlatform.java  |  95 ++
 .../kafka/trogdor/basic/BasicTopology.java  |  58 
 .../apache/kafka/trogdor/common/JsonUtil.java   |  52 +++
 .../org/apache/kafka/trogdor/common/Node.java   |  65 
 .../apache/kafka/trogdor/common/Platform.java   |  80 +
 .../apache/kafka/trogdor/common/Topology.java   |  35 ++
 .../kafka/trogdor/coordinator/Coordinator.java  | 341 +++
 .../trogdor/coordinator/CoordinatorClient.java  | 154 +
 .../coordinator/CoordinatorRestResource.java|  80 +
 .../kafka/trogdor/coordinator/NodeManager.java  | 262 ++
 .../kafka/trogdor/fault/AbstractFaultSpec.java  |  53 +++
 .../org/apache/kafka/trogdor/fault/Fault.java   |  50 +++
 .../apache/kafka/trogdor/fault/FaultSet.java| 146 
 .../apache/kafka/trogdor/fault/FaultSpec.java   |  59 
 .../apache/kafka/trogdor/fault/FaultState.java  |  27 ++
 .../trogdor/fault/NetworkPartitionFault.java| 130 +++
 .../fault/NetworkPartitionFaultSpec.java|  65 
 .../apache/kafka/trogdor/fault/NoOpFault.java   |  92 +
 .../kafka/trogdor/fault/NoOpFaultSpec.java  |  50 +++
 .../kafka/trogdor/rest/AgentFaultsResponse.java |  52 +++
 .../kafka/trogdor/rest/AgentStatusResponse.java |  59 
 .../trogdor/rest/CoordinatorFaultsResponse.java |  52 +++
 .../trogdor/rest/CoordinatorStatusResponse.java |  59 
 .../trogdor/rest/CreateAgentFaultRequest.java   |  69 
 .../rest/CreateCoordinatorFaultRequest.java |  69 
 .../org/apache/kafka/trogdor/rest/Empty.java|  49 +++
 .../kafka/trogdor/rest/ErrorResponse.java   |  68 
 .../apache/kafka/trogdor/rest/FaultDataMap.java |  98 ++
 .../kafka/trogdor/rest/JsonRestServer.java  | 220 
 .../kafka/trogdor/rest/RestExceptionMapper.java |  71 
 .../apache/kafka/trogdor/agent/AgentTest.java   | 167 +
 .../kafka/trogdor/basic/BasicPlatformTest.java  |  68 
 .../trogdor/common/CapturingCommandRunner.java  |  60 
 .../kafka/trogdor/common/ExpectedFaults.java| 193 +++
 .../trogdor/common/MiniTrogdorCluster.java  | 238 +
 .../trogdor/coordinator/CoordinatorTest.java| 212 
 .../kafka/trogdor/fault/FaultSetTest.java   | 126 +++
 tools/src/test/resources/log4j.properties   |  22 ++
 47 files changed, 4648 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/bin/trogdor.sh
--
diff --git a/bin/trogdor.sh b/bin/trogdor.sh
new file mode 100755
index 000..b211209
--- /dev/null
+++ b/bin/trogdor.sh
@@ -0,0 +1,50 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+usage() {
+cat <<EOF

[2/3] kafka git commit: KAFKA-5776; Add the Trogdor fault injection daemon

2017-08-25 Thread rsivaram
http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java
--
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java 
b/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java
new file mode 100644
index 000..9f1a19a
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.Topology;
+
+import java.util.Set;
+
+public interface Fault {
+/**
+ * Get the ID of this fault.
+ */
+String id();
+
+/**
+ * Get the specification for this Fault.
+ */
+FaultSpec spec();
+
+/**
+ * Activate the fault.
+ */
+void activate(Platform platform) throws Exception;
+
+/**
+ * Deactivate the fault.
+ */
+void deactivate(Platform platform) throws Exception;
+
+/**
+ * Get the nodes which this fault is targeting.
+ */
+Set<String> targetNodes(Topology topology);
+}
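To make the contract concrete, a minimal implementation might look like the sketch below. Names are hypothetical; the commit's own NoOpFault.java (in the file list above) is the real no-op example.

package org.apache.kafka.trogdor.fault;

import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.Topology;

import java.util.Collections;
import java.util.Set;

// Hypothetical minimal Fault; a sketch only, not code from this commit.
public class SketchFault implements Fault {
    private final String id;
    private final FaultSpec spec;

    public SketchFault(String id, FaultSpec spec) {
        this.id = id;
        this.spec = spec;
    }

    @Override
    public String id() { return id; }

    @Override
    public FaultSpec spec() { return spec; }

    @Override
    public void activate(Platform platform) throws Exception {
        // A real fault would disturb the platform here, e.g. partition the network.
    }

    @Override
    public void deactivate(Platform platform) throws Exception {
        // Undo whatever activate() changed.
    }

    @Override
    public Set<String> targetNodes(Topology topology) {
        return Collections.emptySet();  // this sketch targets no nodes
    }
}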

http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSet.java
--
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSet.java 
b/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSet.java
new file mode 100644
index 000..63e5ff4
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSet.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.TreeMap;
+
+public class FaultSet {
+private final static long NS_PER_MS = 1000000L;
+
+/**
+ * Maps fault start times in nanoseconds to faults.
+ */
+private final TreeMap<Long, Fault> byStart = new TreeMap<Long, Fault>();
+
+/**
+ * Maps fault end times in nanoseconds to faults.
+ */
+private final TreeMap<Long, Fault> byEnd = new TreeMap<Long, Fault>();
+
+/**
+ * Return an iterator that iterates over the fault set in start time order.
+ */
+public FaultSetIterator iterateByStart() {
+return new FaultSetIterator(byStart);
+}
+
+/**
+ * Return an iterator that iterates over the fault set in end time order.
+ */
+public FaultSetIterator iterateByEnd() {
+return new FaultSetIterator(byEnd);
+}
+
+/**
+ * Add a new fault to the FaultSet.
+ */
+public void add(Fault fault) {
+insertUnique(byStart, fault.spec().startMs() * NS_PER_MS, fault);
+long endMs = fault.spec().startMs() + fault.spec().durationMs();
+insertUnique(byEnd, endMs * NS_PER_MS, fault);
+}
+
+/**
+ * Insert a new fault to a TreeMap.
+ *
+ * If there is already a fault with the given key, the fault will be stored
+ * with the next available key.
+ */
+private void insertUnique(TreeMap<Long, Fault> map, long key, Fault fault) {
+while (true) {
+Fault existing = map.get(key);
+if (existing == null) {
+map.put(key, fault);
+return;
+} else 
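The message is cut off mid-method here. The byStart and byEnd maps index the same faults by activation and expiry time, so colliding keys must be resolved rather than overwritten. A self-contained sketch of the behavior the Javadoc describes (a taken key is bumped to the next available slot); an illustration, not the commit's exact code:

import java.util.TreeMap;

public class InsertUniqueSketch {
    static <V> void insertUnique(TreeMap<Long, V> map, long key, V value) {
        while (map.containsKey(key)) {
            key++;                    // slot taken: try the next nanosecond key
        }
        map.put(key, value);
    }

    public static void main(String[] args) {
        TreeMap<Long, String> byStart = new TreeMap<>();
        insertUnique(byStart, 5L, "faultA");
        insertUnique(byStart, 5L, "faultB");  // collides, lands at key 6
        System.out.println(byStart);          // {5=faultA, 6=faultB}
    }
}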

[1/3] kafka git commit: KAFKA-5776; Add the Trogdor fault injection daemon

2017-08-25 Thread rsivaram
Repository: kafka
Updated Branches:
  refs/heads/trunk 607c3c21f -> 0772fde56


http://git-wip-us.apache.org/repos/asf/kafka/blob/0772fde5/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
--
diff --git 
a/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java 
b/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
new file mode 100644
index 000..1947b79
--- /dev/null
+++ 
b/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.common;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.trogdor.agent.Agent;
+import org.apache.kafka.trogdor.agent.AgentClient;
+import org.apache.kafka.trogdor.agent.AgentRestResource;
+import org.apache.kafka.trogdor.basic.BasicNode;
+import org.apache.kafka.trogdor.basic.BasicPlatform;
+import org.apache.kafka.trogdor.basic.BasicTopology;
+import org.apache.kafka.trogdor.coordinator.Coordinator;
+
+import org.apache.kafka.trogdor.coordinator.CoordinatorClient;
+import org.apache.kafka.trogdor.coordinator.CoordinatorRestResource;
+import org.apache.kafka.trogdor.rest.JsonRestServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * MiniTrogdorCluster sets up a local cluster of Trogdor Agents and Coordinators.
+ */
+public class MiniTrogdorCluster implements AutoCloseable {
+private static final Logger log = LoggerFactory.getLogger(MiniTrogdorCluster.class);
+
+/**
+ * The MiniTrogdorCluster#Builder is used to set up a new MiniTrogdorCluster.
+ */
+public static class Builder {
+private final TreeSet<String> agentNames = new TreeSet<>();
+
+private String coordinatorName = null;
+
+private Time time = Time.SYSTEM;
+
+private BasicPlatform.CommandRunner commandRunner =
+new BasicPlatform.ShellCommandRunner();
+
+private static class NodeData {
+String hostname;
+AgentRestResource agentRestResource = null;
+JsonRestServer agentRestServer = null;
+int agentPort = 0;
+
+JsonRestServer coordinatorRestServer = null;
+int coordinatorPort = 0;
+CoordinatorRestResource coordinatorRestResource = null;
+
+Platform platform = null;
+Agent agent = null;
+Coordinator coordinator = null;
+
+BasicNode node = null;
+}
+
+public Builder() {
+}
+
+/**
+ * Set the timekeeper used by this MiniTrogdorCluster.
+ */
+public Builder time(Time time) {
+this.time = time;
+return this;
+}
+
+public Builder commandRunner(BasicPlatform.CommandRunner commandRunner) {
+this.commandRunner = commandRunner;
+return this;
+}
+
+/**
+ * Add a new trogdor coordinator node to the cluster.
+ */
+public Builder addCoordinator(String nodeName) {
+if (coordinatorName != null) {
+throw new RuntimeException("At most one coordinator is 
allowed.");
+}
+coordinatorName = nodeName;
+return this;
+}
+
+/**
+ * Add a new trogdor agent node to the cluster.
+ */
+public Builder addAgent(String nodeName) {
+if (agentNames.contains(nodeName)) {
+throw new RuntimeException("There is already an agent on node 
" + nodeName);
+}
+agentNames.add(nodeName);
+return this;
+}
+
+private NodeData getOrCreate(String nodeName, TreeMap<String, NodeData> nodes) {
+NodeData data = nodes.get(nodeName);
+if (data != null)
+return data;
+data = new NodeData();
+data.hostname = "127.0.0.1";
+
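The listing is truncated here. Typical test usage of this builder, assuming a build() method that wires up and starts the REST servers (the method is not shown in this excerpt), would be along these lines:

// Sketch only; build() is an assumption, and the try-with-resources is valid
// because MiniTrogdorCluster implements AutoCloseable per the declaration above.
try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder()
        .addCoordinator("node01")   // at most one coordinator
        .addAgent("node02")         // distinct agent node names
        .addAgent("node03")
        .build()) {
    // exercise the coordinator/agent REST clients against the cluster
}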

kafka git commit: KAFKA-5755; KafkaProducer should be refactored to use LogContext

2017-08-25 Thread jgus
Repository: kafka
Updated Branches:
  refs/heads/trunk c4d629a0b -> 607c3c21f


KAFKA-5755; KafkaProducer should be refactored to use LogContext

With LogContext, each producer log item is automatically prefixed with client 
id and transactional id.
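Concretely, the pattern looks roughly like this (a sketch; the prefix format and the LogContext/logger calls follow the diff below):

import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

public class LogContextSketch {
    public static void main(String[] args) {
        LogContext ctx = new LogContext("[Producer clientId=producer-1, transactionalId=my-txn] ");
        Logger log = ctx.logger(LogContextSketch.class);
        // Every line through this logger carries the prefix automatically, e.g.:
        // [Producer clientId=producer-1, transactionalId=my-txn] Starting the Kafka producer
        log.trace("Starting the Kafka producer");
    }
}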

Author: huxihx 

Reviewers: Jason Gustafson 

Closes #3703 from huxihx/KAFKA-5755


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/607c3c21
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/607c3c21
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/607c3c21

Branch: refs/heads/trunk
Commit: 607c3c21f652a8f911d1efb8374d2eec313a4e2d
Parents: c4d629a
Author: huxihx 
Authored: Fri Aug 25 10:38:15 2017 -0700
Committer: Jason Gustafson 
Committed: Fri Aug 25 10:42:40 2017 -0700

--
 checkstyle/suppressions.xml |  2 +
 .../kafka/clients/producer/KafkaProducer.java   | 38 +
 .../producer/internals/RecordAccumulator.java   | 10 ++-
 .../clients/producer/internals/Sender.java  |  8 +-
 .../internals/RecordAccumulatorTest.java| 86 
 .../clients/producer/internals/SenderTest.java  | 20 +++--
 .../internals/TransactionManagerTest.java   |  6 +-
 7 files changed, 103 insertions(+), 67 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/607c3c21/checkstyle/suppressions.xml
--
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 7f3c4b6..e4c244f 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -35,6 +35,8 @@
   files="ConfigDef.java"/>
 
+
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/607c3c21/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index ce2efba..ba0d848 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -58,9 +58,9 @@ import 
org.apache.kafka.common.serialization.ExtendedSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
 import java.util.Collections;
@@ -224,7 +224,7 @@ import static 
org.apache.kafka.common.serialization.ExtendedSerializer.Wrapper.e
  */
public class KafkaProducer<K, V> implements Producer<K, V> {

-private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
+private final Logger log;
private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
private static final String JMX_PREFIX = "kafka.producer";
public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";
@@ -305,13 +305,19 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
 @SuppressWarnings("unchecked")
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
 try {
-log.trace("Starting the Kafka producer");
Map<String, Object> userProvidedConfigs = config.originals();
 this.producerConfig = config;
 this.time = Time.SYSTEM;
 clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
 if (clientId.length() <= 0)
 clientId = "producer-" + 
PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
+
+String transactionalId = userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ?
+(String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null;
+LogContext logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", clientId, transactionalId));
+log = logContext.logger(KafkaProducer.class);
+log.trace("Starting the Kafka producer");
+
Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
@@ -354,13 +360,14 @@ 

[2/3] kafka git commit: MINOR: Consolidate broker request/response handling

2017-08-25 Thread jgus
http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/main/scala/kafka/network/RequestChannel.scala
--
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala 
b/core/src/main/scala/kafka/network/RequestChannel.scala
index ec8e1eb..1d95fa0 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -19,19 +19,16 @@ package kafka.network
 
 import java.net.InetAddress
 import java.nio.ByteBuffer
-import java.util.Collections
 import java.util.concurrent._
 
 import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaMetricsGroup
+import kafka.network.RequestChannel.{ShutdownRequest, BaseRequest}
 import kafka.server.QuotaId
 import kafka.utils.{Logging, NotNothing}
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.InvalidRequestException
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.network.{ListenerName, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, Protocol, SecurityProtocol}
-import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.Time
@@ -40,25 +37,20 @@ import org.apache.log4j.Logger
 import scala.reflect.ClassTag
 
 object RequestChannel extends Logging {
-  val AllDone = new Request(processor = 1, connectionId = "2", 
Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost),
-startTimeNanos = 0, listenerName = new ListenerName(""), securityProtocol 
= SecurityProtocol.PLAINTEXT,
-MemoryPool.NONE, shutdownReceive)
   private val requestLogger = Logger.getLogger("kafka.request.logger")
 
-  private def shutdownReceive: ByteBuffer = {
-val emptyProduceRequest = new 
ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, 0, 0,
-  Collections.emptyMap[TopicPartition, MemoryRecords]).build()
-val emptyRequestHeader = new RequestHeader(ApiKeys.PRODUCE.id, 
emptyProduceRequest.version, "", 0)
-emptyProduceRequest.serialize(emptyRequestHeader)
-  }
+  sealed trait BaseRequest
+  case object ShutdownRequest extends BaseRequest
 
   case class Session(principal: KafkaPrincipal, clientAddress: InetAddress) {
 val sanitizedUser = QuotaId.sanitize(principal.getName)
   }
 
-  class Request(val processor: Int, val connectionId: String, val session: 
Session, startTimeNanos: Long,
-val listenerName: ListenerName, val securityProtocol: 
SecurityProtocol, memoryPool: MemoryPool,
-@volatile private var buffer: ByteBuffer) {
+  class Request(val processor: Int,
+val context: RequestContext,
+val startTimeNanos: Long,
+memoryPool: MemoryPool,
+@volatile private var buffer: ByteBuffer) extends BaseRequest {
 // These need to be volatile because the readers are in the network thread 
and the writers are in the request
 // handler threads or the purgatory threads
 @volatile var requestDequeueTimeNanos = -1L
@@ -68,31 +60,17 @@ object RequestChannel extends Logging {
 @volatile var apiRemoteCompleteTimeNanos = -1L
 @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
 
-val header: RequestHeader = try {
-  RequestHeader.parse(buffer)
-} catch {
-  case ex: Throwable =>
-throw new InvalidRequestException(s"Error parsing request header. Our 
best guess of the apiKey is: ${buffer.getShort(0)}", ex)
-}
+val session = Session(context.principal, context.clientAddress)
+private val bodyAndSize: RequestAndSize = context.parseRequest(buffer)
 
-val bodyAndSize: RequestAndSize =
-  try {
-// For unsupported version of ApiVersionsRequest, create a dummy 
request to enable an error response to be returned later
-if (header.apiKey == ApiKeys.API_VERSIONS.id && 
!Protocol.apiVersionSupported(header.apiKey, header.apiVersion)) {
-  new RequestAndSize(new ApiVersionsRequest.Builder().build(), 0)
-}
-else
-  AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
-  } catch {
-case ex: Throwable =>
-  throw new InvalidRequestException(s"Error getting request for 
apiKey: ${header.apiKey} and apiVersion: ${header.apiVersion}", ex)
-  }
+def header: RequestHeader = context.header
+def sizeOfBodyInBytes: Int = bodyAndSize.size
 
 //most request types are parsed entirely into objects at this point. for 
those we can release the underlying buffer.
 //some (like produce, or any time the schema contains fields of types 
BYTES or NULLABLE_BYTES) retain a reference
 //to the buffer. for those requests we cannot release the buffer early, 
but only when request processing is done.
-if 

[3/3] kafka git commit: MINOR: Consolidate broker request/response handling

2017-08-25 Thread jgus
MINOR: Consolidate broker request/response handling

This patch contains a few small improvements to make request/response handling 
more consistent. Primarily it consolidates request/response serialization logic 
so that `SaslServerAuthenticator` and `KafkaApis` follow the same path. It also 
reduces the amount of custom logic needed to handle unsupported versions of the
ApiVersions request.
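One concrete piece of the cleanup, visible in the RequestChannel diff below: the old code signalled handler shutdown by serializing a dummy ProduceRequest, while the new code models shutdown as a typed sentinel (sealed trait BaseRequest plus case object ShutdownRequest). The same idea expressed in Java, as a sketch only:

public class SentinelSketch {
    interface BaseRequest {}
    enum ShutdownRequest implements BaseRequest { INSTANCE }
    static final class Request implements BaseRequest {
        final String payload;
        Request(String payload) { this.payload = payload; }
    }

    // Consumers match on the type instead of sniffing a magic serialized buffer.
    static void dispatch(BaseRequest req) {
        if (req == ShutdownRequest.INSTANCE) {
            System.out.println("shutting down handler thread");
        } else {
            System.out.println("handling " + ((Request) req).payload);
        }
    }

    public static void main(String[] args) {
        dispatch(new Request("produce"));
        dispatch(ShutdownRequest.INSTANCE);
    }
}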

Author: Jason Gustafson 

Reviewers: Ismael Juma 

Closes #3673 from hachikuji/consolidate-response-handling


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c4d629a0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c4d629a0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c4d629a0

Branch: refs/heads/trunk
Commit: c4d629a0b3cbd11c174cb8b09a50bc8de77825e9
Parents: 05e3850
Author: Jason Gustafson 
Authored: Fri Aug 25 10:23:11 2017 -0700
Committer: Jason Gustafson 
Committed: Fri Aug 25 10:23:11 2017 -0700

--
 .../org/apache/kafka/clients/ClientRequest.java |   2 +-
 .../org/apache/kafka/clients/NetworkClient.java |  24 +-
 .../internals/ConsumerNetworkClient.java|   4 +-
 .../clients/producer/internals/Sender.java  |   5 +-
 .../kafka/common/network/ChannelBuilders.java   |   2 +-
 .../common/network/SaslChannelBuilder.java  |  17 +-
 .../org/apache/kafka/common/network/Send.java   |   4 +-
 .../kafka/common/requests/AbstractRequest.java  | 112 +++--
 .../kafka/common/requests/AbstractResponse.java |  19 +-
 .../common/requests/ApiVersionsRequest.java |  27 ++-
 .../common/requests/ApiVersionsResponse.java|  12 +-
 .../kafka/common/requests/FetchResponse.java|  21 +-
 .../kafka/common/requests/RequestContext.java   |  92 
 .../kafka/common/requests/RequestHeader.java|  48 ++--
 .../authenticator/SaslClientAuthenticator.java  |   8 +-
 .../authenticator/SaslServerAuthenticator.java  | 103 +
 .../common/network/SaslChannelBuilderTest.java  |   4 +-
 .../common/requests/RequestContextTest.java |  75 ++
 .../common/requests/RequestHeaderTest.java  |   6 +-
 .../common/requests/RequestResponseTest.java|  12 +-
 .../authenticator/SaslAuthenticatorTest.java|  14 +-
 .../SaslServerAuthenticatorTest.java|   8 +-
 .../controller/ControllerChannelManager.scala   |   2 +-
 ...nsactionMarkerRequestCompletionHandler.scala |   4 +-
 .../scala/kafka/network/RequestChannel.scala| 111 -
 .../main/scala/kafka/network/SocketServer.scala |  24 +-
 .../server/ClientRequestQuotaManager.scala  |  25 +-
 .../src/main/scala/kafka/server/KafkaApis.scala | 229 +--
 .../kafka/server/KafkaRequestHandler.scala  |  59 ++---
 core/src/main/scala/kafka/utils/Logging.scala   |   2 +
 .../scala/other/kafka/TestOffsetManager.scala   |   3 +-
 .../unit/kafka/admin/AdminRackAwareTest.scala   |   5 +-
 .../admin/ResetConsumerGroupOffsetTest.scala|   9 +-
 .../TransactionMarkerChannelManagerTest.scala   |  11 +-
 ...tionMarkerRequestCompletionHandlerTest.scala |  30 ++-
 .../integration/KafkaServerTestHarness.scala|   2 +-
 .../kafka/integration/PrimitiveApiTest.scala|   3 +-
 .../kafka/integration/TopicMetadataTest.scala   |   2 +-
 .../ZookeeperConsumerConnectorTest.scala|   1 -
 .../kafka/log/ProducerStateManagerTest.scala|   2 +-
 .../unit/kafka/network/SocketServerTest.scala   |  64 +++---
 .../unit/kafka/server/BaseRequestTest.scala |   2 +-
 .../unit/kafka/server/FetchRequestTest.scala|   4 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala |  21 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala |   2 +-
 .../unit/kafka/server/LogRecoveryTest.scala |   4 +-
 .../unit/kafka/server/OffsetCommitTest.scala|   2 +-
 .../unit/kafka/server/ReplicaFetchTest.scala|   3 -
 .../kafka/server/ReplicationQuotasTest.scala|   2 +-
 .../unit/kafka/server/RequestQuotaTest.scala|   4 +-
 .../unit/kafka/utils/IteratorTemplateTest.scala |   2 +-
 .../unit/kafka/utils/timer/TimerTest.scala  |   3 +-
 52 files changed, 685 insertions(+), 571 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
--
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
index 964..9b62946 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
@@ -79,7 +79,7 @@ public final class ClientRequest {
 }
 
 public RequestHeader makeHeader(short version) {
-return 

[1/3] kafka git commit: MINOR: Consolidate broker request/response handling

2017-08-25 Thread jgus
Repository: kafka
Updated Branches:
  refs/heads/trunk 05e3850b2 -> c4d629a0b


http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
--
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala 
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 4b33dcb..aadb4d2 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -298,7 +298,7 @@ class RequestQuotaTest extends BaseRequestTest {
   private def requestResponse(socket: Socket, clientId: String, correlationId: 
Int, requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]): Struct = {
 val apiKey = requestBuilder.apiKey
 val request = requestBuilder.build()
-val header = new RequestHeader(apiKey.id, request.version, clientId, 
correlationId)
+val header = new RequestHeader(apiKey, request.version, clientId, 
correlationId)
 val response = requestAndReceive(socket, request.serialize(header).array)
 val responseBuffer = skipResponseHeader(response)
 apiKey.parseResponse(request.version, responseBuffer)
@@ -326,7 +326,7 @@ class RequestQuotaTest extends BaseRequestTest {
 override def toString: String = {
   val requestTime = requestTimeMetricValue(clientId)
   val throttleTime = throttleTimeMetricValue(clientId)
-  s"Client $clientId apiKey ${apiKey} requests $correlationId requestTime 
$requestTime throttleTime $throttleTime"
+  s"Client $clientId apiKey $apiKey requests $correlationId requestTime 
$requestTime throttleTime $throttleTime"
 }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala
--
diff --git a/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala 
b/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala
index 6a40510..67e64d5 100644
--- a/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala
@@ -18,7 +18,7 @@ package kafka.utils
 
 import org.junit.Assert._
 import org.scalatest.Assertions
-import org.junit.{Test, After, Before}
+import org.junit.Test
 
 class IteratorTemplateTest extends Assertions {
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala
--
diff --git a/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala 
b/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala
index 54b73b8..e25a5cb 100644
--- a/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala
@@ -16,14 +16,13 @@
  */
 package kafka.utils.timer
 
-import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, 
TimeUnit}
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import org.junit.Assert._
 import java.util.concurrent.atomic._
 import org.junit.{Test, After, Before}
 
 import scala.collection.mutable.ArrayBuffer
-import scala.util.Random
 
 class TimerTest {
 



kafka git commit: KAFKA-5771; org.apache.kafka.streams.state.internals.Segments#segments method returns incorrect results when segments were added out of order

2017-08-25 Thread damianguy
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 6e5f8850d -> 142575891


KAFKA-5771; org.apache.kafka.streams.state.internals.Segments#segments method 
returns incorrect results when segments were added out of order

Suggested fix for the bug

Author: radzish 

Reviewers: Damian Guy 

Closes #3737 from radzish/KAFKA-5771

(cherry picked from commit 05e3850b2ea24f372ea1cc6ca7457e0ff99445f6)
Signed-off-by: Damian Guy 


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/14257589
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/14257589
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/14257589

Branch: refs/heads/0.11.0
Commit: 1425758915adbc858d5f5c5b3da9cabc10215aa6
Parents: 6e5f885
Author: radzish 
Authored: Fri Aug 25 13:59:10 2017 +0100
Committer: Damian Guy 
Committed: Fri Aug 25 13:59:25 2017 +0100

--
 .../kafka/streams/state/internals/Segments.java  |  4 +---
 .../kafka/streams/state/internals/SegmentsTest.java  | 15 +++
 2 files changed, 16 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/14257589/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
--
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index 77b92a5..9c8653a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -85,9 +85,7 @@ class Segments {
 if (previousSegment == null) {
 newSegment.openDB(context);
 maxSegmentId = segmentId > maxSegmentId ? segmentId : 
maxSegmentId;
-if (minSegmentId == Long.MAX_VALUE) {
-minSegmentId = maxSegmentId;
-}
+minSegmentId = segmentId < minSegmentId ? segmentId : 
minSegmentId;
 }
 return previousSegment == null ? newSegment : previousSegment;
 } else {
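The one-line change matters because the old code seeded minSegmentId only once, from maxSegmentId, so segments created out of order left the minimum stuck at whatever id arrived first. A standalone illustration (not Kafka code):

public class MinTrackingSketch {
    public static void main(String[] args) {
        long min = Long.MAX_VALUE, max = Long.MIN_VALUE;
        for (long id : new long[]{4, 2, 0}) {   // segments created out of order
            max = Math.max(max, id);
            // Old logic: if (min == Long.MAX_VALUE) min = max;  -> min stays 4
            min = Math.min(min, id);            // fixed logic: min correctly ends at 0
        }
        System.out.println("min=" + min + " max=" + max);  // prints: min=0 max=4
    }
}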

http://git-wip-us.apache.org/repos/asf/kafka/blob/14257589/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
--
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
index aedf74b..9d367eb 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
@@ -167,6 +167,21 @@ public class SegmentsTest {
 }
 
 @Test
+public void shouldGetSegmentsWithinTimeRangeOutOfOrder() throws Exception {
+segments.getOrCreateSegment(4, context);
+segments.getOrCreateSegment(2, context);
+segments.getOrCreateSegment(0, context);
+segments.getOrCreateSegment(1, context);
+segments.getOrCreateSegment(3, context);
+
+final List<Segment> segments = this.segments.segments(0, 2 * 60 * 1000);
+assertEquals(3, segments.size());
+assertEquals(0, segments.get(0).id);
+assertEquals(1, segments.get(1).id);
+assertEquals(2, segments.get(2).id);
+}
+
+@Test
 public void shouldRollSegments() throws Exception {
 segments.getOrCreateSegment(0, context);
 verifyCorrectSegments(0, 1);



kafka git commit: KAFKA-5771; org.apache.kafka.streams.state.internals.Segments#segments method returns incorrect results when segments were added out of order

2017-08-25 Thread damianguy
Repository: kafka
Updated Branches:
  refs/heads/trunk 14f6ecd91 -> 05e3850b2


KAFKA-5771; org.apache.kafka.streams.state.internals.Segments#segments method 
returns incorrect results when segments were added out of order

Suggested fix for the bug

Author: radzish 

Reviewers: Damian Guy 

Closes #3737 from radzish/KAFKA-5771


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/05e3850b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/05e3850b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/05e3850b

Branch: refs/heads/trunk
Commit: 05e3850b2ea24f372ea1cc6ca7457e0ff99445f6
Parents: 14f6ecd
Author: radzish 
Authored: Fri Aug 25 13:59:10 2017 +0100
Committer: Damian Guy 
Committed: Fri Aug 25 13:59:10 2017 +0100

--
 .../kafka/streams/state/internals/Segments.java  |  4 +---
 .../kafka/streams/state/internals/SegmentsTest.java  | 15 +++
 2 files changed, 16 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/05e3850b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
--
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index 77b92a5..9c8653a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -85,9 +85,7 @@ class Segments {
 if (previousSegment == null) {
 newSegment.openDB(context);
 maxSegmentId = segmentId > maxSegmentId ? segmentId : 
maxSegmentId;
-if (minSegmentId == Long.MAX_VALUE) {
-minSegmentId = maxSegmentId;
-}
+minSegmentId = segmentId < minSegmentId ? segmentId : 
minSegmentId;
 }
 return previousSegment == null ? newSegment : previousSegment;
 } else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/05e3850b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
--
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
index aedf74b..9d367eb 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
@@ -167,6 +167,21 @@ public class SegmentsTest {
 }
 
 @Test
+public void shouldGetSegmentsWithinTimeRangeOutOfOrder() throws Exception {
+segments.getOrCreateSegment(4, context);
+segments.getOrCreateSegment(2, context);
+segments.getOrCreateSegment(0, context);
+segments.getOrCreateSegment(1, context);
+segments.getOrCreateSegment(3, context);
+
+final List<Segment> segments = this.segments.segments(0, 2 * 60 * 1000);
+assertEquals(3, segments.size());
+assertEquals(0, segments.get(0).id);
+assertEquals(1, segments.get(1).id);
+assertEquals(2, segments.get(2).id);
+}
+
+@Test
 public void shouldRollSegments() throws Exception {
 segments.getOrCreateSegment(0, context);
 verifyCorrectSegments(0, 1);



kafka git commit: KAFKA-4869; Update 0.10.2.0 upgrade notes

2017-08-25 Thread ijuma
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 ff50cf9ea -> a00823805


KAFKA-4869; Update 0.10.2.0 upgrade notes

Author: Manikumar Reddy 

Reviewers: Ismael Juma 

Closes #3735 from omkreddy/cleanup10


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a0082380
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a0082380
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a0082380

Branch: refs/heads/0.10.2
Commit: a008238051b3dbb1a85c0c04b31486cd7fac
Parents: ff50cf9
Author: Manikumar Reddy 
Authored: Fri Aug 25 11:22:16 2017 +0100
Committer: Ismael Juma 
Committed: Fri Aug 25 11:22:16 2017 +0100

--
 docs/upgrade.html | 1 -
 1 file changed, 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/a0082380/docs/upgrade.html
--
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 0221f63..d7581fa 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -87,7 +87,6 @@ Kafka cluster before upgrading your clients. Version 0.10.2 
brokers support 0.8.
 should not be set in the Streams app any more. If the Kafka cluster is 
secured, Streams apps must have the required security privileges to create new 
topics.
 Several new fields including "security.protocol", 
"connections.max.idle.ms", "retry.backoff.ms", "reconnect.backoff.ms" and 
"request.timeout.ms" were added to
 StreamsConfig class. User should pay attention to the default values 
and set these if needed. For more details please refer to 3.5 Kafka Streams 
Configs.
-The offsets.topic.replication.factor broker config is now 
enforced upon auto topic creation. Internal auto topic creation will fail with 
a GROUP_COORDINATOR_NOT_AVAILABLE error until the cluster size meets this 
replication factor requirement.
 
 
 New 
Protocol Versions



kafka git commit: MINOR: KafkaService should print node hostname on failure

2017-08-25 Thread ijuma
Repository: kafka
Updated Branches:
  refs/heads/trunk e6164ec61 -> 14f6ecd91


MINOR: KafkaService should print node hostname on failure

Author: Colin P. Mccabe 

Reviewers: Apurva Mehta , Ismael Juma 

Closes #3715 from cmccabe/kafka_service_print_node_hostname_on_failure


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/14f6ecd9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/14f6ecd9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/14f6ecd9

Branch: refs/heads/trunk
Commit: 14f6ecd9151fa985fad0a30347efab1397af2491
Parents: e6164ec
Author: Colin P. Mccabe 
Authored: Fri Aug 25 07:44:31 2017 +0100
Committer: Ismael Juma 
Committed: Fri Aug 25 07:44:42 2017 +0100

--
 tests/kafkatest/services/kafka/kafka.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/14f6ecd9/tests/kafkatest/services/kafka/kafka.py
--
diff --git a/tests/kafkatest/services/kafka/kafka.py 
b/tests/kafkatest/services/kafka/kafka.py
index 743f63a..8c21d68 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -239,7 +239,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
 
 self.start_jmx_tool(self.idx(node), node)
 if len(self.pids(node)) == 0:
-raise Exception("No process ids recorded on node %s" % str(node))
+raise Exception("No process ids recorded on node %s" % 
node.account.hostname)
 
 def pids(self, node):
 """Return process ids associated with running processes on the given 
node."""



kafka-site git commit: KAFKA-4869: Update 0.10.2.0 upgrade notes

2017-08-25 Thread ijuma
Repository: kafka-site
Updated Branches:
  refs/heads/asf-site a1b78e7a3 -> aad5bb53c


KAFKA-4869: Update 0.10.2.0 upgrade notes

Author: Manikumar Reddy 

Reviewers: Ismael Juma 

Closes #71 from omkreddy/cleanup


Project: http://git-wip-us.apache.org/repos/asf/kafka-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka-site/commit/aad5bb53
Tree: http://git-wip-us.apache.org/repos/asf/kafka-site/tree/aad5bb53
Diff: http://git-wip-us.apache.org/repos/asf/kafka-site/diff/aad5bb53

Branch: refs/heads/asf-site
Commit: aad5bb53c92221ed2e6336ccb344a389d57f0208
Parents: a1b78e7
Author: Manikumar Reddy 
Authored: Thu Aug 24 17:16:31 2017 +0530
Committer: Ismael Juma 
Committed: Fri Aug 25 07:41:42 2017 +0100

--
 0102/upgrade.html | 1 -
 1 file changed, 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/kafka-site/blob/aad5bb53/0102/upgrade.html
--
diff --git a/0102/upgrade.html b/0102/upgrade.html
index 0221f63..d7581fa 100644
--- a/0102/upgrade.html
+++ b/0102/upgrade.html
@@ -87,7 +87,6 @@ Kafka cluster before upgrading your clients. Version 0.10.2 
brokers support 0.8.
 should not be set in the Streams app any more. If the Kafka cluster is 
secured, Streams apps must have the required security privileges to create new 
topics.
 Several new fields including "security.protocol", 
"connections.max.idle.ms", "retry.backoff.ms", "reconnect.backoff.ms" and 
"request.timeout.ms" were added to
 StreamsConfig class. User should pay attention to the default values 
and set these if needed. For more details please refer to 3.5 Kafka Streams 
Configs.
-The offsets.topic.replication.factor broker config is now 
enforced upon auto topic creation. Internal auto topic creation will fail with 
a GROUP_COORDINATOR_NOT_AVAILABLE error until the cluster size meets this 
replication factor requirement.
 
 
 New 
Protocol Versions