This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new 2bfb239 KAFKA-7029: Update ReplicaVerificationTool not to use SimpleConsumer (#5188)
2bfb239 is described below

commit 2bfb239495c97e57ee51f2f462f5d09160a0d67b
Author: Manikumar Reddy O <manikumar.re...@gmail.com>
AuthorDate: Tue Jun 12 13:16:24 2018 +0530

    KAFKA-7029: Update ReplicaVerificationTool not to use SimpleConsumer (#5188)

    We need to send fetch requests to replicas so we have to use NetworkClient instead of KafkaConsumer.

    Reviewers: Ismael Juma <ism...@juma.me.uk>
---
 .../kafka/server/ReplicaFetcherBlockingSend.scala | 2 +-
 .../kafka/tools/ReplicaVerificationTool.scala | 389 +++++++++++++--------
 .../kafka/tools/ReplicaVerificationToolTest.scala | 22 +-
 3 files changed, 255 insertions(+), 158 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
index 0bf2bd3..4c7adfb 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
@@ -86,7 +86,7 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint,
     )
   }
-  override def sendRequest(requestBuilder: Builder[_ <: AbstractRequest]): ClientResponse = {
+  override def sendRequest(requestBuilder: Builder[_ <: AbstractRequest]): ClientResponse = {
     try {
       if (!NetworkClientUtils.awaitReady(networkClient, sourceNode, time, socketTimeout))
         throw new SocketTimeoutException(s"Failed to connect within $socketTimeout ms")
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 0408e92..b1e6946 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -17,25 +17,35 @@ package kafka.tools
+import java.net.SocketTimeoutException
 import java.text.SimpleDateFormat
-import java.util.Date
+import java.util
 import java.util.concurrent.CountDownLatch
-import java.util.concurrent.atomic.AtomicReference
+import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
 import java.util.regex.{Pattern, PatternSyntaxException}
+import java.util.{Date, Properties}
 import joptsimple.OptionParser
 import kafka.api._
-import kafka.client.ClientUtils
-import kafka.cluster.BrokerEndPoint
-import kafka.common.TopicAndPartition
-import kafka.consumer.{ConsumerConfig, SimpleConsumer, Whitelist}
-import kafka.message.{ByteBufferMessageSet, MessageSet}
+import kafka.consumer.Whitelist
 import kafka.utils._
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.utils.Time
+import org.apache.kafka.clients._
+import org.apache.kafka.clients.admin.{ListTopicsOptions, TopicDescription}
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network.{NetworkReceive, Selectable, Selector}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests.AbstractRequest.Builder
+import org.apache.kafka.common.requests.{AbstractRequest, FetchResponse, ListOffsetRequest, FetchRequest => JFetchRequest}
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.common.{Node, TopicPartition}
+
+import scala.collection.JavaConverters._
 /**
- * For 
verifying the consistency among replicas. + * For verifying the consistency among replicas. * * 1. start a fetcher on every broker. * 2. each fetcher does the following @@ -44,11 +54,11 @@ import org.apache.kafka.common.utils.Time * 2.3 waits for all other fetchers to finish step 2.2 * 2.4 one of the fetchers verifies the consistency of fetched results among replicas * - * The consistency verification is up to the high watermark. The tool reports the - * max lag between the verified offset and the high watermark among all partitions. + * The consistency verification is up to the high watermark. The tool reports the + * max lag between the verified offset and the high watermark among all partitions. * - * If a broker goes down, the verification of the partitions on that broker is delayed - * until the broker is up again. + * If a broker goes down, the verification of the partitions on that broker is delayed + * until the broker is up again. * * Caveats: * 1. The tools needs all brokers to be up at startup time. @@ -56,7 +66,7 @@ import org.apache.kafka.common.utils.Time */ object ReplicaVerificationTool extends Logging { - val clientId= "replicaVerificationTool" + val clientId = "replicaVerificationTool" val dateFormatString = "yyyy-MM-dd HH:mm:ss,SSS" val dateFormat = new SimpleDateFormat(dateFormatString) @@ -74,7 +84,7 @@ object ReplicaVerificationTool extends Logging { .withRequiredArg .describedAs("bytes") .ofType(classOf[java.lang.Integer]) - .defaultsTo(ConsumerConfig.FetchSize) + .defaultsTo(ConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES) val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.") .withRequiredArg .describedAs("ms") @@ -96,18 +106,16 @@ object ReplicaVerificationTool extends Logging { .ofType(classOf[java.lang.Long]) .defaultsTo(30 * 1000L) - if(args.length == 0) + if (args.length == 0) CommandLineUtils.printUsageAndDie(parser, "Validate that all replicas for a set of topics have the same data.") - val options = parser.parse(args : _*) + val options = parser.parse(args: _*) CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt) val regex = options.valueOf(topicWhiteListOpt) val topicWhiteListFiler = new Whitelist(regex) - try { - Pattern.compile(regex) - } + try Pattern.compile(regex) catch { case _: PatternSyntaxException => throw new RuntimeException(regex + " is an invalid regex.") @@ -120,68 +128,68 @@ object ReplicaVerificationTool extends Logging { // getting topic metadata info("Getting topic metadata...") val brokerList = options.valueOf(brokerListOpt) - ToolsUtils.validatePortOrDie(parser,brokerList) - val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList) - val topicsMetadataResponse = ClientUtils.fetchTopicMetadata(Set[String](), metadataTargetBrokers, clientId, maxWaitMs) - val brokerMap = topicsMetadataResponse.brokers.map(b => (b.id, b)).toMap - val filteredTopicMetadata = topicsMetadataResponse.topicsMetadata.filter( - topicMetadata => if (topicWhiteListFiler.isTopicAllowed(topicMetadata.topic, excludeInternalTopics = false)) - true - else - false - ) + ToolsUtils.validatePortOrDie(parser, brokerList) + + val (topicsMetadata, brokerInfo) = { + val adminClient = createAdminClient(brokerList) + try ((listTopicsMetadata(adminClient), brokerDetails(adminClient))) + finally CoreUtils.swallow(adminClient.close(), this) + } + + val filteredTopicMetadata = topicsMetadata.filter { topicMetaData => + topicWhiteListFiler.isTopicAllowed(topicMetaData.name, excludeInternalTopics = false) + } if 
(filteredTopicMetadata.isEmpty) { - error("No topics found. " + topicWhiteListOpt + ", if specified, is either filtering out all topics or there is no topic.") + error(s"No topics found. $topicWhiteListOpt if specified, is either filtering out all topics or there is no topic.") Exit.exit(1) } - val topicPartitionReplicaList: Seq[TopicPartitionReplica] = filteredTopicMetadata.flatMap( - topicMetadataResponse => - topicMetadataResponse.partitionsMetadata.flatMap( - partitionMetadata => - partitionMetadata.replicas.map(broker => - TopicPartitionReplica(topic = topicMetadataResponse.topic, partitionId = partitionMetadata.partitionId, replicaId = broker.id)) - ) - ) - debug("Selected topic partitions: " + topicPartitionReplicaList) - val topicAndPartitionsPerBroker: Map[Int, Seq[TopicAndPartition]] = topicPartitionReplicaList.groupBy(_.replicaId) - .map { case (brokerId, partitions) => - brokerId -> partitions.map { partition => TopicAndPartition(partition.topic, partition.partitionId) } } - debug("Topic partitions per broker: " + topicAndPartitionsPerBroker) - val expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int] = - topicPartitionReplicaList.groupBy(replica => TopicAndPartition(replica.topic, replica.partitionId)) - .map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size } - debug("Expected replicas per topic partition: " + expectedReplicasPerTopicAndPartition) - val leadersPerBroker: Map[Int, Seq[TopicAndPartition]] = filteredTopicMetadata.flatMap { topicMetadataResponse => - topicMetadataResponse.partitionsMetadata.map { partitionMetadata => - (TopicAndPartition(topicMetadataResponse.topic, partitionMetadata.partitionId), partitionMetadata.leader.get.id) + val topicPartitionReplicas = filteredTopicMetadata.flatMap { topicMetadata => + topicMetadata.partitions.asScala.flatMap { partitionMetadata => + partitionMetadata.replicas.asScala.map { node => + TopicPartitionReplica(topic = topicMetadata.name, partitionId = partitionMetadata.partition, replicaId = node.id) + } + } + } + debug(s"Selected topic partitions: $topicPartitionReplicas") + val brokerToTopicPartitions = topicPartitionReplicas.groupBy(_.replicaId).map { case (brokerId, partitions) => + brokerId -> partitions.map { partition => new TopicPartition(partition.topic, partition.partitionId) } + } + debug(s"Topic partitions per broker: $brokerToTopicPartitions") + val expectedReplicasPerTopicPartition = topicPartitionReplicas.groupBy { replica => + new TopicPartition(replica.topic, replica.partitionId) + }.map { case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size } + debug(s"Expected replicas per topic partition: $expectedReplicasPerTopicPartition") + + val topicPartitions = filteredTopicMetadata.flatMap { topicMetaData => + topicMetaData.partitions.asScala.map { partitionMetadata => + new TopicPartition(topicMetaData.name, partitionMetadata.partition) } - }.groupBy(_._2).mapValues(topicAndPartitionAndLeaderIds => topicAndPartitionAndLeaderIds.map { case (topicAndPartition, _) => - topicAndPartition - }) - debug("Leaders per broker: " + leadersPerBroker) - - val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicAndPartition, - leadersPerBroker, - topicAndPartitionsPerBroker.size, - brokerMap, - initialOffsetTime, - reportInterval) + } + + val consumerProps = consumerConfig(brokerList) + + val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicPartition, + initialOffsets(topicPartitions, consumerProps, initialOffsetTime), + brokerToTopicPartitions.size, 
+ reportInterval) // create all replica fetcher threads - val verificationBrokerId = topicAndPartitionsPerBroker.head._1 - val fetcherThreads: Iterable[ReplicaFetcher] = topicAndPartitionsPerBroker.map { - case (brokerId, topicAndPartitions) => - new ReplicaFetcher(name = "ReplicaFetcher-" + brokerId, - sourceBroker = brokerMap(brokerId), - topicAndPartitions = topicAndPartitions, - replicaBuffer = replicaBuffer, - socketTimeout = 30000, - socketBufferSize = 256000, - fetchSize = fetchSize, - maxWait = maxWaitMs, - minBytes = 1, - doVerification = brokerId == verificationBrokerId) + val verificationBrokerId = brokerToTopicPartitions.head._1 + val counter = new AtomicInteger(0) + val fetcherThreads: Iterable[ReplicaFetcher] = brokerToTopicPartitions.map { case (brokerId, topicPartitions) => + new ReplicaFetcher(name = s"ReplicaFetcher-$brokerId", + sourceBroker = brokerInfo(brokerId), + topicPartitions = topicPartitions, + replicaBuffer = replicaBuffer, + socketTimeout = 30000, + socketBufferSize = 256000, + fetchSize = fetchSize, + maxWait = maxWaitMs, + minBytes = 1, + doVerification = brokerId == verificationBrokerId, + consumerProps, + fetcherId = counter.incrementAndGet()) } Runtime.getRuntime.addShutdownHook(new Thread() { @@ -194,87 +202,112 @@ object ReplicaVerificationTool extends Logging { println(ReplicaVerificationTool.getCurrentTimeString() + ": verification process is started.") } + + private def listTopicsMetadata(adminClient: admin.AdminClient): Seq[TopicDescription] = { + val topics = adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names.get + adminClient.describeTopics(topics).all.get.values.asScala.toBuffer + } + + private def brokerDetails(adminClient: admin.AdminClient): Map[Int, Node] = { + adminClient.describeCluster.nodes.get.asScala.map(n => (n.id, n)).toMap + } + + private def createAdminClient(brokerUrl: String): admin.AdminClient = { + val props = new Properties() + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerUrl) + admin.AdminClient.create(props) + } + + private def initialOffsets(topicPartitions: Seq[TopicPartition], consumerConfig: Properties, + initialOffsetTime: Long): Map[TopicPartition, Long] = { + val consumer = createConsumer(consumerConfig) + try { + if (ListOffsetRequest.LATEST_TIMESTAMP == initialOffsetTime) + consumer.endOffsets(topicPartitions.asJava).asScala.mapValues(_.longValue).toMap + else if (ListOffsetRequest.EARLIEST_TIMESTAMP == initialOffsetTime) + consumer.beginningOffsets(topicPartitions.asJava).asScala.mapValues(_.longValue).toMap + else { + val timestampsToSearch = topicPartitions.map(tp => tp -> (initialOffsetTime: java.lang.Long)).toMap + consumer.offsetsForTimes(timestampsToSearch.asJava).asScala.mapValues(v => v.offset).toMap + } + } finally consumer.close() + } + + private def consumerConfig(brokerUrl: String): Properties = { + val properties = new Properties() + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl) + properties.put(ConsumerConfig.GROUP_ID_CONFIG, "ReplicaVerification") + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer]) + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer]) + properties + } + + private def createConsumer(consumerConfig: Properties): KafkaConsumer[String, String] = + new KafkaConsumer(consumerConfig) } -private case class TopicPartitionReplica(topic: String, partitionId: Int, replicaId: Int) +private case class TopicPartitionReplica(topic: String, partitionId: Int, 
replicaId: Int) private case class MessageInfo(replicaId: Int, offset: Long, nextOffset: Long, checksum: Long) -private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int], - leadersPerBroker: Map[Int, Seq[TopicAndPartition]], +private class ReplicaBuffer(expectedReplicasPerTopicPartition: Map[TopicPartition, Int], + initialOffsets: Map[TopicPartition, Long], expectedNumFetchers: Int, - brokerMap: Map[Int, BrokerEndPoint], - initialOffsetTime: Long, reportInterval: Long) extends Logging { - private val fetchOffsetMap = new Pool[TopicAndPartition, Long] - private val messageSetCache = new Pool[TopicAndPartition, Pool[Int, FetchResponsePartitionData]] + private val fetchOffsetMap = new Pool[TopicPartition, Long] + private val recordsCache = new Pool[TopicPartition, Pool[Int, FetchResponse.PartitionData[MemoryRecords]]] private val fetcherBarrier = new AtomicReference(new CountDownLatch(expectedNumFetchers)) private val verificationBarrier = new AtomicReference(new CountDownLatch(1)) @volatile private var lastReportTime = Time.SYSTEM.milliseconds private var maxLag: Long = -1L private var offsetWithMaxLag: Long = -1L - private var maxLagTopicAndPartition: TopicAndPartition = null + private var maxLagTopicAndPartition: TopicPartition = null initialize() def createNewFetcherBarrier() { fetcherBarrier.set(new CountDownLatch(expectedNumFetchers)) } - def getFetcherBarrier() = fetcherBarrier.get() + def getFetcherBarrier() = fetcherBarrier.get def createNewVerificationBarrier() { verificationBarrier.set(new CountDownLatch(1)) } - def getVerificationBarrier() = verificationBarrier.get() + def getVerificationBarrier() = verificationBarrier.get private def initialize() { - for (topicAndPartition <- expectedReplicasPerTopicAndPartition.keySet) - messageSetCache.put(topicAndPartition, new Pool[Int, FetchResponsePartitionData]) + for (topicPartition <- expectedReplicasPerTopicPartition.keySet) + recordsCache.put(topicPartition, new Pool[Int, FetchResponse.PartitionData[MemoryRecords]]) setInitialOffsets() } - private def offsetResponseStringWithError(offsetResponse: OffsetResponse): String = { - offsetResponse.partitionErrorAndOffsets.filter { case (_, partitionOffsetsResponse) => - partitionOffsetsResponse.error != Errors.NONE - }.mkString - } private def setInitialOffsets() { - for ((brokerId, topicAndPartitions) <- leadersPerBroker) { - val broker = brokerMap(brokerId) - val consumer = new SimpleConsumer(broker.host, broker.port, 10000, 100000, ReplicaVerificationTool.clientId) - val initialOffsetMap: Map[TopicAndPartition, PartitionOffsetRequestInfo] = - topicAndPartitions.map(topicAndPartition => topicAndPartition -> PartitionOffsetRequestInfo(initialOffsetTime, 1)).toMap - val offsetRequest = OffsetRequest(initialOffsetMap) - val offsetResponse = consumer.getOffsetsBefore(offsetRequest) - assert(!offsetResponse.hasError, offsetResponseStringWithError(offsetResponse)) - offsetResponse.partitionErrorAndOffsets.foreach { case (topicAndPartition, partitionOffsetResponse) => - fetchOffsetMap.put(topicAndPartition, partitionOffsetResponse.offsets.head) - } - } + for ((tp, offset) <- initialOffsets) + fetchOffsetMap.put(tp, offset) } - def addFetchedData(topicAndPartition: TopicAndPartition, replicaId: Int, partitionData: FetchResponsePartitionData) { - messageSetCache.get(topicAndPartition).put(replicaId, partitionData) + def addFetchedData(topicAndPartition: TopicPartition, replicaId: Int, partitionData: FetchResponse.PartitionData[MemoryRecords]) { + 
recordsCache.get(topicAndPartition).put(replicaId, partitionData) } - def getOffset(topicAndPartition: TopicAndPartition) = { + def getOffset(topicAndPartition: TopicPartition) = { fetchOffsetMap.get(topicAndPartition) } def verifyCheckSum(println: String => Unit) { debug("Begin verification") maxLag = -1L - for ((topicAndPartition, fetchResponsePerReplica) <- messageSetCache) { - debug("Verifying " + topicAndPartition) - assert(fetchResponsePerReplica.size == expectedReplicasPerTopicAndPartition(topicAndPartition), - "fetched " + fetchResponsePerReplica.size + " replicas for " + topicAndPartition + ", but expected " - + expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas") + for ((topicPartition, fetchResponsePerReplica) <- recordsCache) { + debug("Verifying " + topicPartition) + assert(fetchResponsePerReplica.size == expectedReplicasPerTopicPartition(topicPartition), + "fetched " + fetchResponsePerReplica.size + " replicas for " + topicPartition + ", but expected " + + expectedReplicasPerTopicPartition(topicPartition) + " replicas") val recordBatchIteratorMap = fetchResponsePerReplica.map { case (replicaId, fetchResponse) => - replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].asRecords.batches.iterator + replicaId -> fetchResponse.records.batches.iterator } - val maxHw = fetchResponsePerReplica.values.map(_.hw).max + val maxHw = fetchResponsePerReplica.values.map(_.highWatermark).max // Iterate one message at a time from every replica, until high watermark is reached. var isMessageInAllReplicas = true @@ -286,7 +319,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa val batch = recordBatchIterator.next() // only verify up to the high watermark - if (batch.lastOffset >= fetchResponsePerReplica.get(replicaId).hw) + if (batch.lastOffset >= fetchResponsePerReplica.get(replicaId).highWatermark) isMessageInAllReplicas = false else { messageInfoFromFirstReplicaOpt match { @@ -295,7 +328,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa MessageInfo(replicaId, batch.lastOffset, batch.nextOffset, batch.checksum)) case Some(messageInfoFromFirstReplica) => if (messageInfoFromFirstReplica.offset != batch.lastOffset) { - println(ReplicaVerificationTool.getCurrentTimeString + ": partition " + topicAndPartition + println(ReplicaVerificationTool.getCurrentTimeString + ": partition " + topicPartition + ": replica " + messageInfoFromFirstReplica.replicaId + "'s offset " + messageInfoFromFirstReplica.offset + " doesn't match replica " + replicaId + "'s offset " + batch.lastOffset) @@ -303,7 +336,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa } if (messageInfoFromFirstReplica.checksum != batch.checksum) println(ReplicaVerificationTool.getCurrentTimeString + ": partition " - + topicAndPartition + " has unmatched checksum at offset " + batch.lastOffset + "; replica " + + topicPartition + " has unmatched checksum at offset " + batch.lastOffset + "; replica " + messageInfoFromFirstReplica.replicaId + "'s checksum " + messageInfoFromFirstReplica.checksum + "; replica " + replicaId + "'s checksum " + batch.checksum) } @@ -313,20 +346,20 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa } catch { case t: Throwable => throw new RuntimeException("Error in processing replica %d in partition %s at offset %d." 
- .format(replicaId, topicAndPartition, fetchOffsetMap.get(topicAndPartition)), t) + .format(replicaId, topicPartition, fetchOffsetMap.get(topicPartition)), t) } } if (isMessageInAllReplicas) { val nextOffset = messageInfoFromFirstReplicaOpt.get.nextOffset - fetchOffsetMap.put(topicAndPartition, nextOffset) - debug(expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas match at offset " + - nextOffset + " for " + topicAndPartition) + fetchOffsetMap.put(topicPartition, nextOffset) + debug(expectedReplicasPerTopicPartition(topicPartition) + " replicas match at offset " + + nextOffset + " for " + topicPartition) } } - if (maxHw - fetchOffsetMap.get(topicAndPartition) > maxLag) { - offsetWithMaxLag = fetchOffsetMap.get(topicAndPartition) + if (maxHw - fetchOffsetMap.get(topicPartition) > maxLag) { + offsetWithMaxLag = fetchOffsetMap.get(topicPartition) maxLag = maxHw - offsetWithMaxLag - maxLagTopicAndPartition = topicAndPartition + maxLagTopicAndPartition = topicPartition } fetchResponsePerReplica.clear() } @@ -334,51 +367,54 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa if (currentTimeMs - lastReportTime > reportInterval) { println(ReplicaVerificationTool.dateFormat.format(new Date(currentTimeMs)) + ": max lag is " + maxLag + " for partition " + maxLagTopicAndPartition + " at offset " + offsetWithMaxLag - + " among " + messageSetCache.size + " partitions") + + " among " + recordsCache.size + " partitions") lastReportTime = currentTimeMs } } } -private class ReplicaFetcher(name: String, sourceBroker: BrokerEndPoint, topicAndPartitions: Iterable[TopicAndPartition], +private class ReplicaFetcher(name: String, sourceBroker: Node, topicPartitions: Iterable[TopicPartition], replicaBuffer: ReplicaBuffer, socketTimeout: Int, socketBufferSize: Int, - fetchSize: Int, maxWait: Int, minBytes: Int, doVerification: Boolean) + fetchSize: Int, maxWait: Int, minBytes: Int, doVerification: Boolean, consumerConfig: Properties, + fetcherId: Int) extends ShutdownableThread(name) { - val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, ReplicaVerificationTool.clientId) - val fetchRequestBuilder = new FetchRequestBuilder(). - clientId(ReplicaVerificationTool.clientId). - replicaId(Request.DebuggingConsumerId). - maxWait(maxWait). - minBytes(minBytes) + + private val fetchEndpoint = new ReplicaFetcherBlockingSend(sourceBroker, new ConsumerConfig(consumerConfig), new Metrics(), Time.SYSTEM, fetcherId, + s"broker-${Request.DebuggingConsumerId}-fetcher-$fetcherId") override def doWork() { val fetcherBarrier = replicaBuffer.getFetcherBarrier() val verificationBarrier = replicaBuffer.getVerificationBarrier() - for (topicAndPartition <- topicAndPartitions) - fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition, - replicaBuffer.getOffset(topicAndPartition), fetchSize) + val requestMap = new util.LinkedHashMap[TopicPartition, JFetchRequest.PartitionData] + for (topicPartition <- topicPartitions) + requestMap.put(topicPartition, new JFetchRequest.PartitionData(replicaBuffer.getOffset(topicPartition), 0L, fetchSize)) + + val fetchRequestBuilder = JFetchRequest.Builder. 
+ forReplica(ApiKeys.FETCH.latestVersion, Request.DebuggingConsumerId, maxWait, minBytes, requestMap) - val fetchRequest = fetchRequestBuilder.build() - debug("Issuing fetch request " + fetchRequest) + debug("Issuing fetch request ") - var response: FetchResponse = null + var fetchResponse: FetchResponse[MemoryRecords] = null try { - response = simpleConsumer.fetch(fetchRequest) + val clientResponse = fetchEndpoint.sendRequest(fetchRequestBuilder) + fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse[MemoryRecords]] } catch { case t: Throwable => if (!isRunning) throw t } - if (response != null) { - response.data.foreach { case (topicAndPartition, partitionData) => - replicaBuffer.addFetchedData(topicAndPartition, sourceBroker.id, partitionData) + if (fetchResponse != null) { + fetchResponse.responseData.asScala.foreach { case (tp, partitionData) => + replicaBuffer.addFetchedData(tp, sourceBroker.id, partitionData) } } else { - for (topicAndPartition <- topicAndPartitions) - replicaBuffer.addFetchedData(topicAndPartition, sourceBroker.id, new FetchResponsePartitionData(messages = MessageSet.Empty)) + val emptyResponse = new FetchResponse.PartitionData(Errors.NONE, FetchResponse.INVALID_HIGHWATERMARK, + FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY) + for (topicAndPartition <- topicPartitions) + replicaBuffer.addFetchedData(topicAndPartition, sourceBroker.id, emptyResponse) } fetcherBarrier.countDown() @@ -402,3 +438,64 @@ private class ReplicaFetcher(name: String, sourceBroker: BrokerEndPoint, topicAn debug("Done verification") } } + +private class ReplicaFetcherBlockingSend(sourceNode: Node, + consumerConfig: ConsumerConfig, + metrics: Metrics, + time: Time, + fetcherId: Int, + clientId: String) { + + private val socketTimeout: Int = consumerConfig.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG) + + private val networkClient = { + val channelBuilder = org.apache.kafka.clients.ClientUtils.createChannelBuilder(consumerConfig) + val selector = new Selector( + NetworkReceive.UNLIMITED, + consumerConfig.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), + metrics, + time, + "replica-fetcher", + Map("broker-id" -> sourceNode.id.toString, "fetcher-id" -> fetcherId.toString).asJava, + false, + channelBuilder, + new LogContext + ) + new NetworkClient( + selector, + new ManualMetadataUpdater(), + clientId, + 1, + 0, + 0, + Selectable.USE_DEFAULT_BUFFER_SIZE, + consumerConfig.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG), + consumerConfig.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), + time, + false, + new ApiVersions, + new LogContext + ) + } + + def sendRequest(requestBuilder: Builder[_ <: AbstractRequest]): ClientResponse = { + try { + if (!NetworkClientUtils.awaitReady(networkClient, sourceNode, time, socketTimeout)) + throw new SocketTimeoutException(s"Failed to connect within $socketTimeout ms") + else { + val clientRequest = networkClient.newClientRequest(sourceNode.id.toString, requestBuilder, + time.milliseconds(), true) + NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time) + } + } + catch { + case e: Throwable => + networkClient.close(sourceNode.id.toString) + throw e + } + } + + def close(): Unit = { + networkClient.close() + } +} diff --git a/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala b/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala index 211413a..f69c909 100644 --- a/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala +++ 
b/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala @@ -17,11 +17,10 @@ package kafka.tools -import kafka.api.FetchResponsePartitionData -import kafka.common.TopicAndPartition -import kafka.message.ByteBufferMessageSet +import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.record.{CompressionType, SimpleRecord, MemoryRecords} +import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord} +import org.apache.kafka.common.requests.FetchResponse import org.junit.Test import org.junit.Assert.assertTrue @@ -32,12 +31,12 @@ class ReplicaVerificationToolTest { val sb = new StringBuilder val expectedReplicasPerTopicAndPartition = Map( - TopicAndPartition("a", 0) -> 3, - TopicAndPartition("a", 1) -> 3, - TopicAndPartition("b", 0) -> 2 + new TopicPartition("a", 0) -> 3, + new TopicPartition("a", 1) -> 3, + new TopicPartition("b", 0) -> 2 ) - val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicAndPartition, Map.empty, 2, Map.empty, 0, 0) + val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicAndPartition, Map.empty, 2, 0) expectedReplicasPerTopicAndPartition.foreach { case (tp, numReplicas) => (0 until numReplicas).foreach { replicaId => val records = (0 to 5).map { index => @@ -45,8 +44,9 @@ class ReplicaVerificationToolTest { } val initialOffset = 4 val memoryRecords = MemoryRecords.withRecords(initialOffset, CompressionType.NONE, records: _*) - replicaBuffer.addFetchedData(tp, replicaId, new FetchResponsePartitionData(Errors.NONE, hw = 20, - new ByteBufferMessageSet(memoryRecords.buffer))) + val partitionData = new FetchResponse.PartitionData(Errors.NONE, 20, 20, 0L, null, memoryRecords) + + replicaBuffer.addFetchedData(tp, replicaId, partitionData) } } @@ -55,7 +55,7 @@ class ReplicaVerificationToolTest { // If you change this assertion, you should verify that the replica_verification_test.py system test still passes assertTrue(s"Max lag information should be in output: `$output`", - output.endsWith(": max lag is 10 for partition a-0 at offset 10 among 3 partitions")) + output.endsWith(": max lag is 10 for partition a-1 at offset 10 among 3 partitions")) } } -- To stop receiving notification emails like this one, please contact ij...@apache.org.
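
A note for readers skimming the diff in this notification: the core of the patch is that the tool now builds a follower-style FetchRequest and performs a blocking send/receive over NetworkClient instead of going through the removed SimpleConsumer. The snippet below is an illustrative sketch only, not part of the commit; it assumes an already configured NetworkClient and Node (the patch constructs these per fetcher thread) and uses only calls that appear in the diff, such as FetchRequest.Builder.forReplica, NetworkClientUtils.awaitReady and NetworkClientUtils.sendAndReceive.

import java.net.SocketTimeoutException
import java.util

import kafka.api.Request
import org.apache.kafka.clients.{NetworkClient, NetworkClientUtils}
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.{FetchResponse, FetchRequest => JFetchRequest}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{Node, TopicPartition}

import scala.collection.JavaConverters._

object ReplicaFetchSketch {
  // Hypothetical helper, for illustration only: fetch once from `node` for the given
  // partitions/offsets and print the high watermark per partition. `networkClient` is
  // assumed to be configured the same way the tool's ReplicaFetcherBlockingSend does it.
  def fetchOnce(networkClient: NetworkClient, node: Node, time: Time,
                offsets: Map[TopicPartition, Long],
                maxWait: Int, minBytes: Int, fetchSize: Int, requestTimeoutMs: Long): Unit = {
    // One PartitionData per partition, starting from the offset verified so far.
    val requestMap = new util.LinkedHashMap[TopicPartition, JFetchRequest.PartitionData]
    offsets.foreach { case (tp, offset) =>
      requestMap.put(tp, new JFetchRequest.PartitionData(offset, 0L, fetchSize))
    }

    // Identify ourselves as a debugging replica so brokers serve the follower fetch path.
    val builder = JFetchRequest.Builder.forReplica(
      ApiKeys.FETCH.latestVersion, Request.DebuggingConsumerId, maxWait, minBytes, requestMap)

    // Blocking request/response round trip, mirroring ReplicaFetcherBlockingSend.sendRequest.
    if (!NetworkClientUtils.awaitReady(networkClient, node, time, requestTimeoutMs))
      throw new SocketTimeoutException(s"Failed to connect within $requestTimeoutMs ms")
    val clientRequest = networkClient.newClientRequest(node.id.toString, builder, time.milliseconds(), true)
    val clientResponse = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time)

    val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse[MemoryRecords]]
    fetchResponse.responseData.asScala.foreach { case (tp, partitionData) =>
      println(s"$tp: hw=${partitionData.highWatermark} bytes=${partitionData.records.sizeInBytes}")
    }
  }
}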