Repository: kafka Updated Branches: refs/heads/trunk d7bffebca -> ecc1fb10f
http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/clients/src/test/java/org/apache/kafka/test/MockSerializer.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/test/MockSerializer.java b/clients/src/test/java/org/apache/kafka/test/MockSerializer.java index 0348258..e1bf2bb 100644 --- a/clients/src/test/java/org/apache/kafka/test/MockSerializer.java +++ b/clients/src/test/java/org/apache/kafka/test/MockSerializer.java @@ -16,14 +16,20 @@ */ package org.apache.kafka.test; +import org.apache.kafka.common.ClusterResourceListener; +import org.apache.kafka.common.ClusterResource; import org.apache.kafka.common.serialization.Serializer; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; -public class MockSerializer implements Serializer<byte[]> { +public class MockSerializer implements ClusterResourceListener, Serializer<byte[]> { public static final AtomicInteger INIT_COUNT = new AtomicInteger(0); public static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0); + public static final AtomicReference<ClusterResource> CLUSTER_META = new AtomicReference<>(); + public static final ClusterResource NO_CLUSTER_ID = new ClusterResource("no_cluster_id"); + public static final AtomicReference<ClusterResource> CLUSTER_ID_BEFORE_SERIALIZE = new AtomicReference<>(NO_CLUSTER_ID); public MockSerializer() { INIT_COUNT.incrementAndGet(); @@ -35,6 +41,9 @@ public class MockSerializer implements Serializer<byte[]> { @Override public byte[] serialize(String topic, byte[] data) { + // This will ensure that we get the cluster metadata when serialize is called for the first time + // as subsequent compareAndSet operations will fail. + CLUSTER_ID_BEFORE_SERIALIZE.compareAndSet(NO_CLUSTER_ID, CLUSTER_META.get()); return data; } @@ -42,4 +51,9 @@ public class MockSerializer implements Serializer<byte[]> { public void close() { CLOSE_COUNT.incrementAndGet(); } + + @Override + public void onUpdate(ClusterResource clusterResource) { + CLUSTER_META.set(clusterResource); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/clients/src/test/java/org/apache/kafka/test/TestUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index 265661a..adc79ac 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -40,8 +40,16 @@ import java.util.Properties; import java.util.Random; import java.util.Set; import java.util.UUID; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import javax.xml.bind.DatatypeConverter; import static java.util.Arrays.asList; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Helper functions for writing unit tests @@ -81,7 +89,7 @@ public class TestUtils { for (int i = 0; i < partitions; i++) parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns)); } - return new Cluster(asList(ns), parts, Collections.<String>emptySet(), INTERNAL_TOPICS); + return new Cluster("kafka-cluster", asList(ns), parts, Collections.<String>emptySet(), INTERNAL_TOPICS); } public static Cluster clusterWith(final int nodes, final String topic, final int partitions) { @@ -269,4 +277,33 @@ public class TestUtils { } } -} + /** + * Checks if a cluster id is valid. + * @param clusterId + */ + public static void isValidClusterId(String clusterId) { + assertNotNull(clusterId); + + // Base 64 encoded value is 22 characters + assertEquals(clusterId.length(), 22); + + Pattern clusterIdPattern = Pattern.compile("[a-zA-Z0-9_\\-]+"); + Matcher matcher = clusterIdPattern.matcher(clusterId); + assertTrue(matcher.matches()); + + // Convert into normal variant and add padding at the end. + String originalClusterId = String.format("%s==", clusterId.replace("_", "/").replace("-", "+")); + byte[] decodedUuid = DatatypeConverter.parseBase64Binary(originalClusterId); + + // We expect 16 bytes, same as the input UUID. + assertEquals(decodedUuid.length, 16); + + //Check if it can be converted back to a UUID. + try { + ByteBuffer uuidBuffer = ByteBuffer.wrap(decodedUuid); + new UUID(uuidBuffer.getLong(), uuidBuffer.getLong()).toString(); + } catch (Exception e) { + fail(clusterId + " cannot be converted back to UUID."); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala index 999b2a4..6d35539 100755 --- a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala +++ b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala @@ -23,6 +23,8 @@ package kafka.metrics import kafka.utils.{CoreUtils, VerifiableProperties} import java.util.concurrent.atomic.AtomicBoolean +import scala.collection.mutable.ArrayBuffer + /** * Base trait for reporter MBeans. If a client wants to expose these JMX @@ -42,22 +44,27 @@ trait KafkaMetricsReporterMBean { def getMBeanName: String } - +/** + * Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information. + */ trait KafkaMetricsReporter { def init(props: VerifiableProperties) } object KafkaMetricsReporter { val ReporterStarted: AtomicBoolean = new AtomicBoolean(false) + private var reporters: ArrayBuffer[KafkaMetricsReporter] = null - def startReporters (verifiableProps: VerifiableProperties) { + def startReporters (verifiableProps: VerifiableProperties): Seq[KafkaMetricsReporter] = { ReporterStarted synchronized { if (!ReporterStarted.get()) { + reporters = ArrayBuffer[KafkaMetricsReporter]() val metricsConfig = new KafkaMetricsConfig(verifiableProps) if(metricsConfig.reporters.nonEmpty) { metricsConfig.reporters.foreach(reporterType => { val reporter = CoreUtils.createObject[KafkaMetricsReporter](reporterType) reporter.init(verifiableProps) + reporters += reporter reporter match { case bean: KafkaMetricsReporterMBean => CoreUtils.registerMBean(reporter, bean.getMBeanName) case _ => @@ -67,6 +74,7 @@ object KafkaMetricsReporter { } } } + reporters } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 677b5dd..3008426 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -63,7 +63,8 @@ class KafkaApis(val requestChannel: RequestChannel, val metadataCache: MetadataCache, val metrics: Metrics, val authorizer: Option[Authorizer], - val quotas: QuotaManagers) extends Logging { + val quotas: QuotaManagers, + val clusterId: String) extends Logging { this.logIdent = "[KafkaApi-%d] ".format(brokerId) @@ -754,6 +755,7 @@ class KafkaApis(val requestChannel: RequestChannel, val responseBody = new MetadataResponse( brokers.map(_.getNode(request.securityProtocol)).asJava, + clusterId, metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID), completeTopicMetadata.asJava, requestVersion http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 4509e37..db92cb8 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -17,7 +17,7 @@ package kafka.server -import java.net.{SocketTimeoutException} +import java.net.SocketTimeoutException import java.util import kafka.admin._ @@ -26,18 +26,21 @@ import kafka.log.LogConfig import kafka.log.CleanerConfig import kafka.log.LogManager import java.util.concurrent._ -import atomic.{AtomicInteger, AtomicBoolean} -import java.io.{IOException, File} +import atomic.{AtomicBoolean, AtomicInteger} +import java.io.{File, IOException} +import java.nio.charset.StandardCharsets +import java.util.UUID +import javax.xml.bind.DatatypeConverter import kafka.security.auth.Authorizer import kafka.utils._ -import org.apache.kafka.clients.{ManualMetadataUpdater, ClientRequest, NetworkClient} -import org.apache.kafka.common.Node +import org.apache.kafka.clients.{ClientRequest, ManualMetadataUpdater, NetworkClient} +import org.apache.kafka.common.{ClusterResource, Node} import org.apache.kafka.common.metrics._ -import org.apache.kafka.common.network.{LoginType, Selectable, ChannelBuilders, NetworkReceive, Selector, Mode} -import org.apache.kafka.common.protocol.{Errors, ApiKeys, SecurityProtocol} +import org.apache.kafka.common.network.{ChannelBuilders, LoginType, Mode, NetworkReceive, Selectable, Selector} +import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol} import org.apache.kafka.common.metrics.{JmxReporter, Metrics} -import org.apache.kafka.common.requests.{ControlledShutdownResponse, ControlledShutdownRequest, RequestSend} +import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse, RequestSend} import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.utils.AppInfoParser @@ -45,12 +48,14 @@ import scala.collection.{Map, mutable} import scala.collection.JavaConverters._ import org.I0Itec.zkclient.ZkClient import kafka.controller.{ControllerStats, KafkaController} -import kafka.cluster.{EndPoint, Broker} -import kafka.common.{InconsistentBrokerIdException, GenerateBrokerIdException} +import kafka.cluster.{Broker, EndPoint} +import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException} import kafka.network.{BlockingChannel, SocketServer} -import kafka.metrics.KafkaMetricsGroup +import kafka.metrics.{KafkaMetricsGroup, KafkaMetricsReporter} import com.yammer.metrics.core.Gauge import kafka.coordinator.GroupCoordinator +import org.apache.kafka.common.internals.ClusterResourceListeners +import collection.JavaConverters._ object KafkaServer { // Copy the subset of properties that are relevant to Logs @@ -89,7 +94,7 @@ object KafkaServer { * Represents the lifecycle of a single Kafka broker. Handles all functionality required * to start up and shutdown a single Kafka node. */ -class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup { +class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePrefix: Option[String] = None, kafkaMetricsReporters: Seq[KafkaMetricsReporter] = List()) extends Logging with KafkaMetricsGroup { private val startupComplete = new AtomicBoolean(false) private val isShuttingDown = new AtomicBoolean(false) private val isStartingUp = new AtomicBoolean(false) @@ -140,6 +145,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr val brokerMetaPropsFile = "meta.properties" val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator +brokerMetaPropsFile)))).toMap + private var _clusterId: String = null + + def clusterId: String = _clusterId + newGauge( "BrokerState", new Gauge[Int] { @@ -148,6 +157,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr ) newGauge( + "ClusterId", + new Gauge[String] { + def value = clusterId + } + ) + + newGauge( "yammer-metrics-count", new Gauge[Int] { def value = { @@ -183,6 +199,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr /* setup zookeeper */ zkUtils = initZk() + /* Get or create cluster_id */ + _clusterId = getOrGenerateClusterId(zkUtils) + info(s"Cluster ID = $clusterId") + + notifyClusterListeners(kafkaMetricsReporters ++ reporters.asScala) + /* start log manager */ logManager = createLogManager(zkUtils.zkClient, brokerState) logManager.startup() @@ -220,7 +242,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr /* start processing requests */ apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, - kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers) + kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers, clusterId) + requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) Mx4jLoader.maybeLoad() @@ -274,6 +297,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr } } + def notifyClusterListeners(clusterListeners: Seq[AnyRef]): Unit = { + val clusterResourceListeners = new ClusterResourceListeners + clusterResourceListeners.maybeAddAll(clusterListeners.asJava) + clusterResourceListeners.onUpdate(new ClusterResource(clusterId)) + } + private def initZk(): ZkUtils = { info(s"Connecting to zookeeper on ${config.zkConnect}") @@ -308,6 +337,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr zkUtils } + def getOrGenerateClusterId(zkUtils: ZkUtils): String = { + zkUtils.getClusterId.getOrElse(zkUtils.createOrGetClusterId(CoreUtils.generateUuidAsBase64)) + } /** * Forces some dynamic jmx beans to be registered on server startup. http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/core/src/main/scala/kafka/server/KafkaServerStartable.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala b/core/src/main/scala/kafka/server/KafkaServerStartable.scala index fc98912..4dfbb52 100644 --- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala +++ b/core/src/main/scala/kafka/server/KafkaServerStartable.scala @@ -24,13 +24,13 @@ import kafka.utils.{VerifiableProperties, Logging} object KafkaServerStartable { def fromProps(serverProps: Properties) = { - KafkaMetricsReporter.startReporters(new VerifiableProperties(serverProps)) - new KafkaServerStartable(KafkaConfig.fromProps(serverProps)) + val reporters = KafkaMetricsReporter.startReporters(new VerifiableProperties(serverProps)) + new KafkaServerStartable(KafkaConfig.fromProps(serverProps), reporters) } } -class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging { - private val server = new KafkaServer(serverConfig) +class KafkaServerStartable(val serverConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter]) extends Logging { + private val server = new KafkaServer(serverConfig, kafkaMetricsReporters = reporters) def startup() { try { http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/core/src/main/scala/kafka/utils/CoreUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala index b4209e3..4edf5ed 100755 --- a/core/src/main/scala/kafka/utils/CoreUtils.scala +++ b/core/src/main/scala/kafka/utils/CoreUtils.scala @@ -20,9 +20,11 @@ package kafka.utils import java.io._ import java.nio._ import java.nio.channels._ -import java.util.concurrent.locks.{ReadWriteLock, Lock} +import java.util.concurrent.locks.{Lock, ReadWriteLock} import java.lang.management._ +import java.util.UUID import javax.management._ +import javax.xml.bind.DatatypeConverter import org.apache.kafka.common.protocol.SecurityProtocol @@ -278,4 +280,25 @@ object CoreUtils extends Logging { val listenerList = parseCsvList(listeners) listenerList.map(listener => EndPoint.createEndPoint(listener)).map(ep => ep.protocolType -> ep).toMap } + + def generateUuidAsBase64(): String = { + val uuid = UUID.randomUUID() + urlSafeBase64EncodeNoPadding(getBytesFromUuid(uuid)) + } + + def getBytesFromUuid(uuid: UUID): Array[Byte] = { + // Extract bytes for uuid which is 128 bits (or 16 bytes) long. + val uuidBytes = ByteBuffer.wrap(new Array[Byte](16)) + uuidBytes.putLong(uuid.getMostSignificantBits) + uuidBytes.putLong(uuid.getLeastSignificantBits) + uuidBytes.array + } + + def urlSafeBase64EncodeNoPadding(data: Array[Byte]): String = { + val base64EncodedUUID = DatatypeConverter.printBase64Binary(data) + //Convert to URL safe variant by replacing + and / with - and _ respectively. + val urlSafeBase64EncodedUUID = base64EncodedUUID.replace("+", "-").replace("/", "_") + // Remove the "==" padding at the end. + urlSafeBase64EncodedUUID.substring(0, urlSafeBase64EncodedUUID.length - 2) + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/core/src/main/scala/kafka/utils/ZkUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index a137da8..503ed54 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -41,6 +41,7 @@ import scala.collection._ object ZkUtils { val ConsumersPath = "/consumers" + val ClusterIdPath = "/cluster/id" val BrokerIdsPath = "/brokers/ids" val BrokerTopicsPath = "/brokers/topics" val ControllerPath = "/controller" @@ -209,6 +210,35 @@ class ZkUtils(val zkClient: ZkClient, } } + /* Represents a cluster identifier. Stored in Zookeeper in JSON format: {"version" -> "1", "id" -> id } */ + object ClusterId { + + def toJson(id: String) = { + val jsonMap = Map("version" -> "1", "id" -> id) + Json.encode(jsonMap) + } + + def fromJson(clusterIdJson: String): String = { + Json.parseFull(clusterIdJson).map { m => + val clusterIdMap = m.asInstanceOf[Map[String, Any]] + clusterIdMap.get("id").get.asInstanceOf[String] + }.getOrElse(throw new KafkaException(s"Failed to parse the cluster id json $clusterIdJson")) + } + } + + def getClusterId: Option[String] = + readDataMaybeNull(ClusterIdPath)._1.map(ClusterId.fromJson) + + def createOrGetClusterId(proposedClusterId: String): String = { + try { + createPersistentPath(ClusterIdPath, ClusterId.toJson(proposedClusterId)) + proposedClusterId + } catch { + case e: ZkNodeExistsException => + getClusterId.getOrElse(throw new KafkaException("Failed to get cluster id from Zookeeper. This can only happen if /cluster/id is deleted from Zookeeper.")) + } + } + def getSortedBrokerList(): Seq[Int] = getChildren(BrokerIdsPath).map(_.toInt).sorted http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala new file mode 100644 index 0000000..9e03e27 --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import java.util.concurrent.ExecutionException +import java.util.concurrent.atomic.AtomicReference +import java.util.{Properties} + +import kafka.common.TopicAndPartition +import kafka.integration.KafkaServerTestHarness +import kafka.server._ +import kafka.utils._ +import org.apache.kafka.clients.consumer._ +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} +import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, TopicPartition} +import org.apache.kafka.test.{TestUtils => _, _} +import org.junit.Assert._ +import org.junit.{Before, Test} + +import scala.collection.JavaConverters._ +import org.apache.kafka.test.TestUtils.isValidClusterId + +import scala.collection.mutable.ArrayBuffer + +/** The test cases here verify the following conditions. + * 1. The ProducerInterceptor receives the cluster id after the onSend() method is called and before onAcknowledgement() method is called. + * 2. The Serializer receives the cluster id before the serialize() method is called. + * 3. The producer MetricReporter receives the cluster id after send() method is called on KafkaProducer. + * 4. The ConsumerInterceptor receives the cluster id before the onConsume() method. + * 5. The Deserializer receives the cluster id before the deserialize() method is called. + * 6. The consumer MetricReporter receives the cluster id after poll() is called on KafkaConsumer. + * 7. The broker MetricReporter receives the cluster id after the broker startup is over. + * 8. The broker KafkaMetricReporter receives the cluster id after the broker startup is over. + * 9. All the components receive the same cluster id. + */ + +object EndToEndClusterIdTest { + + object MockConsumerMetricsReporter { + val CLUSTER_META = new AtomicReference[ClusterResource] + } + + class MockConsumerMetricsReporter extends MockMetricsReporter with ClusterResourceListener { + + override def onUpdate(clusterMetadata: ClusterResource) { + MockConsumerMetricsReporter.CLUSTER_META.set(clusterMetadata) + } + } + + object MockProducerMetricsReporter { + val CLUSTER_META = new AtomicReference[ClusterResource] + } + + class MockProducerMetricsReporter extends MockMetricsReporter with ClusterResourceListener { + + override def onUpdate(clusterMetadata: ClusterResource) { + MockProducerMetricsReporter.CLUSTER_META.set(clusterMetadata) + } + } + + object MockBrokerMetricsReporter { + val CLUSTER_META = new AtomicReference[ClusterResource] + } + + class MockBrokerMetricsReporter extends MockMetricsReporter with ClusterResourceListener { + + override def onUpdate(clusterMetadata: ClusterResource) { + MockBrokerMetricsReporter.CLUSTER_META.set(clusterMetadata) + } + } +} + +class EndToEndClusterIdTest extends KafkaServerTestHarness { + + import EndToEndClusterIdTest._ + + val producerCount = 1 + val consumerCount = 1 + val serverCount = 1 + lazy val producerConfig = new Properties + lazy val consumerConfig = new Properties + lazy val serverConfig = new Properties + val numRecords = 1 + val topic = "e2etopic" + val part = 0 + val tp = new TopicPartition(topic, part) + val topicAndPartition = new TopicAndPartition(topic, part) + this.serverConfig.setProperty(KafkaConfig.MetricReporterClassesProp, "kafka.api.EndToEndClusterIdTest$MockBrokerMetricsReporter") + + override def generateConfigs() = { + val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol), + trustStoreFile = trustStoreFile, saslProperties = saslProperties) + cfgs.foreach(_.putAll(serverConfig)) + cfgs.map(KafkaConfig.fromProps) + } + + @Before + override def setUp() { + super.setUp + // create the consumer offset topic + TestUtils.createTopic(this.zkUtils, topic, 2, serverCount, this.servers) + } + + @Test + def testEndToEnd() { + val appendStr = "mock" + MockConsumerInterceptor.resetCounters() + MockProducerInterceptor.resetCounters() + + assertNotNull(MockBrokerMetricsReporter.CLUSTER_META) + isValidClusterId(MockBrokerMetricsReporter.CLUSTER_META.get.clusterId) + + val producerProps = new Properties() + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockProducerInterceptor") + producerProps.put("mock.interceptor.append", appendStr) + producerProps.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, "kafka.api.EndToEndClusterIdTest$MockProducerMetricsReporter") + val testProducer = new KafkaProducer(producerProps, new MockSerializer, new MockSerializer) + + // Send one record and make sure clusterId is set after send and before onAcknowledgement + sendRecords(testProducer, 1, tp) + assertNotEquals(MockProducerInterceptor.CLUSTER_ID_BEFORE_ON_ACKNOWLEDGEMENT, MockProducerInterceptor.NO_CLUSTER_ID) + assertNotNull(MockProducerInterceptor.CLUSTER_META) + assertEquals(MockProducerInterceptor.CLUSTER_ID_BEFORE_ON_ACKNOWLEDGEMENT.get.clusterId, MockProducerInterceptor.CLUSTER_META.get.clusterId) + isValidClusterId(MockProducerInterceptor.CLUSTER_META.get.clusterId) + + // Make sure that serializer gets the cluster id before serialize method. + assertNotEquals(MockSerializer.CLUSTER_ID_BEFORE_SERIALIZE, MockSerializer.NO_CLUSTER_ID) + assertNotNull(MockSerializer.CLUSTER_META) + isValidClusterId(MockSerializer.CLUSTER_META.get.clusterId) + + assertNotNull(MockProducerMetricsReporter.CLUSTER_META) + isValidClusterId(MockProducerMetricsReporter.CLUSTER_META.get.clusterId) + + this.consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + this.consumerConfig.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockConsumerInterceptor") + this.consumerConfig.put(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, "kafka.api.EndToEndClusterIdTest$MockConsumerMetricsReporter") + val testConsumer = new KafkaConsumer(this.consumerConfig, new MockDeserializer, new MockDeserializer) + testConsumer.assign(List(tp).asJava) + testConsumer.seek(tp, 0) + + // consume and verify that values are modified by interceptors + consumeRecords(testConsumer, numRecords) + + // Check that cluster id is present after the first poll call. + assertNotEquals(MockConsumerInterceptor.CLUSTER_ID_BEFORE_ON_CONSUME, MockConsumerInterceptor.NO_CLUSTER_ID) + assertNotNull(MockConsumerInterceptor.CLUSTER_META) + isValidClusterId(MockConsumerInterceptor.CLUSTER_META.get.clusterId) + assertEquals(MockConsumerInterceptor.CLUSTER_ID_BEFORE_ON_CONSUME.get.clusterId, MockConsumerInterceptor.CLUSTER_META.get.clusterId) + + assertNotEquals(MockDeserializer.CLUSTER_ID_BEFORE_DESERIALIZE, MockDeserializer.NO_CLUSTER_ID) + assertNotNull(MockDeserializer.CLUSTER_META) + isValidClusterId(MockDeserializer.CLUSTER_META.get.clusterId) + assertEquals(MockDeserializer.CLUSTER_ID_BEFORE_DESERIALIZE.get.clusterId, MockDeserializer.CLUSTER_META.get.clusterId) + + assertNotNull(MockConsumerMetricsReporter.CLUSTER_META) + isValidClusterId(MockConsumerMetricsReporter.CLUSTER_META.get.clusterId) + + // Make sure everyone receives the same cluster id. + assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, MockSerializer.CLUSTER_META.get.clusterId) + assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, MockProducerMetricsReporter.CLUSTER_META.get.clusterId) + assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, MockConsumerInterceptor.CLUSTER_META.get.clusterId) + assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, MockDeserializer.CLUSTER_META.get.clusterId) + assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, MockConsumerMetricsReporter.CLUSTER_META.get.clusterId) + assertEquals(MockProducerInterceptor.CLUSTER_META.get.clusterId, MockBrokerMetricsReporter.CLUSTER_META.get.clusterId) + + testConsumer.close() + testProducer.close() + MockConsumerInterceptor.resetCounters() + MockProducerInterceptor.resetCounters() + } + + private def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]], numRecords: Int, tp: TopicPartition) { + val futures = (0 until numRecords).map { i => + val record = new ProducerRecord(tp.topic(), tp.partition(), s"$i".getBytes, s"$i".getBytes) + debug(s"Sending this record: $record") + producer.send(record) + } + try { + futures.foreach(_.get) + } catch { + case e: ExecutionException => throw e.getCause + } + } + + private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]], + numRecords: Int = 1, + startingOffset: Int = 0, + topic: String = topic, + part: Int = part) { + val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]() + val maxIters = numRecords * 50 + var iters = 0 + while (records.size < numRecords) { + for (record <- consumer.poll(50).asScala) { + records += record + } + if (iters > maxIters) + throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.") + iters += 1 + } + for (i <- 0 until numRecords) { + val record = records(i) + val offset = startingOffset + i + assertEquals(topic, record.topic) + assertEquals(part, record.partition) + assertEquals(offset.toLong, record.offset) + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index 243f913..6c10632 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -675,6 +675,9 @@ class PlaintextConsumerTest extends BaseConsumerTest { @Test def testInterceptors() { val appendStr = "mock" + MockConsumerInterceptor.resetCounters() + MockProducerInterceptor.resetCounters() + // create producer with interceptor val producerProps = new Properties() producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala index 1980e8a..d82ec58 100644 --- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala +++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala @@ -18,8 +18,9 @@ package kafka.metrics import java.util.Properties + import com.yammer.metrics.Metrics -import com.yammer.metrics.core.MetricPredicate +import com.yammer.metrics.core.{Metric, MetricName, MetricPredicate} import org.junit.{After, Test} import org.junit.Assert._ import kafka.integration.KafkaServerTestHarness @@ -28,6 +29,7 @@ import kafka.serializer._ import kafka.utils._ import kafka.admin.AdminUtils import kafka.utils.TestUtils._ + import scala.collection._ import scala.collection.JavaConversions._ import scala.util.matching.Regex @@ -90,6 +92,13 @@ class MetricsTest extends KafkaServerTestHarness with Logging { assertFalse("Topic metrics exists after deleteTopic", checkTopicMetricsExists(topic)) } + @Test + def testClusterIdMetric(): Unit ={ + // Check if clusterId metric exists. + val metrics = Metrics.defaultRegistry().allMetrics + assertEquals(metrics.keySet.filter(_.getMBeanName().equals("kafka.server:type=KafkaServer,name=ClusterId")).size, 1) + } + @deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0") def createAndShutdownStep(group: String, consumerId: String, producerId: String): Unit = { sendMessages(servers, topic, nMessages) http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala new file mode 100755 index 0000000..d235d02 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import java.util.concurrent.atomic.AtomicReference + +import kafka.metrics.KafkaMetricsReporter +import kafka.utils.{CoreUtils, TestUtils, VerifiableProperties} +import kafka.zk.ZooKeeperTestHarness +import org.apache.kafka.common.{ClusterResource, ClusterResourceListener} +import org.apache.kafka.test.MockMetricsReporter +import org.junit.Assert._ +import org.junit.{After, Before, Test} +import org.apache.kafka.test.TestUtils.isValidClusterId + +object KafkaMetricReporterClusterIdTest { + + class MockKafkaMetricsReporter extends KafkaMetricsReporter with ClusterResourceListener { + + override def onUpdate(clusterMetadata: ClusterResource): Unit = { + MockKafkaMetricsReporter.CLUSTER_META.set(clusterMetadata) + } + + override def init(props: VerifiableProperties): Unit = { + } + } + + object MockKafkaMetricsReporter { + val CLUSTER_META = new AtomicReference[ClusterResource] + } + + object MockBrokerMetricsReporter { + val CLUSTER_META: AtomicReference[ClusterResource] = new AtomicReference[ClusterResource] + } + + class MockBrokerMetricsReporter extends MockMetricsReporter with ClusterResourceListener { + + override def onUpdate(clusterMetadata: ClusterResource) { + MockBrokerMetricsReporter.CLUSTER_META.set(clusterMetadata) + } + } + +} + +class KafkaMetricReporterClusterIdTest extends ZooKeeperTestHarness { + var server: KafkaServerStartable = null + var config: KafkaConfig = null + + @Before + override def setUp() { + super.setUp() + val props = TestUtils.createBrokerConfig(1, zkConnect) + props.setProperty("kafka.metrics.reporters", "kafka.server.KafkaMetricReporterClusterIdTest$MockKafkaMetricsReporter") + props.setProperty(KafkaConfig.MetricReporterClassesProp, "kafka.server.KafkaMetricReporterClusterIdTest$MockBrokerMetricsReporter") + config = KafkaConfig.fromProps(props) + server = KafkaServerStartable.fromProps(props) + server.startup() + } + + @Test + def testClusterIdPresent() { + assertNotNull(KafkaMetricReporterClusterIdTest.MockKafkaMetricsReporter.CLUSTER_META) + isValidClusterId(KafkaMetricReporterClusterIdTest.MockKafkaMetricsReporter.CLUSTER_META.get().clusterId()) + + assertNotNull(KafkaMetricReporterClusterIdTest.MockBrokerMetricsReporter.CLUSTER_META) + isValidClusterId(KafkaMetricReporterClusterIdTest.MockBrokerMetricsReporter.CLUSTER_META.get().clusterId()) + + assertEquals(KafkaMetricReporterClusterIdTest.MockKafkaMetricsReporter.CLUSTER_META.get().clusterId(), + KafkaMetricReporterClusterIdTest.MockBrokerMetricsReporter.CLUSTER_META.get().clusterId()) + } + + @After + override def tearDown() { + server.shutdown() + CoreUtils.delete(config.logDirs) + TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName) + super.tearDown() + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala index 46a79de..11dd6fe 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala @@ -17,7 +17,7 @@ package kafka.server -import java.util.Properties +import java.util.{Properties} import kafka.common.Topic import kafka.utils.TestUtils @@ -25,7 +25,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse} import org.junit.Assert._ import org.junit.Test - +import org.apache.kafka.test.TestUtils.isValidClusterId import scala.collection.JavaConverters._ class MetadataRequestTest extends BaseRequestTest { @@ -35,6 +35,19 @@ class MetadataRequestTest extends BaseRequestTest { } @Test + def testClusterIdWithRequestVersion1() { + val v1MetadataResponse = sendMetadataRequest(MetadataRequest.allTopics, 1) + val v1ClusterId = v1MetadataResponse.clusterId + assertNull(s"v1 clusterId should be null", v1ClusterId) + } + + @Test + def testClusterIdIsValid() { + val metadataResponse = sendMetadataRequest(MetadataRequest.allTopics, 2) + isValidClusterId(metadataResponse.clusterId) + } + + @Test def testControllerId() { val controllerServer = servers.find(_.kafkaController.isActive()).get val controllerId = controllerServer.config.brokerId http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala new file mode 100755 index 0000000..325889f --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/ServerGenerateClusterIdTest.scala @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import scala.concurrent._ +import ExecutionContext.Implicits._ +import scala.concurrent.duration._ +import kafka.utils.{CoreUtils, TestUtils, ZkUtils} +import kafka.zk.ZooKeeperTestHarness +import org.junit.Assert._ +import org.junit.{Before, Test} +import org.apache.kafka.test.TestUtils.isValidClusterId + +class ServerGenerateClusterIdTest extends ZooKeeperTestHarness { + var config1: KafkaConfig = null + var config2: KafkaConfig = null + var config3: KafkaConfig = null + + @Before + override def setUp() { + super.setUp() + config1 = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, zkConnect)) + config2 = KafkaConfig.fromProps(TestUtils.createBrokerConfig(2, zkConnect)) + config3 = KafkaConfig.fromProps(TestUtils.createBrokerConfig(3, zkConnect)) + } + + @Test + def testAutoGenerateClusterId() { + // Make sure that the cluster id doesn't exist yet. + assertFalse(zkUtils.pathExists(ZkUtils.ClusterIdPath)) + + var server1 = TestUtils.createServer(config1) + + // Validate the cluster id + val clusterIdOnFirstBoot = server1.clusterId + isValidClusterId(clusterIdOnFirstBoot) + + server1.shutdown() + + // Make sure that the cluster id is persistent. + assertTrue(zkUtils.pathExists(ZkUtils.ClusterIdPath)) + assertEquals(zkUtils.getClusterId, Some(clusterIdOnFirstBoot)) + + // Restart the server check to confirm that it uses the clusterId generated previously + server1 = new KafkaServer(config1) + server1.startup() + + val clusterIdOnSecondBoot = server1.clusterId + assertEquals(clusterIdOnFirstBoot, clusterIdOnSecondBoot) + + server1.shutdown() + + // Make sure that the cluster id is persistent after multiple reboots. + assertTrue(zkUtils.pathExists(ZkUtils.ClusterIdPath)) + assertEquals(zkUtils.getClusterId, Some(clusterIdOnFirstBoot)) + + CoreUtils.delete(server1.config.logDirs) + TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName) + } + + @Test + def testAutoGenerateClusterIdForKafkaClusterSequential() { + val server1 = TestUtils.createServer(config1) + val clusterIdFromServer1 = server1.clusterId + + val server2 = TestUtils.createServer(config2) + val clusterIdFromServer2 = server2.clusterId + + val server3 = TestUtils.createServer(config3) + val clusterIdFromServer3 = server3.clusterId + + server1.shutdown() + server2.shutdown() + server3.shutdown() + + isValidClusterId(clusterIdFromServer1) + assertEquals(clusterIdFromServer1, clusterIdFromServer2, clusterIdFromServer3) + + // Check again after reboot + server1.startup() + assertEquals(clusterIdFromServer1, server1.clusterId) + server2.startup() + assertEquals(clusterIdFromServer2, server2.clusterId) + server3.startup() + assertEquals(clusterIdFromServer3, server3.clusterId) + server1.shutdown() + server2.shutdown() + server3.shutdown() + + CoreUtils.delete(server1.config.logDirs) + CoreUtils.delete(server2.config.logDirs) + CoreUtils.delete(server3.config.logDirs) + TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName) + } + + @Test + def testAutoGenerateClusterIdForKafkaClusterParallel() { + val firstBoot = Future.traverse(Seq(config1, config2, config3))(config => Future(TestUtils.createServer(config))) + val Seq(server1, server2, server3) = Await.result(firstBoot, 100 second) + + val clusterIdFromServer1 = server1.clusterId + val clusterIdFromServer2 = server2.clusterId + val clusterIdFromServer3 = server3.clusterId + + server1.shutdown() + server2.shutdown() + server3.shutdown() + isValidClusterId(clusterIdFromServer1) + assertEquals(clusterIdFromServer1, clusterIdFromServer2, clusterIdFromServer3) + + // Check again after reboot + val secondBoot = Future.traverse(Seq(server1, server2, server3))(server => Future { + server.startup() + server + }) + val servers = Await.result(secondBoot, 100 second) + servers.foreach(server => assertEquals(clusterIdFromServer1, server.clusterId)) + + servers.foreach(_.shutdown()) + CoreUtils.delete(server1.config.logDirs) + CoreUtils.delete(server2.config.logDirs) + CoreUtils.delete(server3.config.logDirs) + TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName) + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/core/src/test/scala/unit/kafka/utils/UtilsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala index f42f0ff..22a0a16 100755 --- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala +++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala @@ -17,9 +17,11 @@ package kafka.utils -import java.util.Arrays +import java.util.{Arrays, UUID} import java.util.concurrent.locks.ReentrantLock import java.nio.ByteBuffer +import java.util.regex.Pattern + import org.apache.log4j.Logger import org.scalatest.junit.JUnitSuite import org.junit.Assert._ @@ -31,6 +33,7 @@ import org.apache.kafka.common.utils.Utils class UtilsTest extends JUnitSuite { private val logger = Logger.getLogger(classOf[UtilsTest]) + val clusterIdPattern = Pattern.compile("[a-zA-Z0-9_\\-]+") @Test def testSwallow() { @@ -139,7 +142,6 @@ class UtilsTest extends JUnitSuite { } } - @Test def testInLock() { val lock = new ReentrantLock() @@ -151,4 +153,26 @@ class UtilsTest extends JUnitSuite { assertFalse("Should be unlocked", lock.isLocked) } + @Test + def testUrlSafeBase64EncodeUUID() { + + // Test a UUID that has no + or / characters in base64 encoding [a149b4a3-06e1-4b49-a8cb-8a9c4a59fa46 ->(base64)-> oUm0owbhS0moy4qcSln6Rg==] + val clusterId1 = CoreUtils.urlSafeBase64EncodeNoPadding(CoreUtils.getBytesFromUuid(UUID.fromString("a149b4a3-06e1-4b49-a8cb-8a9c4a59fa46"))) + assertEquals(clusterId1, "oUm0owbhS0moy4qcSln6Rg") + assertEquals(clusterId1.length, 22) + assertTrue(clusterIdPattern.matcher(clusterId1).matches()) + + // Test a UUID that has + or / characters in base64 encoding [d418ec02-277e-4853-81e6-afe30259daec ->(base64)-> 1BjsAid+SFOB5q/jAlna7A==] + val clusterId2 = CoreUtils.urlSafeBase64EncodeNoPadding(CoreUtils.getBytesFromUuid(UUID.fromString("d418ec02-277e-4853-81e6-afe30259daec"))) + assertEquals(clusterId2, "1BjsAid-SFOB5q_jAlna7A") + assertEquals(clusterId2.length, 22) + assertTrue(clusterIdPattern.matcher(clusterId2).matches()) + } + + @Test + def testGenerateUuidAsBase64() { + val clusterId = CoreUtils.generateUuidAsBase64() + assertEquals(clusterId.length, 22) + assertTrue(clusterIdPattern.matcher(clusterId).matches()) + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala index 2d81ed9..918c4b5 100755 --- a/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala +++ b/core/src/test/scala/unit/kafka/utils/ZkUtilsTest.scala @@ -52,4 +52,10 @@ class ZkUtilsTest extends ZooKeeperTestHarness { val (optionalData, _) = zkUtils.readDataMaybeNull(path) assertTrue("Node should still be there", optionalData.isDefined) } + + @Test + def testClusterIdentifierJsonParsing() { + val clusterId = "test" + assertEquals(zkUtils.ClusterId.fromJson(zkUtils.ClusterId.toJson(clusterId)), clusterId) + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java index ba6289c..cd45aee 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java @@ -49,8 +49,8 @@ public class WindowedStreamPartitionerTest { new PartitionInfo(topicName, 5, Node.noNode(), new Node[0], new Node[0]) ); - private Cluster cluster = new Cluster(Collections.singletonList(Node.noNode()), infos, Collections.<String>emptySet(), - Collections.<String>emptySet()); + private Cluster cluster = new Cluster("cluster", Collections.singletonList(Node.noNode()), infos, + Collections.<String>emptySet(), Collections.<String>emptySet()); @Test public void testCopartitioning() { http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java index 9683da9..e36bde4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java @@ -44,7 +44,7 @@ public class DefaultPartitionGrouperTest { new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]) ); - private Cluster metadata = new Cluster(Collections.singletonList(Node.noNode()), infos, Collections.<String>emptySet(), + private Cluster metadata = new Cluster("cluster", Collections.singletonList(Node.noNode()), infos, Collections.<String>emptySet(), Collections.<String>emptySet()); @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java index 8d5a549..922ddb3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java @@ -47,7 +47,7 @@ public class RecordCollectorTest { new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]) ); - private Cluster cluster = new Cluster(Collections.singletonList(Node.noNode()), infos, + private Cluster cluster = new Cluster("cluster", Collections.singletonList(Node.noNode()), infos, Collections.<String>emptySet(), Collections.<String>emptySet()); http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java index e5ae7d8..4f4d2eb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java @@ -82,7 +82,7 @@ public class StreamPartitionAssignorTest { new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0]) ); - private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet(), + private Cluster metadata = new Cluster("cluster", Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet(), Collections.<String>emptySet()); private final TaskId task0 = new TaskId(0, 0); http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index ccbf8d6..c7e9daa 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -81,7 +81,7 @@ public class StreamThreadTest { new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]) ); - private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet(), + private Cluster metadata = new Cluster("cluster", Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet(), Collections.<String>emptySet()); private final PartitionAssignor.Subscription subscription = http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/tests/kafkatest/tests/core/upgrade_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/core/upgrade_test.py b/tests/kafkatest/tests/core/upgrade_test.py index 16a518d..9c83991 100644 --- a/tests/kafkatest/tests/core/upgrade_test.py +++ b/tests/kafkatest/tests/core/upgrade_test.py @@ -15,6 +15,8 @@ from ducktape.mark import parametrize +import json + from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.kafka import KafkaService from kafkatest.services.kafka import config_property @@ -22,7 +24,7 @@ from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.zookeeper import ZookeeperService from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest from kafkatest.utils import is_int -from kafkatest.version import LATEST_0_8_2, LATEST_0_9, TRUNK, KafkaVersion +from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, LATEST_0_10_0, TRUNK, KafkaVersion class TestUpgrade(ProduceConsumeValidateTest): @@ -60,6 +62,7 @@ class TestUpgrade(ProduceConsumeValidateTest): self.kafka.start_node(node) + @parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["none"]) @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"]) @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"], new_consumer=True, security_protocol="SASL_SSL") @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["snappy"], new_consumer=True) @@ -73,9 +76,9 @@ class TestUpgrade(ProduceConsumeValidateTest): @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["snappy"]) def test_upgrade(self, from_kafka_version, to_message_format_version, compression_types, new_consumer=False, security_protocol="PLAINTEXT"): - """Test upgrade of Kafka broker cluster from 0.8.2 or 0.9.0 to 0.10 + """Test upgrade of Kafka broker cluster from 0.8.2, 0.9.0 or 0.10.0 to the current version - from_kafka_version is a Kafka version to upgrade from: either 0.8.2.X or 0.9 + from_kafka_version is a Kafka version to upgrade from: either 0.8.2.X, 0.9.0.x or 0.10.0.x If to_message_format_version is None, it means that we will upgrade to default (latest) message format version. It is possible to upgrade to 0.10 brokers but still use message @@ -105,6 +108,8 @@ class TestUpgrade(ProduceConsumeValidateTest): compression_types=compression_types, version=KafkaVersion(from_kafka_version)) + assert self.zk.query("/cluster/id") is None + # TODO - reduce the timeout self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=30000, new_consumer=new_consumer, @@ -112,3 +117,13 @@ class TestUpgrade(ProduceConsumeValidateTest): self.run_produce_consume_validate(core_test_action=lambda: self.perform_upgrade(from_kafka_version, to_message_format_version)) + + cluster_id_json = self.zk.query("/cluster/id") + assert cluster_id_json is not None + try: + cluster_id = json.loads(cluster_id_json) + except : + self.logger.debug("Data in /cluster/id znode could not be parsed. Data = %s" % cluster_id_json) + + self.logger.debug("Cluster id [%s]", cluster_id) + assert len(cluster_id["id"]) == 22 http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/tests/kafkatest/version.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index 6b378e7..239a9f4 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -77,4 +77,8 @@ LATEST_0_9 = V_0_9_0_1 # 0.10.0.X versions V_0_10_0_0 = KafkaVersion("0.10.0.0") -LATEST_0_10 = V_0_10_0_0 +V_0_10_0_1 = KafkaVersion("0.10.0.1") +# Adding 0.10.0 as the next version will be 0.10.1.x +LATEST_0_10_0 = V_0_10_0_1 + +LATEST_0_10 = LATEST_0_10_0 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/vagrant/base.sh ---------------------------------------------------------------------- diff --git a/vagrant/base.sh b/vagrant/base.sh index ebe54a8..3697765 100755 --- a/vagrant/base.sh +++ b/vagrant/base.sh @@ -65,6 +65,9 @@ get_kafka 0.8.2.2 chmod a+rw /opt/kafka-0.8.2.2 get_kafka 0.9.0.1 chmod a+rw /opt/kafka-0.9.0.1 +get_kafka 0.10.0.1 +chmod a+rw /opt/kafka-0.10.0.1 + # For EC2 nodes, we want to use /mnt, which should have the local disk. On local # VMs, we can just create it if it doesn't exist and use it like we'd use