Repository: kafka
Updated Branches:
  refs/heads/trunk d7bffebca -> ecc1fb10f


http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/clients/src/test/java/org/apache/kafka/test/MockSerializer.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockSerializer.java b/clients/src/test/java/org/apache/kafka/test/MockSerializer.java
index 0348258..e1bf2bb 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockSerializer.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockSerializer.java
@@ -16,14 +16,20 @@
  */
 package org.apache.kafka.test;
 
+import org.apache.kafka.common.ClusterResourceListener;
+import org.apache.kafka.common.ClusterResource;
 import org.apache.kafka.common.serialization.Serializer;
 
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
-public class MockSerializer implements Serializer<byte[]> {
+public class MockSerializer implements ClusterResourceListener, Serializer<byte[]> {
     public static final AtomicInteger INIT_COUNT = new AtomicInteger(0);
     public static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0);
+    public static final AtomicReference<ClusterResource> CLUSTER_META = new AtomicReference<>();
+    public static final ClusterResource NO_CLUSTER_ID = new ClusterResource("no_cluster_id");
+    public static final AtomicReference<ClusterResource> CLUSTER_ID_BEFORE_SERIALIZE = new AtomicReference<>(NO_CLUSTER_ID);
 
     public MockSerializer() {
         INIT_COUNT.incrementAndGet();
@@ -35,6 +41,9 @@ public class MockSerializer implements Serializer<byte[]> {
 
     @Override
     public byte[] serialize(String topic, byte[] data) {
+        // This will ensure that we get the cluster metadata when serialize is called for the first time
+        // as subsequent compareAndSet operations will fail.
+        CLUSTER_ID_BEFORE_SERIALIZE.compareAndSet(NO_CLUSTER_ID, CLUSTER_META.get());
         return data;
     }
 
@@ -42,4 +51,9 @@ public class MockSerializer implements Serializer<byte[]> {
     public void close() {
         CLOSE_COUNT.incrementAndGet();
     }
+
+    @Override
+    public void onUpdate(ClusterResource clusterResource) {
+        CLUSTER_META.set(clusterResource);
+    }
 }
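
The compareAndSet above records whatever cluster metadata is visible the first time serialize() runs and ignores later calls, because the sentinel value is only the expected value once. A minimal standalone Java sketch of that first-write-wins pattern (plain Java, no Kafka dependencies; the names simply mirror the test class):

    import java.util.concurrent.atomic.AtomicReference;

    public class FirstWriteWins {
        static final String NO_CLUSTER_ID = "no_cluster_id";
        static final AtomicReference<String> CLUSTER_ID_BEFORE_SERIALIZE =
                new AtomicReference<>(NO_CLUSTER_ID);

        static void serialize(String currentClusterId) {
            // Succeeds only while the sentinel is still stored, i.e. on the first call;
            // afterwards the expected value no longer matches and the CAS is a no-op.
            CLUSTER_ID_BEFORE_SERIALIZE.compareAndSet(NO_CLUSTER_ID, currentClusterId);
        }

        public static void main(String[] args) {
            serialize("first-cluster-id");
            serialize("second-cluster-id");
            System.out.println(CLUSTER_ID_BEFORE_SERIALIZE.get()); // prints first-cluster-id
        }
    }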

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 265661a..adc79ac 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -40,8 +40,16 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.xml.bind.DatatypeConverter;
 
 import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Helper functions for writing unit tests
@@ -81,7 +89,7 @@ public class TestUtils {
             for (int i = 0; i < partitions; i++)
                 parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns));
         }
-        return new Cluster(asList(ns), parts, Collections.<String>emptySet(), INTERNAL_TOPICS);
+        return new Cluster("kafka-cluster", asList(ns), parts, Collections.<String>emptySet(), INTERNAL_TOPICS);
     }
 
     public static Cluster clusterWith(final int nodes, final String topic, final int partitions) {
@@ -269,4 +277,33 @@ public class TestUtils {
         }
     }
 
-}
+    /**
+     * Checks if a cluster id is valid.
+     * @param clusterId
+     */
+    public static void isValidClusterId(String clusterId) {
+        assertNotNull(clusterId);
+
+        // Base 64 encoded value is 22 characters
+        assertEquals(clusterId.length(), 22);
+
+        Pattern clusterIdPattern = Pattern.compile("[a-zA-Z0-9_\\-]+");
+        Matcher matcher = clusterIdPattern.matcher(clusterId);
+        assertTrue(matcher.matches());
+
+        // Convert into normal variant and add padding at the end.
+        String originalClusterId = String.format("%s==", clusterId.replace("_", "/").replace("-", "+"));
+        byte[] decodedUuid = DatatypeConverter.parseBase64Binary(originalClusterId);
+
+        // We expect 16 bytes, same as the input UUID.
+        assertEquals(decodedUuid.length, 16);
+
+        //Check if it can be converted back to a UUID.
+        try {
+            ByteBuffer uuidBuffer = ByteBuffer.wrap(decodedUuid);
+            new UUID(uuidBuffer.getLong(), uuidBuffer.getLong()).toString();
+        } catch (Exception e) {
+            fail(clusterId + " cannot be converted back to UUID.");
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
index 999b2a4..6d35539 100755
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
@@ -23,6 +23,8 @@ package kafka.metrics
 import kafka.utils.{CoreUtils, VerifiableProperties}
 import java.util.concurrent.atomic.AtomicBoolean
 
+import scala.collection.mutable.ArrayBuffer
+
 
 /**
  * Base trait for reporter MBeans. If a client wants to expose these JMX
@@ -42,22 +44,27 @@ trait KafkaMetricsReporterMBean {
   def getMBeanName: String
 }
 
-
+/**
+  * Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information.
+  */
 trait KafkaMetricsReporter {
   def init(props: VerifiableProperties)
 }
 
 object KafkaMetricsReporter {
   val ReporterStarted: AtomicBoolean = new AtomicBoolean(false)
+  private var reporters: ArrayBuffer[KafkaMetricsReporter] = null
 
-  def startReporters (verifiableProps: VerifiableProperties) {
+  def startReporters (verifiableProps: VerifiableProperties): Seq[KafkaMetricsReporter] = {
     ReporterStarted synchronized {
       if (!ReporterStarted.get()) {
+        reporters = ArrayBuffer[KafkaMetricsReporter]()
         val metricsConfig = new KafkaMetricsConfig(verifiableProps)
         if(metricsConfig.reporters.nonEmpty) {
           metricsConfig.reporters.foreach(reporterType => {
            val reporter = CoreUtils.createObject[KafkaMetricsReporter](reporterType)
             reporter.init(verifiableProps)
+            reporters += reporter
             reporter match {
              case bean: KafkaMetricsReporterMBean => CoreUtils.registerMBean(reporter, bean.getMBeanName)
               case _ =>
@@ -67,6 +74,7 @@ object KafkaMetricsReporter {
         }
       }
     }
+    reporters
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 677b5dd..3008426 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -63,7 +63,8 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val metadataCache: MetadataCache,
                 val metrics: Metrics,
                 val authorizer: Option[Authorizer],
-                val quotas: QuotaManagers) extends Logging {
+                val quotas: QuotaManagers,
+                val clusterId: String) extends Logging {
 
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
 
@@ -754,6 +755,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val responseBody = new MetadataResponse(
       brokers.map(_.getNode(request.securityProtocol)).asJava,
+      clusterId,
       metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
       completeTopicMetadata.asJava,
       requestVersion

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 4509e37..db92cb8 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -17,7 +17,7 @@
 
 package kafka.server
 
-import java.net.{SocketTimeoutException}
+import java.net.SocketTimeoutException
 import java.util
 
 import kafka.admin._
@@ -26,18 +26,21 @@ import kafka.log.LogConfig
 import kafka.log.CleanerConfig
 import kafka.log.LogManager
 import java.util.concurrent._
-import atomic.{AtomicInteger, AtomicBoolean}
-import java.io.{IOException, File}
+import atomic.{AtomicBoolean, AtomicInteger}
+import java.io.{File, IOException}
+import java.nio.charset.StandardCharsets
+import java.util.UUID
+import javax.xml.bind.DatatypeConverter
 
 import kafka.security.auth.Authorizer
 import kafka.utils._
-import org.apache.kafka.clients.{ManualMetadataUpdater, ClientRequest, NetworkClient}
-import org.apache.kafka.common.Node
+import org.apache.kafka.clients.{ClientRequest, ManualMetadataUpdater, NetworkClient}
+import org.apache.kafka.common.{ClusterResource, Node}
 import org.apache.kafka.common.metrics._
-import org.apache.kafka.common.network.{LoginType, Selectable, ChannelBuilders, NetworkReceive, Selector, Mode}
-import org.apache.kafka.common.protocol.{Errors, ApiKeys, SecurityProtocol}
+import org.apache.kafka.common.network.{ChannelBuilders, LoginType, Mode, NetworkReceive, Selectable, Selector}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
 import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
-import org.apache.kafka.common.requests.{ControlledShutdownResponse, ControlledShutdownRequest, RequestSend}
+import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse, RequestSend}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.utils.AppInfoParser
 
@@ -45,12 +48,14 @@ import scala.collection.{Map, mutable}
 import scala.collection.JavaConverters._
 import org.I0Itec.zkclient.ZkClient
 import kafka.controller.{ControllerStats, KafkaController}
-import kafka.cluster.{EndPoint, Broker}
-import kafka.common.{InconsistentBrokerIdException, GenerateBrokerIdException}
+import kafka.cluster.{Broker, EndPoint}
+import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException}
 import kafka.network.{BlockingChannel, SocketServer}
-import kafka.metrics.KafkaMetricsGroup
+import kafka.metrics.{KafkaMetricsGroup, KafkaMetricsReporter}
 import com.yammer.metrics.core.Gauge
 import kafka.coordinator.GroupCoordinator
+import org.apache.kafka.common.internals.ClusterResourceListeners
+import collection.JavaConverters._
 
 object KafkaServer {
   // Copy the subset of properties that are relevant to Logs
@@ -89,7 +94,7 @@ object KafkaServer {
  * Represents the lifecycle of a single Kafka broker. Handles all functionality required
  * to start up and shutdown a single Kafka node.
  */
-class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
+class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePrefix: Option[String] = None, kafkaMetricsReporters: Seq[KafkaMetricsReporter] = List()) extends Logging with KafkaMetricsGroup {
   private val startupComplete = new AtomicBoolean(false)
   private val isShuttingDown = new AtomicBoolean(false)
   private val isStartingUp = new AtomicBoolean(false)
@@ -140,6 +145,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
   val brokerMetaPropsFile = "meta.properties"
   val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator + brokerMetaPropsFile)))).toMap
 
+  private var _clusterId: String = null
+
+  def clusterId: String = _clusterId
+
   newGauge(
     "BrokerState",
     new Gauge[Int] {
@@ -148,6 +157,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
   )
 
   newGauge(
+    "ClusterId",
+    new Gauge[String] {
+      def value = clusterId
+    }
+  )
+
+  newGauge(
     "yammer-metrics-count",
     new Gauge[Int] {
       def value = {
@@ -183,6 +199,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
         /* setup zookeeper */
         zkUtils = initZk()
 
+        /* Get or create cluster_id */
+        _clusterId = getOrGenerateClusterId(zkUtils)
+        info(s"Cluster ID = $clusterId")
+
+        notifyClusterListeners(kafkaMetricsReporters ++ reporters.asScala)
+
         /* start log manager */
         logManager = createLogManager(zkUtils.zkClient, brokerState)
         logManager.startup()
@@ -220,7 +242,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
 
         /* start processing requests */
         apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator,
-          kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers)
+          kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers, clusterId)
+
         requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
 
         Mx4jLoader.maybeLoad()
@@ -274,6 +297,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
     }
   }
 
+  def notifyClusterListeners(clusterListeners: Seq[AnyRef]): Unit = {
+    val clusterResourceListeners = new ClusterResourceListeners
+    clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
+    clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
+  }
+
   private def initZk(): ZkUtils = {
     info(s"Connecting to zookeeper on ${config.zkConnect}")
 
@@ -308,6 +337,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
     zkUtils
   }
 
+  def getOrGenerateClusterId(zkUtils: ZkUtils): String = {
+    zkUtils.getClusterId.getOrElse(zkUtils.createOrGetClusterId(CoreUtils.generateUuidAsBase64))
+  }
 
   /**
    *  Forces some dynamic jmx beans to be registered on server startup.
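
notifyClusterListeners above hands every reporter to a ClusterResourceListeners collection, which keeps only the ones that actually implement ClusterResourceListener and then pushes the cluster id to them once. A rough Java sketch of that "maybe add, then notify" pattern, using stand-in types rather than the real org.apache.kafka.common classes:

    import java.util.ArrayList;
    import java.util.List;

    // Stand-ins for ClusterResource / ClusterResourceListener, only to show the pattern.
    interface ClusterResourceListener {
        void onUpdate(ClusterResource clusterResource);
    }

    class ClusterResource {
        private final String clusterId;
        ClusterResource(String clusterId) { this.clusterId = clusterId; }
        public String clusterId() { return clusterId; }
    }

    class ClusterResourceListeners {
        private final List<ClusterResourceListener> listeners = new ArrayList<>();

        // Register a candidate only if it actually implements the listener interface.
        void maybeAdd(Object candidate) {
            if (candidate instanceof ClusterResourceListener)
                listeners.add((ClusterResourceListener) candidate);
        }

        // Broadcast the cluster metadata to every registered listener.
        void onUpdate(ClusterResource resource) {
            for (ClusterResourceListener listener : listeners)
                listener.onUpdate(resource);
        }
    }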

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/core/src/main/scala/kafka/server/KafkaServerStartable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
index fc98912..4dfbb52 100644
--- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala
+++ b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
@@ -24,13 +24,13 @@ import kafka.utils.{VerifiableProperties, Logging}
 
 object KafkaServerStartable {
   def fromProps(serverProps: Properties) = {
-    KafkaMetricsReporter.startReporters(new VerifiableProperties(serverProps))
-    new KafkaServerStartable(KafkaConfig.fromProps(serverProps))
+    val reporters = KafkaMetricsReporter.startReporters(new VerifiableProperties(serverProps))
+    new KafkaServerStartable(KafkaConfig.fromProps(serverProps), reporters)
   }
 }
 
-class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
-  private val server = new KafkaServer(serverConfig)
+class KafkaServerStartable(val serverConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter]) extends Logging {
+  private val server = new KafkaServer(serverConfig, kafkaMetricsReporters = reporters)
 
   def startup() {
     try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/core/src/main/scala/kafka/utils/CoreUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index b4209e3..4edf5ed 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -20,9 +20,11 @@ package kafka.utils
 import java.io._
 import java.nio._
 import java.nio.channels._
-import java.util.concurrent.locks.{ReadWriteLock, Lock}
+import java.util.concurrent.locks.{Lock, ReadWriteLock}
 import java.lang.management._
+import java.util.UUID
 import javax.management._
+import javax.xml.bind.DatatypeConverter
 
 import org.apache.kafka.common.protocol.SecurityProtocol
 
@@ -278,4 +280,25 @@ object CoreUtils extends Logging {
     val listenerList = parseCsvList(listeners)
     listenerList.map(listener => EndPoint.createEndPoint(listener)).map(ep => ep.protocolType -> ep).toMap
   }
+
+  def generateUuidAsBase64(): String = {
+    val uuid = UUID.randomUUID()
+    urlSafeBase64EncodeNoPadding(getBytesFromUuid(uuid))
+  }
+
+  def getBytesFromUuid(uuid: UUID): Array[Byte] = {
+    // Extract bytes for uuid which is 128 bits (or 16 bytes) long.
+    val uuidBytes = ByteBuffer.wrap(new Array[Byte](16))
+    uuidBytes.putLong(uuid.getMostSignificantBits)
+    uuidBytes.putLong(uuid.getLeastSignificantBits)
+    uuidBytes.array
+  }
+
+  def urlSafeBase64EncodeNoPadding(data: Array[Byte]): String = {
+    val base64EncodedUUID = DatatypeConverter.printBase64Binary(data)
+    //Convert to URL safe variant by replacing + and / with - and _ respectively.
+    val urlSafeBase64EncodedUUID = base64EncodedUUID.replace("+", "-").replace("/", "_")
+    // Remove the "==" padding at the end.
+    urlSafeBase64EncodedUUID.substring(0, urlSafeBase64EncodedUUID.length - 2)
+  }
 }
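
The helpers above turn a random 128-bit UUID into a 22-character, URL-safe, unpadded base64 string. A small Java sketch of the same round trip; it uses java.util.Base64 rather than DatatypeConverter plus manual character replacement, but yields the same output (the example UUID and its encoding are taken from the UtilsTest cases further below):

    import java.nio.ByteBuffer;
    import java.util.Base64;
    import java.util.UUID;

    public class ClusterIdEncoding {
        public static void main(String[] args) {
            UUID uuid = UUID.fromString("d418ec02-277e-4853-81e6-afe30259daec");

            // 128-bit UUID -> 16 bytes, as in CoreUtils.getBytesFromUuid.
            ByteBuffer buf = ByteBuffer.wrap(new byte[16]);
            buf.putLong(uuid.getMostSignificantBits());
            buf.putLong(uuid.getLeastSignificantBits());

            // URL-safe base64 without padding: 16 bytes -> 22 characters.
            String clusterId = Base64.getUrlEncoder().withoutPadding().encodeToString(buf.array());
            System.out.println(clusterId); // 1BjsAid-SFOB5q_jAlna7A

            // Decoding reverses the process, as TestUtils.isValidClusterId does when validating.
            ByteBuffer decoded = ByteBuffer.wrap(Base64.getUrlDecoder().decode(clusterId));
            System.out.println(new UUID(decoded.getLong(), decoded.getLong())); // original UUID
        }
    }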

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index a137da8..503ed54 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -41,6 +41,7 @@ import scala.collection._
 
 object ZkUtils {
   val ConsumersPath = "/consumers"
+  val ClusterIdPath = "/cluster/id"
   val BrokerIdsPath = "/brokers/ids"
   val BrokerTopicsPath = "/brokers/topics"
   val ControllerPath = "/controller"
@@ -209,6 +210,35 @@ class ZkUtils(val zkClient: ZkClient,
     }
   }
 
+  /* Represents a cluster identifier. Stored in Zookeeper in JSON format: {"version" -> "1", "id" -> id } */
+  object ClusterId {
+
+    def toJson(id: String) = {
+      val jsonMap = Map("version" -> "1", "id" -> id)
+      Json.encode(jsonMap)
+    }
+
+    def fromJson(clusterIdJson: String): String = {
+      Json.parseFull(clusterIdJson).map { m =>
+        val clusterIdMap = m.asInstanceOf[Map[String, Any]]
+        clusterIdMap.get("id").get.asInstanceOf[String]
+      }.getOrElse(throw new KafkaException(s"Failed to parse the cluster id json $clusterIdJson"))
+    }
+  }
+
+  def getClusterId: Option[String] =
+    readDataMaybeNull(ClusterIdPath)._1.map(ClusterId.fromJson)
+
+  def createOrGetClusterId(proposedClusterId: String): String = {
+    try {
+      createPersistentPath(ClusterIdPath, ClusterId.toJson(proposedClusterId))
+      proposedClusterId
+    } catch {
+      case e: ZkNodeExistsException =>
+        getClusterId.getOrElse(throw new KafkaException("Failed to get cluster id from Zookeeper. This can only happen if /cluster/id is deleted from Zookeeper."))
+    }
+  }
+
   def getSortedBrokerList(): Seq[Int] =
     getChildren(BrokerIdsPath).map(_.toInt).sorted
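
createOrGetClusterId is race-safe: every broker proposes an id, only the first createPersistentPath succeeds, and the losers fall back to reading the value that was stored (JSON of the form {"version":"1","id":"<22-character id>"}). A hedged Java sketch of the same first-writer-wins semantics, with a ConcurrentHashMap standing in for the /cluster/id znode (this is an analogy, not the ZooKeeper API):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class CreateOrGetClusterId {
        // Stand-in for ZooKeeper: putIfAbsent plays the role of createPersistentPath.
        private static final ConcurrentMap<String, String> znodes = new ConcurrentHashMap<>();

        static String createOrGetClusterId(String proposedClusterId) {
            String existing = znodes.putIfAbsent("/cluster/id", proposedClusterId);
            // If another broker won the race, use the id it already stored.
            return existing == null ? proposedClusterId : existing;
        }

        public static void main(String[] args) {
            System.out.println(createOrGetClusterId("oUm0owbhS0moy4qcSln6Rg")); // first proposal wins
            System.out.println(createOrGetClusterId("AAAAAAAAAAAAAAAAAAAAAA")); // still the first id
        }
    }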
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
new file mode 100644
index 0000000..9e03e27
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
@@ -0,0 +1,225 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import java.util.concurrent.ExecutionException
+import java.util.concurrent.atomic.AtomicReference
+import java.util.{Properties}
+
+import kafka.common.TopicAndPartition
+import kafka.integration.KafkaServerTestHarness
+import kafka.server._
+import kafka.utils._
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, TopicPartition}
+import org.apache.kafka.test.{TestUtils => _, _}
+import org.junit.Assert._
+import org.junit.{Before, Test}
+
+import scala.collection.JavaConverters._
+import org.apache.kafka.test.TestUtils.isValidClusterId
+
+import scala.collection.mutable.ArrayBuffer
+
+/** The test cases here verify the following conditions.
+  * 1. The ProducerInterceptor receives the cluster id after the onSend() method is called and before onAcknowledgement() method is called.
+  * 2. The Serializer receives the cluster id before the serialize() method is called.
+  * 3. The producer MetricReporter receives the cluster id after send() method is called on KafkaProducer.
+  * 4. The ConsumerInterceptor receives the cluster id before the onConsume() method.
+  * 5. The Deserializer receives the cluster id before the deserialize() method is called.
+  * 6. The consumer MetricReporter receives the cluster id after poll() is called on KafkaConsumer.
+  * 7. The broker MetricReporter receives the cluster id after the broker startup is over.
+  * 8. The broker KafkaMetricReporter receives the cluster id after the broker startup is over.
+  * 9. All the components receive the same cluster id.
+  */
+
+object EndToEndClusterIdTest {
+
+  object MockConsumerMetricsReporter {
+    val CLUSTER_META = new AtomicReference[ClusterResource]
+  }
+
+  class MockConsumerMetricsReporter extends MockMetricsReporter with ClusterResourceListener {
+
+    override def onUpdate(clusterMetadata: ClusterResource) {
+      MockConsumerMetricsReporter.CLUSTER_META.set(clusterMetadata)
+    }
+  }
+
+  object MockProducerMetricsReporter {
+    val CLUSTER_META = new AtomicReference[ClusterResource]
+  }
+
+  class MockProducerMetricsReporter extends MockMetricsReporter with ClusterResourceListener {
+
+    override def onUpdate(clusterMetadata: ClusterResource) {
+      MockProducerMetricsReporter.CLUSTER_META.set(clusterMetadata)
+    }
+  }
+
+  object MockBrokerMetricsReporter {
+    val CLUSTER_META = new AtomicReference[ClusterResource]
+  }
+
+  class MockBrokerMetricsReporter extends MockMetricsReporter with ClusterResourceListener {
+
+    override def onUpdate(clusterMetadata: ClusterResource) {
+      MockBrokerMetricsReporter.CLUSTER_META.set(clusterMetadata)
+    }
+  }
+}
+
+class EndToEndClusterIdTest extends KafkaServerTestHarness {
+
+  import EndToEndClusterIdTest._
+
+  val producerCount = 1
+  val consumerCount = 1
+  val serverCount = 1
+  lazy val producerConfig = new Properties
+  lazy val consumerConfig = new Properties
+  lazy val serverConfig = new Properties
+  val numRecords = 1
+  val topic = "e2etopic"
+  val part = 0
+  val tp = new TopicPartition(topic, part)
+  val topicAndPartition = new TopicAndPartition(topic, part)
+  this.serverConfig.setProperty(KafkaConfig.MetricReporterClassesProp, "kafka.api.EndToEndClusterIdTest$MockBrokerMetricsReporter")
+
+  override def generateConfigs() = {
+    val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
+      trustStoreFile = trustStoreFile, saslProperties = saslProperties)
+    cfgs.foreach(_.putAll(serverConfig))
+    cfgs.map(KafkaConfig.fromProps)
+  }
+
+  @Before
+  override def setUp() {
+    super.setUp
+    // create the consumer offset topic
+    TestUtils.createTopic(this.zkUtils, topic, 2, serverCount, this.servers)
+  }
+
+  @Test
+  def testEndToEnd() {
+    val appendStr = "mock"
+    MockConsumerInterceptor.resetCounters()
+    MockProducerInterceptor.resetCounters()
+
+    assertNotNull(MockBrokerMetricsReporter.CLUSTER_META)
+    isValidClusterId(MockBrokerMetricsReporter.CLUSTER_META.get.clusterId)
+
+    val producerProps = new Properties()
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockProducerInterceptor")
+    producerProps.put("mock.interceptor.append", appendStr)
+    producerProps.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, "kafka.api.EndToEndClusterIdTest$MockProducerMetricsReporter")
+    val testProducer = new KafkaProducer(producerProps, new MockSerializer, new MockSerializer)
+
+    // Send one record and make sure clusterId is set after send and before onAcknowledgement
+    sendRecords(testProducer, 1, tp)
+    assertNotEquals(MockProducerInterceptor.CLUSTER_ID_BEFORE_ON_ACKNOWLEDGEMENT, MockProducerInterceptor.NO_CLUSTER_ID)
+    assertNotNull(MockProducerInterceptor.CLUSTER_META)
+    assertEquals(MockProducerInterceptor.CLUSTER_ID_BEFORE_ON_ACKNOWLEDGEMENT.get.clusterId, MockProducerInterceptor.CLUSTER_META.get.clusterId)
+    isValidClusterId(MockProducerInterceptor.CLUSTER_META.get.clusterId)
+
+    // Make sure that serializer gets the cluster id before serialize method.
+    assertNotEquals(MockSerializer.CLUSTER_ID_BEFORE_SERIALIZE, MockSerializer.NO_CLUSTER_ID)
+    assertNotNull(MockSerializer.CLUSTER_META)
+    isValidClusterId(MockSerializer.CLUSTER_META.get.clusterId)
+
+    assertNotNull(MockProducerMetricsReporter.CLUSTER_META)
+    isValidClusterId(MockProducerMetricsReporter.CLUSTER_META.get.clusterId)
+
+    this.consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    this.consumerConfig.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockConsumerInterceptor")
+    this.consumerConfig.put(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, "kafka.api.EndToEndClusterIdTest$MockConsumerMetricsReporter")
+    val testConsumer = new KafkaConsumer(this.consumerConfig, new MockDeserializer, new MockDeserializer)
+    testConsumer.assign(List(tp).asJava)
+    testConsumer.seek(tp, 0)
+
+    // consume and verify that values are modified by interceptors
+    consumeRecords(testConsumer, numRecords)
+
+    // Check that cluster id is present after the first poll call.
+    assertNotEquals(MockConsumerInterceptor.CLUSTER_ID_BEFORE_ON_CONSUME, MockConsumerInterceptor.NO_CLUSTER_ID)
+    assertNotNull(MockConsumerInterceptor.CLUSTER_META)
+    isValidClusterId(MockConsumerInterceptor.CLUSTER_META.get.clusterId)
+    assertEquals(MockConsumerInterceptor.CLUSTER_ID_BEFORE_ON_CONSUME.get.clusterId, MockConsumerInterceptor.CLUSTER_META.get.clusterId)
+
+    assertNotEquals(MockDeserializer.CLUSTER_ID_BEFORE_DESERIALIZE, MockDeserializer.NO_CLUSTER_ID)
+    assertNotNull(MockDeserializer.CLUSTER_META)
+    isValidClusterId(MockDeserializer.CLUSTER_META.get.clusterId)
+    assertEquals(MockDeserializer.CLUSTER_ID_BEFORE_DESERIALIZE.get.clusterId, MockDeserializer.CLUSTER_META.get.clusterId)
+
+    assertNotNull(MockConsumerMetricsReporter.CLUSTER_META)
+    isValidClusterId(MockConsumerMetricsReporter.CLUSTER_META.get.clusterId)
+
+    // Make sure everyone receives the same cluster id.
+    assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, MockSerializer.CLUSTER_META.get.clusterId)
+    assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, MockProducerMetricsReporter.CLUSTER_META.get.clusterId)
+    assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, MockConsumerInterceptor.CLUSTER_META.get.clusterId)
+    assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, MockDeserializer.CLUSTER_META.get.clusterId)
+    assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, MockConsumerMetricsReporter.CLUSTER_META.get.clusterId)
+    assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, MockBrokerMetricsReporter.CLUSTER_META.get.clusterId)
+
+    testConsumer.close()
+    testProducer.close()
+    MockConsumerInterceptor.resetCounters()
+    MockProducerInterceptor.resetCounters()
+  }
+
+  private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]], numRecords: Int, tp: TopicPartition) {
+    val futures = (0 until numRecords).map { i =>
+      val record = new ProducerRecord(tp.topic(), tp.partition(), s"$i".getBytes, s"$i".getBytes)
+      debug(s"Sending this record: $record")
+      producer.send(record)
+    }
+    try {
+      futures.foreach(_.get)
+    } catch {
+      case e: ExecutionException => throw e.getCause
+    }
+  }
+
+  private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]],
+                             numRecords: Int = 1,
+                             startingOffset: Int = 0,
+                             topic: String = topic,
+                             part: Int = part) {
+    val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]()
+    val maxIters = numRecords * 50
+    var iters = 0
+    while (records.size < numRecords) {
+      for (record <- consumer.poll(50).asScala) {
+        records += record
+      }
+      if (iters > maxIters)
+        throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.")
+      iters += 1
+    }
+    for (i <- 0 until numRecords) {
+      val record = records(i)
+      val offset = startingOffset + i
+      assertEquals(topic, record.topic)
+      assertEquals(part, record.partition)
+      assertEquals(offset.toLong, record.offset)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 243f913..6c10632 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -675,6 +675,9 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   @Test
   def testInterceptors() {
     val appendStr = "mock"
+    MockConsumerInterceptor.resetCounters()
+    MockProducerInterceptor.resetCounters()
+
     // create producer with interceptor
     val producerProps = new Properties()
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 1980e8a..d82ec58 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -18,8 +18,9 @@
 package kafka.metrics
 
 import java.util.Properties
+
 import com.yammer.metrics.Metrics
-import com.yammer.metrics.core.MetricPredicate
+import com.yammer.metrics.core.{Metric, MetricName, MetricPredicate}
 import org.junit.{After, Test}
 import org.junit.Assert._
 import kafka.integration.KafkaServerTestHarness
@@ -28,6 +29,7 @@ import kafka.serializer._
 import kafka.utils._
 import kafka.admin.AdminUtils
 import kafka.utils.TestUtils._
+
 import scala.collection._
 import scala.collection.JavaConversions._
 import scala.util.matching.Regex
@@ -90,6 +92,13 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
     assertFalse("Topic metrics exists after deleteTopic", checkTopicMetricsExists(topic))
   }
 
+  @Test
+  def testClusterIdMetric(): Unit ={
+    // Check if clusterId metric exists.
+    val metrics = Metrics.defaultRegistry().allMetrics
+    assertEquals(metrics.keySet.filter(_.getMBeanName().equals("kafka.server:type=KafkaServer,name=ClusterId")).size, 1)
+  }
+
   @deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
   def createAndShutdownStep(group: String, consumerId: String, producerId: String): Unit = {
     sendMessages(servers, topic, nMessages)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
new file mode 100755
index 0000000..d235d02
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
@@ -0,0 +1,93 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.server
+
+import java.util.concurrent.atomic.AtomicReference
+
+import kafka.metrics.KafkaMetricsReporter
+import kafka.utils.{CoreUtils, TestUtils, VerifiableProperties}
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.{ClusterResource, ClusterResourceListener}
+import org.apache.kafka.test.MockMetricsReporter
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+import org.apache.kafka.test.TestUtils.isValidClusterId
+
+object KafkaMetricReporterClusterIdTest {
+
+  class MockKafkaMetricsReporter extends KafkaMetricsReporter with ClusterResourceListener {
+
+    override def onUpdate(clusterMetadata: ClusterResource): Unit = {
+      MockKafkaMetricsReporter.CLUSTER_META.set(clusterMetadata)
+    }
+
+    override def init(props: VerifiableProperties): Unit = {
+    }
+  }
+
+  object MockKafkaMetricsReporter {
+    val CLUSTER_META = new AtomicReference[ClusterResource]
+  }
+
+  object MockBrokerMetricsReporter {
+    val CLUSTER_META: AtomicReference[ClusterResource] = new AtomicReference[ClusterResource]
+  }
+
+  class MockBrokerMetricsReporter extends MockMetricsReporter with ClusterResourceListener {
+
+    override def onUpdate(clusterMetadata: ClusterResource) {
+      MockBrokerMetricsReporter.CLUSTER_META.set(clusterMetadata)
+    }
+  }
+
+}
+
+class KafkaMetricReporterClusterIdTest extends ZooKeeperTestHarness {
+  var server: KafkaServerStartable = null
+  var config: KafkaConfig = null
+
+  @Before
+  override def setUp() {
+    super.setUp()
+    val props = TestUtils.createBrokerConfig(1, zkConnect)
+    props.setProperty("kafka.metrics.reporters", "kafka.server.KafkaMetricReporterClusterIdTest$MockKafkaMetricsReporter")
+    props.setProperty(KafkaConfig.MetricReporterClassesProp, "kafka.server.KafkaMetricReporterClusterIdTest$MockBrokerMetricsReporter")
+    config = KafkaConfig.fromProps(props)
+    server = KafkaServerStartable.fromProps(props)
+    server.startup()
+  }
+
+  @Test
+  def testClusterIdPresent() {
+    assertNotNull(KafkaMetricReporterClusterIdTest.MockKafkaMetricsReporter.CLUSTER_META)
+    isValidClusterId(KafkaMetricReporterClusterIdTest.MockKafkaMetricsReporter.CLUSTER_META.get().clusterId())
+
+    assertNotNull(KafkaMetricReporterClusterIdTest.MockBrokerMetricsReporter.CLUSTER_META)
+    isValidClusterId(KafkaMetricReporterClusterIdTest.MockBrokerMetricsReporter.CLUSTER_META.get().clusterId())
+
+    assertEquals(KafkaMetricReporterClusterIdTest.MockKafkaMetricsReporter.CLUSTER_META.get().clusterId(),
+      KafkaMetricReporterClusterIdTest.MockBrokerMetricsReporter.CLUSTER_META.get().clusterId())
+  }
+
+  @After
+  override def tearDown() {
+    server.shutdown()
+    CoreUtils.delete(config.logDirs)
+    TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
+    super.tearDown()
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index 46a79de..11dd6fe 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -17,7 +17,7 @@
 
 package kafka.server
 
-import java.util.Properties
+import java.util.{Properties}
 
 import kafka.common.Topic
 import kafka.utils.TestUtils
@@ -25,7 +25,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
 import org.junit.Assert._
 import org.junit.Test
-
+import org.apache.kafka.test.TestUtils.isValidClusterId
 import scala.collection.JavaConverters._
 
 class MetadataRequestTest extends BaseRequestTest {
@@ -35,6 +35,19 @@ class MetadataRequestTest extends BaseRequestTest {
   }
 
   @Test
+  def testClusterIdWithRequestVersion1() {
+    val v1MetadataResponse = sendMetadataRequest(MetadataRequest.allTopics, 1)
+    val v1ClusterId = v1MetadataResponse.clusterId
+    assertNull(s"v1 clusterId should be null", v1ClusterId)
+  }
+
+  @Test
+  def testClusterIdIsValid() {
+    val metadataResponse = sendMetadataRequest(MetadataRequest.allTopics, 2)
+    isValidClusterId(metadataResponse.clusterId)
+  }
+
+  @Test
   def testControllerId() {
     val controllerServer = servers.find(_.kafkaController.isActive()).get
     val controllerId = controllerServer.config.brokerId

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
new file mode 100755
index 0000000..325889f
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala
@@ -0,0 +1,140 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.server
+
+import scala.concurrent._
+import ExecutionContext.Implicits._
+import scala.concurrent.duration._
+import kafka.utils.{CoreUtils, TestUtils, ZkUtils}
+import kafka.zk.ZooKeeperTestHarness
+import org.junit.Assert._
+import org.junit.{Before, Test}
+import org.apache.kafka.test.TestUtils.isValidClusterId
+
+class ServerGenerateClusterIdTest extends ZooKeeperTestHarness {
+  var config1: KafkaConfig = null
+  var config2: KafkaConfig = null
+  var config3: KafkaConfig = null
+
+  @Before
+  override def setUp() {
+    super.setUp()
+    config1 = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, zkConnect))
+    config2 = KafkaConfig.fromProps(TestUtils.createBrokerConfig(2, zkConnect))
+    config3 = KafkaConfig.fromProps(TestUtils.createBrokerConfig(3, zkConnect))
+  }
+
+  @Test
+  def testAutoGenerateClusterId() {
+    // Make sure that the cluster id doesn't exist yet.
+    assertFalse(zkUtils.pathExists(ZkUtils.ClusterIdPath))
+
+    var server1 = TestUtils.createServer(config1)
+
+    // Validate the cluster id
+    val clusterIdOnFirstBoot = server1.clusterId
+    isValidClusterId(clusterIdOnFirstBoot)
+
+    server1.shutdown()
+
+    // Make sure that the cluster id is persistent.
+    assertTrue(zkUtils.pathExists(ZkUtils.ClusterIdPath))
+    assertEquals(zkUtils.getClusterId, Some(clusterIdOnFirstBoot))
+
+    // Restart the server and check that it uses the clusterId generated previously
+    server1 = new KafkaServer(config1)
+    server1.startup()
+
+    val clusterIdOnSecondBoot = server1.clusterId
+    assertEquals(clusterIdOnFirstBoot, clusterIdOnSecondBoot)
+
+    server1.shutdown()
+
+    // Make sure that the cluster id is persistent after multiple reboots.
+    assertTrue(zkUtils.pathExists(ZkUtils.ClusterIdPath))
+    assertEquals(zkUtils.getClusterId, Some(clusterIdOnFirstBoot))
+
+    CoreUtils.delete(server1.config.logDirs)
+    TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
+  }
+
+  @Test
+  def testAutoGenerateClusterIdForKafkaClusterSequential() {
+    val server1 = TestUtils.createServer(config1)
+    val clusterIdFromServer1 = server1.clusterId
+
+    val server2 = TestUtils.createServer(config2)
+    val clusterIdFromServer2 = server2.clusterId
+
+    val server3 = TestUtils.createServer(config3)
+    val clusterIdFromServer3 = server3.clusterId
+
+    server1.shutdown()
+    server2.shutdown()
+    server3.shutdown()
+
+    isValidClusterId(clusterIdFromServer1)
+    assertEquals(clusterIdFromServer1, clusterIdFromServer2, clusterIdFromServer3)
+
+    // Check again after reboot
+    server1.startup()
+    assertEquals(clusterIdFromServer1, server1.clusterId)
+    server2.startup()
+    assertEquals(clusterIdFromServer2, server2.clusterId)
+    server3.startup()
+    assertEquals(clusterIdFromServer3, server3.clusterId)
+    server1.shutdown()
+    server2.shutdown()
+    server3.shutdown()
+
+    CoreUtils.delete(server1.config.logDirs)
+    CoreUtils.delete(server2.config.logDirs)
+    CoreUtils.delete(server3.config.logDirs)
+    TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
+  }
+
+  @Test
+  def testAutoGenerateClusterIdForKafkaClusterParallel() {
+    val firstBoot = Future.traverse(Seq(config1, config2, config3))(config => Future(TestUtils.createServer(config)))
+    val Seq(server1, server2, server3) = Await.result(firstBoot, 100 second)
+
+    val clusterIdFromServer1 = server1.clusterId
+    val clusterIdFromServer2 = server2.clusterId
+    val clusterIdFromServer3 = server3.clusterId
+
+    server1.shutdown()
+    server2.shutdown()
+    server3.shutdown()
+    isValidClusterId(clusterIdFromServer1)
+    assertEquals(clusterIdFromServer1, clusterIdFromServer2, clusterIdFromServer3)
+
+    // Check again after reboot
+    val secondBoot = Future.traverse(Seq(server1, server2, server3))(server => Future {
+      server.startup()
+      server
+    })
+    val servers = Await.result(secondBoot, 100 second)
+    servers.foreach(server => assertEquals(clusterIdFromServer1, server.clusterId))
+
+    servers.foreach(_.shutdown())
+    CoreUtils.delete(server1.config.logDirs)
+    CoreUtils.delete(server2.config.logDirs)
+    CoreUtils.delete(server3.config.logDirs)
+    TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
index f42f0ff..22a0a16 100755
--- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
@@ -17,9 +17,11 @@
 
 package kafka.utils
 
-import java.util.Arrays
+import java.util.{Arrays, UUID}
 import java.util.concurrent.locks.ReentrantLock
 import java.nio.ByteBuffer
+import java.util.regex.Pattern
+
 import org.apache.log4j.Logger
 import org.scalatest.junit.JUnitSuite
 import org.junit.Assert._
@@ -31,6 +33,7 @@ import org.apache.kafka.common.utils.Utils
 class UtilsTest extends JUnitSuite {
 
   private val logger = Logger.getLogger(classOf[UtilsTest])
+  val clusterIdPattern = Pattern.compile("[a-zA-Z0-9_\\-]+")
 
   @Test
   def testSwallow() {
@@ -139,7 +142,6 @@ class UtilsTest extends JUnitSuite {
     }
   }
 
-
   @Test
   def testInLock() {
     val lock = new ReentrantLock()
@@ -151,4 +153,26 @@ class UtilsTest extends JUnitSuite {
     assertFalse("Should be unlocked", lock.isLocked)
   }
 
+  @Test
+  def testUrlSafeBase64EncodeUUID() {
+
+    // Test a UUID that has no + or / characters in base64 encoding [a149b4a3-06e1-4b49-a8cb-8a9c4a59fa46 ->(base64)-> oUm0owbhS0moy4qcSln6Rg==]
+    val clusterId1 = CoreUtils.urlSafeBase64EncodeNoPadding(CoreUtils.getBytesFromUuid(UUID.fromString("a149b4a3-06e1-4b49-a8cb-8a9c4a59fa46")))
+    assertEquals(clusterId1, "oUm0owbhS0moy4qcSln6Rg")
+    assertEquals(clusterId1.length, 22)
+    assertTrue(clusterIdPattern.matcher(clusterId1).matches())
+
+    // Test a UUID that has + or / characters in base64 encoding [d418ec02-277e-4853-81e6-afe30259daec ->(base64)-> 1BjsAid+SFOB5q/jAlna7A==]
+    val clusterId2 = CoreUtils.urlSafeBase64EncodeNoPadding(CoreUtils.getBytesFromUuid(UUID.fromString("d418ec02-277e-4853-81e6-afe30259daec")))
+    assertEquals(clusterId2, "1BjsAid-SFOB5q_jAlna7A")
+    assertEquals(clusterId2.length, 22)
+    assertTrue(clusterIdPattern.matcher(clusterId2).matches())
+  }
+
+  @Test
+  def testGenerateUuidAsBase64() {
+    val clusterId = CoreUtils.generateUuidAsBase64()
+    assertEquals(clusterId.length, 22)
+    assertTrue(clusterIdPattern.matcher(clusterId).matches())
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
index 2d81ed9..918c4b5 100755
--- a/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala
@@ -52,4 +52,10 @@ class ZkUtilsTest extends ZooKeeperTestHarness {
     val (optionalData, _) = zkUtils.readDataMaybeNull(path)
     assertTrue("Node should still be there", optionalData.isDefined)
   }
+
+  @Test
+  def testClusterIdentifierJsonParsing() {
+    val clusterId = "test"
+    assertEquals(zkUtils.ClusterId.fromJson(zkUtils.ClusterId.toJson(clusterId)), clusterId)
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
index ba6289c..cd45aee 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
@@ -49,8 +49,8 @@ public class WindowedStreamPartitionerTest {
             new PartitionInfo(topicName, 5, Node.noNode(), new Node[0], new Node[0])
     );
 
-    private Cluster cluster = new Cluster(Collections.singletonList(Node.noNode()), infos, Collections.<String>emptySet(),
-        Collections.<String>emptySet());
+    private Cluster cluster = new Cluster("cluster", Collections.singletonList(Node.noNode()), infos,
+            Collections.<String>emptySet(), Collections.<String>emptySet());
 
     @Test
     public void testCopartitioning() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
index 9683da9..e36bde4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java
@@ -44,7 +44,7 @@ public class DefaultPartitionGrouperTest {
             new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0])
     );
 
-    private Cluster metadata = new Cluster(Collections.singletonList(Node.noNode()), infos, Collections.<String>emptySet(),
+    private Cluster metadata = new Cluster("cluster", Collections.singletonList(Node.noNode()), infos, Collections.<String>emptySet(),
         Collections.<String>emptySet());
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 8d5a549..922ddb3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -47,7 +47,7 @@ public class RecordCollectorTest {
             new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0])
     );
 
-    private Cluster cluster = new Cluster(Collections.singletonList(Node.noNode()), infos,
+    private Cluster cluster = new Cluster("cluster", Collections.singletonList(Node.noNode()), infos,
             Collections.<String>emptySet(), Collections.<String>emptySet());
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index e5ae7d8..4f4d2eb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -82,7 +82,7 @@ public class StreamPartitionAssignorTest {
             new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0])
     );
 
-    private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet(),
+    private Cluster metadata = new Cluster("cluster", Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet(),
         Collections.<String>emptySet());
 
     private final TaskId task0 = new TaskId(0, 0);

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index ccbf8d6..c7e9daa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -81,7 +81,7 @@ public class StreamThreadTest {
             new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0])
     );
 
-    private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet(),
+    private Cluster metadata = new Cluster("cluster", Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet(),
             Collections.<String>emptySet());
 
     private final PartitionAssignor.Subscription subscription =

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/tests/kafkatest/tests/core/upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/upgrade_test.py b/tests/kafkatest/tests/core/upgrade_test.py
index 16a518d..9c83991 100644
--- a/tests/kafkatest/tests/core/upgrade_test.py
+++ b/tests/kafkatest/tests/core/upgrade_test.py
@@ -15,6 +15,8 @@
 
 from ducktape.mark import parametrize
 
+import json
+
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.kafka import config_property
@@ -22,7 +24,7 @@ from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 from kafkatest.utils import is_int
-from kafkatest.version import LATEST_0_8_2, LATEST_0_9, TRUNK, KafkaVersion
+from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, LATEST_0_10_0, TRUNK, KafkaVersion
 
 
 class TestUpgrade(ProduceConsumeValidateTest):
@@ -60,6 +62,7 @@ class TestUpgrade(ProduceConsumeValidateTest):
             self.kafka.start_node(node)
 
 
+    @parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["none"])
     @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"])
     @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"], new_consumer=True, security_protocol="SASL_SSL")
     @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["snappy"], new_consumer=True)
@@ -73,9 +76,9 @@ class TestUpgrade(ProduceConsumeValidateTest):
     @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["snappy"])
     def test_upgrade(self, from_kafka_version, to_message_format_version, compression_types,
                      new_consumer=False, security_protocol="PLAINTEXT"):
-        """Test upgrade of Kafka broker cluster from 0.8.2 or 0.9.0 to 0.10
+        """Test upgrade of Kafka broker cluster from 0.8.2, 0.9.0 or 0.10.0 to the current version
 
-        from_kafka_version is a Kafka version to upgrade from: either 0.8.2.X or 0.9
+        from_kafka_version is a Kafka version to upgrade from: either 0.8.2.X, 0.9.0.x or 0.10.0.x
 
         If to_message_format_version is None, it means that we will upgrade to default (latest)
         message format version. It is possible to upgrade to 0.10 brokers but still use message
@@ -105,6 +108,8 @@ class TestUpgrade(ProduceConsumeValidateTest):
                                           compression_types=compression_types,
                                           version=KafkaVersion(from_kafka_version))
 
+        assert self.zk.query("/cluster/id") is None
+
         # TODO - reduce the timeout
         self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
                                         self.topic, consumer_timeout_ms=30000, new_consumer=new_consumer,
@@ -112,3 +117,13 @@ class TestUpgrade(ProduceConsumeValidateTest):
 
         self.run_produce_consume_validate(core_test_action=lambda: self.perform_upgrade(from_kafka_version,
                                                                                         to_message_format_version))
+
+        cluster_id_json = self.zk.query("/cluster/id")
+        assert cluster_id_json is not None
+        try:
+            cluster_id = json.loads(cluster_id_json)
+        except :
+            self.logger.debug("Data in /cluster/id znode could not be parsed. Data = %s" % cluster_id_json)
+
+        self.logger.debug("Cluster id [%s]", cluster_id)
+        assert len(cluster_id["id"]) == 22

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/tests/kafkatest/version.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index 6b378e7..239a9f4 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -77,4 +77,8 @@ LATEST_0_9 = V_0_9_0_1
 
 # 0.10.0.X versions
 V_0_10_0_0 = KafkaVersion("0.10.0.0")
-LATEST_0_10 = V_0_10_0_0
+V_0_10_0_1 = KafkaVersion("0.10.0.1")
+# Adding 0.10.0 as the next version will be 0.10.1.x
+LATEST_0_10_0 = V_0_10_0_1
+
+LATEST_0_10 = LATEST_0_10_0
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/vagrant/base.sh
----------------------------------------------------------------------
diff --git a/vagrant/base.sh b/vagrant/base.sh
index ebe54a8..3697765 100755
--- a/vagrant/base.sh
+++ b/vagrant/base.sh
@@ -65,6 +65,9 @@ get_kafka 0.8.2.2
 chmod a+rw /opt/kafka-0.8.2.2
 get_kafka 0.9.0.1
 chmod a+rw /opt/kafka-0.9.0.1
+get_kafka 0.10.0.1
+chmod a+rw /opt/kafka-0.10.0.1
+
 
 # For EC2 nodes, we want to use /mnt, which should have the local disk. On local
 # VMs, we can just create it if it doesn't exist and use it like we'd use

Reply via email to