This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new c74c0f2facd KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring (#13301) c74c0f2facd is described below commit c74c0f2facde2b392ab745144d6ad520575ab9ef Author: Kirk True <k...@kirktrue.pro> AuthorDate: Fri Mar 10 10:17:14 2023 -0800 KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring (#13301) The Fetcher class is used internally by the KafkaConsumer to fetch records from the brokers. There is ongoing work to create a new consumer implementation with a significantly refactored threading model. The threading refactor work requires a similarly refactored Fetcher. This task includes refactoring Fetcher by extracting its inner classes into top-level classes (though still in the internal package) so that those classes can be referenced by forthcoming refactored fetch logic. Reviewers: Philip Nee <philip...@gmail.com>, Guozhang Wang <wangg...@gmail.com> --- .../kafka/clients/consumer/KafkaConsumer.java | 4 +- .../clients/consumer/internals/CompletedFetch.java | 365 ++++++++++++++ .../consumer/internals/ConsumerMetrics.java | 4 +- .../consumer/internals/FetchMetricsAggregator.java | 95 ++++ .../consumer/internals/FetchMetricsManager.java | 203 ++++++++ ...ricsRegistry.java => FetchMetricsRegistry.java} | 8 +- .../kafka/clients/consumer/internals/Fetcher.java | 557 ++------------------- .../clients/consumer/internals/SensorBuilder.java | 115 +++++ .../consumer/internals/CompletedFetchTest.java | 304 +++++++++++ .../internals/FetchMetricsManagerTest.java | 171 +++++++ .../clients/consumer/internals/FetcherTest.java | 4 +- .../consumer/internals/OffsetFetcherTest.java | 2 +- 12 files changed, 1300 insertions(+), 532 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 3f966121d69..11ac675cb4b 100644 --- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -28,7 +28,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient; import org.apache.kafka.clients.consumer.internals.Fetch; import org.apache.kafka.clients.consumer.internals.Fetcher; -import org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry; +import org.apache.kafka.clients.consumer.internals.FetchMetricsRegistry; import org.apache.kafka.clients.consumer.internals.KafkaConsumerMetrics; import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener; import org.apache.kafka.clients.consumer.internals.OffsetFetcher; @@ -737,7 +737,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { this.metadata.bootstrap(addresses); String metricGrpPrefix = "consumer"; - FetcherMetricsRegistry metricsRegistry = new FetcherMetricsRegistry(Collections.singleton(CLIENT_ID_METRIC_TAG), metricGrpPrefix); + FetchMetricsRegistry metricsRegistry = new FetchMetricsRegistry(Collections.singleton(CLIENT_ID_METRIC_TAG), metricGrpPrefix); ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext); this.isolationLevel = IsolationLevel.valueOf( config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT)); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java new file mode 100644 index 00000000000..6a11b846810 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.errors.RecordDeserializationException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.CloseableIterator; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; + +import java.io.Closeable; +import java.nio.ByteBuffer; 
+import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.PriorityQueue; +import java.util.Set; + +/** + * {@link CompletedFetch} represents a {@link RecordBatch batch} of {@link Record records} that was returned from the + * broker via a {@link FetchRequest}. It contains logic to maintain state between calls to {@link #fetchRecords(int)}. + * + * @param <K> Record key type + * @param <V> Record value type + */ +class CompletedFetch<K, V> { + + final TopicPartition partition; + final FetchResponseData.PartitionData partitionData; + final short requestVersion; + + long nextFetchOffset; + Optional<Integer> lastEpoch; + boolean isConsumed = false; + boolean initialized = false; + + private final Logger log; + private final SubscriptionState subscriptions; + private final boolean checkCrcs; + private final BufferSupplier decompressionBufferSupplier; + private final Deserializer<K> keyDeserializer; + private final Deserializer<V> valueDeserializer; + private final IsolationLevel isolationLevel; + private final Iterator<? 
extends RecordBatch> batches; + private final Set<Long> abortedProducerIds; + private final PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactions; + private final FetchMetricsAggregator metricAggregator; + + private int recordsRead; + private int bytesRead; + private RecordBatch currentBatch; + private Record lastRecord; + private CloseableIterator<Record> records; + private Exception cachedRecordException = null; + private boolean corruptLastRecord = false; + + CompletedFetch(LogContext logContext, + SubscriptionState subscriptions, + boolean checkCrcs, + BufferSupplier decompressionBufferSupplier, + Deserializer<K> keyDeserializer, + Deserializer<V> valueDeserializer, + IsolationLevel isolationLevel, + TopicPartition partition, + FetchResponseData.PartitionData partitionData, + FetchMetricsAggregator metricAggregator, + Long fetchOffset, + short requestVersion) { + this.log = logContext.logger(CompletedFetch.class); + this.subscriptions = subscriptions; + this.checkCrcs = checkCrcs; + this.decompressionBufferSupplier = decompressionBufferSupplier; + this.keyDeserializer = keyDeserializer; + this.valueDeserializer = valueDeserializer; + this.isolationLevel = isolationLevel; + this.partition = partition; + this.partitionData = partitionData; + this.metricAggregator = metricAggregator; + this.batches = FetchResponse.recordsOrFail(partitionData).batches().iterator(); + this.nextFetchOffset = fetchOffset; + this.requestVersion = requestVersion; + this.lastEpoch = Optional.empty(); + this.abortedProducerIds = new HashSet<>(); + this.abortedTransactions = abortedTransactions(partitionData); + } + + /** + * After each partition is parsed, we update the current metric totals with the total bytes + * and number of records parsed. After all partitions have reported, we write the metric. 
+ */ + void recordAggregatedMetrics(int bytes, int records) { + metricAggregator.record(partition, bytes, records); + } + + /** + * Draining a {@link CompletedFetch} will signal that the data has been consumed and the underlying resources + * are closed. This is somewhat analogous to {@link Closeable#close() closing}, though no error will result if a + * caller invokes {@link #fetchRecords(int)}; an empty {@link List list} will be returned instead. + */ + void drain() { + if (!isConsumed) { + maybeCloseRecordStream(); + cachedRecordException = null; + this.isConsumed = true; + recordAggregatedMetrics(bytesRead, recordsRead); + + // we move the partition to the end if we received some bytes. This way, it's more likely that partitions + // for the same topic can remain together (allowing for more efficient serialization). + if (bytesRead > 0) + subscriptions.movePartitionToEnd(partition); + } + } + + private void maybeEnsureValid(RecordBatch batch) { + if (checkCrcs && batch.magic() >= RecordBatch.MAGIC_VALUE_V2) { + try { + batch.ensureValid(); + } catch (CorruptRecordException e) { + throw new KafkaException("Record batch for partition " + partition + " at offset " + + batch.baseOffset() + " is invalid, cause: " + e.getMessage()); + } + } + } + + private void maybeEnsureValid(Record record) { + if (checkCrcs) { + try { + record.ensureValid(); + } catch (CorruptRecordException e) { + throw new KafkaException("Record for partition " + partition + " at offset " + record.offset() + + " is invalid, cause: " + e.getMessage()); + } + } + } + + private void maybeCloseRecordStream() { + if (records != null) { + records.close(); + records = null; + } + } + + private Record nextFetchedRecord() { + while (true) { + if (records == null || !records.hasNext()) { + maybeCloseRecordStream(); + + if (!batches.hasNext()) { + // Message format v2 preserves the last offset in a batch even if the last record is removed + // through compaction. 
By using the next offset computed from the last offset in the batch, + // we ensure that the offset of the next fetch will point to the next batch, which avoids + // unnecessary re-fetching of the same batch (in the worst case, the consumer could get stuck + // fetching the same batch repeatedly). + if (currentBatch != null) + nextFetchOffset = currentBatch.nextOffset(); + drain(); + return null; + } + + currentBatch = batches.next(); + lastEpoch = maybeLeaderEpoch(currentBatch.partitionLeaderEpoch()); + maybeEnsureValid(currentBatch); + + if (isolationLevel == IsolationLevel.READ_COMMITTED && currentBatch.hasProducerId()) { + // remove from the aborted transaction queue all aborted transactions which have begun + // before the current batch's last offset and add the associated producerIds to the + // aborted producer set + consumeAbortedTransactionsUpTo(currentBatch.lastOffset()); + + long producerId = currentBatch.producerId(); + if (containsAbortMarker(currentBatch)) { + abortedProducerIds.remove(producerId); + } else if (isBatchAborted(currentBatch)) { + log.debug("Skipping aborted record batch from partition {} with producerId {} and " + + "offsets {} to {}", + partition, producerId, currentBatch.baseOffset(), currentBatch.lastOffset()); + nextFetchOffset = currentBatch.nextOffset(); + continue; + } + } + + records = currentBatch.streamingIterator(decompressionBufferSupplier); + } else { + Record record = records.next(); + // skip any records out of range + if (record.offset() >= nextFetchOffset) { + // we only do validation when the message should not be skipped. + maybeEnsureValid(record); + + // control records are not returned to the user + if (!currentBatch.isControlBatch()) { + return record; + } else { + // Increment the next fetch offset when we skip a control batch. 
+ nextFetchOffset = record.offset() + 1; + } + } + } + } + } + + /** + * The {@link RecordBatch batch} of {@link Record records} is converted to a {@link List list} of + * {@link ConsumerRecord consumer records} and returned. {@link BufferSupplier Decompression} and + * {@link Deserializer deserialization} of the {@link Record record's} key and value are performed in + * this step. + * + * @param maxRecords The number of records to return; the number returned may be {@code 0 <= maxRecords} + * @return {@link ConsumerRecord Consumer records} + */ + List<ConsumerRecord<K, V>> fetchRecords(int maxRecords) { + // Error when fetching the next record before deserialization. + if (corruptLastRecord) + throw new KafkaException("Received exception when fetching the next record from " + partition + + ". If needed, please seek past the record to " + + "continue consumption.", cachedRecordException); + + if (isConsumed) + return Collections.emptyList(); + + List<ConsumerRecord<K, V>> records = new ArrayList<>(); + + try { + for (int i = 0; i < maxRecords; i++) { + // Only move to next record if there was no exception in the last fetch. Otherwise, we should + // use the last record to do deserialization again. + if (cachedRecordException == null) { + corruptLastRecord = true; + lastRecord = nextFetchedRecord(); + corruptLastRecord = false; + } + + if (lastRecord == null) + break; + + Optional<Integer> leaderEpoch = maybeLeaderEpoch(currentBatch.partitionLeaderEpoch()); + TimestampType timestampType = currentBatch.timestampType(); + ConsumerRecord<K, V> record = parseRecord(partition, leaderEpoch, timestampType, lastRecord); + records.add(record); + recordsRead++; + bytesRead += lastRecord.sizeInBytes(); + nextFetchOffset = lastRecord.offset() + 1; + // In some cases, the deserialization may have thrown an exception and the retry may succeed, + // we allow user to move forward in this case. 
+ cachedRecordException = null; + } + } catch (SerializationException se) { + cachedRecordException = se; + if (records.isEmpty()) + throw se; + } catch (KafkaException e) { + cachedRecordException = e; + if (records.isEmpty()) + throw new KafkaException("Received exception when fetching the next record from " + partition + + ". If needed, please seek past the record to " + + "continue consumption.", e); + } + return records; + } + + /** + * Parse the record entry, deserializing the key / value fields if necessary + */ + ConsumerRecord<K, V> parseRecord(TopicPartition partition, + Optional<Integer> leaderEpoch, + TimestampType timestampType, + Record record) { + try { + long offset = record.offset(); + long timestamp = record.timestamp(); + Headers headers = new RecordHeaders(record.headers()); + ByteBuffer keyBytes = record.key(); + byte[] keyByteArray = keyBytes == null ? null : org.apache.kafka.common.utils.Utils.toArray(keyBytes); + K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray); + ByteBuffer valueBytes = record.value(); + byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes); + V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray); + return new ConsumerRecord<>(partition.topic(), partition.partition(), offset, + timestamp, timestampType, + keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length, + valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length, + key, value, headers, leaderEpoch); + } catch (RuntimeException e) { + throw new RecordDeserializationException(partition, record.offset(), + "Error deserializing key/value for partition " + partition + + " at offset " + record.offset() + ". 
If needed, please seek past the record to continue consumption.", e); + } + } + + private Optional<Integer> maybeLeaderEpoch(int leaderEpoch) { + return leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH ? Optional.empty() : Optional.of(leaderEpoch); + } + + private void consumeAbortedTransactionsUpTo(long offset) { + if (abortedTransactions == null) + return; + + while (!abortedTransactions.isEmpty() && abortedTransactions.peek().firstOffset() <= offset) { + FetchResponseData.AbortedTransaction abortedTransaction = abortedTransactions.poll(); + abortedProducerIds.add(abortedTransaction.producerId()); + } + } + + private boolean isBatchAborted(RecordBatch batch) { + return batch.isTransactional() && abortedProducerIds.contains(batch.producerId()); + } + + private PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactions(FetchResponseData.PartitionData partition) { + if (partition.abortedTransactions() == null || partition.abortedTransactions().isEmpty()) + return null; + + PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactions = new PriorityQueue<>( + partition.abortedTransactions().size(), Comparator.comparingLong(FetchResponseData.AbortedTransaction::firstOffset) + ); + abortedTransactions.addAll(partition.abortedTransactions()); + return abortedTransactions; + } + + private boolean containsAbortMarker(RecordBatch batch) { + if (!batch.isControlBatch()) + return false; + + Iterator<Record> batchIterator = batch.iterator(); + if (!batchIterator.hasNext()) + return false; + + Record firstRecord = batchIterator.next(); + return ControlRecordType.ABORT == ControlRecordType.parse(firstRecord.key()); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetrics.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetrics.java index e58db82ee3c..e119153af01 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetrics.java +++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetrics.java @@ -26,10 +26,10 @@ import org.apache.kafka.common.metrics.Metrics; public class ConsumerMetrics { - public FetcherMetricsRegistry fetcherMetrics; + public FetchMetricsRegistry fetcherMetrics; public ConsumerMetrics(Set<String> metricsTags, String metricGrpPrefix) { - this.fetcherMetrics = new FetcherMetricsRegistry(metricsTags, metricGrpPrefix); + this.fetcherMetrics = new FetchMetricsRegistry(metricsTags, metricGrpPrefix); } public ConsumerMetrics(String metricGroupPrefix) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsAggregator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsAggregator.java new file mode 100644 index 00000000000..4fe8199734d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsAggregator.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.TopicPartition; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * Since we parse the message data for each partition from each fetch response lazily, fetch-level + * metrics need to be aggregated as the messages from each partition are parsed. This class is used + * to facilitate this incremental aggregation. + */ +class FetchMetricsAggregator { + + private final FetchMetricsManager metricsManager; + private final Set<TopicPartition> unrecordedPartitions; + private final FetchMetrics fetchFetchMetrics = new FetchMetrics(); + private final Map<String, FetchMetrics> perTopicFetchMetrics = new HashMap<>(); + + FetchMetricsAggregator(FetchMetricsManager metricsManager, Set<TopicPartition> partitions) { + this.metricsManager = metricsManager; + this.unrecordedPartitions = new HashSet<>(partitions); + } + + /** + * After each partition is parsed, we update the current metric totals with the total bytes + * and number of records parsed. After all partitions have reported, we write the metric. + */ + void record(TopicPartition partition, int bytes, int records) { + // Aggregate the metrics at the fetch level + fetchFetchMetrics.increment(bytes, records); + + // Also aggregate the metrics on a per-topic basis. + perTopicFetchMetrics.computeIfAbsent(partition.topic(), t -> new FetchMetrics()) + .increment(bytes, records); + + maybeRecordMetrics(partition); + } + + /** + * Once we've detected that all of the {@link TopicPartition partitions} for the fetch have been handled, we + * can then record the aggregated metrics values. This is done at the fetch level and on a per-topic basis. 
+ * + * @param partition {@link TopicPartition} + */ + private void maybeRecordMetrics(TopicPartition partition) { + unrecordedPartitions.remove(partition); + + if (!unrecordedPartitions.isEmpty()) + return; + + // Record the metrics aggregated at the fetch level. + metricsManager.recordBytesFetched(fetchFetchMetrics.bytes); + metricsManager.recordRecordsFetched(fetchFetchMetrics.records); + + // Also record the metrics aggregated on a per-topic basis. + for (Map.Entry<String, FetchMetrics> entry: perTopicFetchMetrics.entrySet()) { + String topic = entry.getKey(); + FetchMetrics fetchMetrics = entry.getValue(); + metricsManager.recordBytesFetched(topic, fetchMetrics.bytes); + metricsManager.recordRecordsFetched(topic, fetchMetrics.records); + } + } + + private static class FetchMetrics { + + private int bytes; + private int records; + + private void increment(int bytes, int records) { + this.bytes += bytes; + this.records += records; + } + + } + +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java new file mode 100644 index 00000000000..63bd7650701 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.Gauge; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.WindowedCount; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * The {@link FetchMetricsManager} class provides wrapper methods to record lag, lead, latency, and fetch metrics. + * It keeps an internal ID of the assigned set of partitions which is updated to ensure the set of metrics it + * records matches up with the topic-partitions in use. 
+ */ +class FetchMetricsManager { + + private final Metrics metrics; + private final FetchMetricsRegistry metricsRegistry; + private final Sensor bytesFetched; + private final Sensor recordsFetched; + private final Sensor fetchLatency; + private final Sensor recordsLag; + private final Sensor recordsLead; + + private int assignmentId = 0; + private Set<TopicPartition> assignedPartitions = Collections.emptySet(); + + FetchMetricsManager(Metrics metrics, FetchMetricsRegistry metricsRegistry) { + this.metrics = metrics; + this.metricsRegistry = metricsRegistry; + + this.bytesFetched = new SensorBuilder(metrics, "bytes-fetched") + .withAvg(metricsRegistry.fetchSizeAvg) + .withMax(metricsRegistry.fetchSizeMax) + .withMeter(metricsRegistry.bytesConsumedRate, metricsRegistry.bytesConsumedTotal) + .build(); + this.recordsFetched = new SensorBuilder(metrics, "records-fetched") + .withAvg(metricsRegistry.recordsPerRequestAvg) + .withMeter(metricsRegistry.recordsConsumedRate, metricsRegistry.recordsConsumedTotal) + .build(); + this.fetchLatency = new SensorBuilder(metrics, "fetch-latency") + .withAvg(metricsRegistry.fetchLatencyAvg) + .withMax(metricsRegistry.fetchLatencyMax) + .withMeter(new WindowedCount(), metricsRegistry.fetchRequestRate, metricsRegistry.fetchRequestTotal) + .build(); + this.recordsLag = new SensorBuilder(metrics, "records-lag") + .withMax(metricsRegistry.recordsLagMax) + .build(); + this.recordsLead = new SensorBuilder(metrics, "records-lead") + .withMin(metricsRegistry.recordsLeadMin) + .build(); + } + + void recordLatency(long requestLatencyMs) { + fetchLatency.record(requestLatencyMs); + } + + void recordBytesFetched(int bytes) { + bytesFetched.record(bytes); + } + + void recordRecordsFetched(int records) { + recordsFetched.record(records); + } + + void recordBytesFetched(String topic, int bytes) { + String name = topicBytesFetchedMetricName(topic); + Sensor bytesFetched = new SensorBuilder(metrics, name, () -> topicTags(topic)) + 
.withAvg(metricsRegistry.topicFetchSizeAvg) + .withMax(metricsRegistry.topicFetchSizeMax) + .withMeter(metricsRegistry.topicBytesConsumedRate, metricsRegistry.topicBytesConsumedTotal) + .build(); + bytesFetched.record(bytes); + } + + void recordRecordsFetched(String topic, int records) { + String name = topicRecordsFetchedMetricName(topic); + Sensor recordsFetched = new SensorBuilder(metrics, name, () -> topicTags(topic)) + .withAvg(metricsRegistry.topicRecordsPerRequestAvg) + .withMeter(metricsRegistry.topicRecordsConsumedRate, metricsRegistry.topicRecordsConsumedTotal) + .build(); + recordsFetched.record(records); + } + + void recordPartitionLag(TopicPartition tp, long lag) { + this.recordsLag.record(lag); + + String name = partitionRecordsLagMetricName(tp); + Sensor recordsLag = new SensorBuilder(metrics, name, () -> topicPartitionTags(tp)) + .withValue(metricsRegistry.partitionRecordsLag) + .withMax(metricsRegistry.partitionRecordsLagMax) + .withAvg(metricsRegistry.partitionRecordsLagAvg) + .build(); + + recordsLag.record(lag); + } + + void recordPartitionLead(TopicPartition tp, long lead) { + this.recordsLead.record(lead); + + String name = partitionRecordsLeadMetricName(tp); + Sensor recordsLead = new SensorBuilder(metrics, name, () -> topicPartitionTags(tp)) + .withValue(metricsRegistry.partitionRecordsLead) + .withMin(metricsRegistry.partitionRecordsLeadMin) + .withAvg(metricsRegistry.partitionRecordsLeadAvg) + .build(); + + recordsLead.record(lead); + } + + /** + * This method is called by the {@link Fetch fetch} logic before it requests fetches in order to update the + * internal set of metrics that are tracked. 
+ * + * @param subscription {@link SubscriptionState} that contains the set of assigned partitions + * @see SubscriptionState#assignmentId() + */ + void maybeUpdateAssignment(SubscriptionState subscription) { + int newAssignmentId = subscription.assignmentId(); + + if (this.assignmentId != newAssignmentId) { + Set<TopicPartition> newAssignedPartitions = subscription.assignedPartitions(); + + for (TopicPartition tp : this.assignedPartitions) { + if (!newAssignedPartitions.contains(tp)) { + metrics.removeSensor(partitionRecordsLagMetricName(tp)); + metrics.removeSensor(partitionRecordsLeadMetricName(tp)); + metrics.removeMetric(partitionPreferredReadReplicaMetricName(tp)); + } + } + + for (TopicPartition tp : newAssignedPartitions) { + if (!this.assignedPartitions.contains(tp)) { + MetricName metricName = partitionPreferredReadReplicaMetricName(tp); + metrics.addMetricIfAbsent( + metricName, + null, + (Gauge<Integer>) (config, now) -> subscription.preferredReadReplica(tp, 0L).orElse(-1) + ); + } + } + + this.assignedPartitions = newAssignedPartitions; + this.assignmentId = newAssignmentId; + } + } + + static String topicBytesFetchedMetricName(String topic) { + return "topic." + topic + ".bytes-fetched"; + } + + private static String topicRecordsFetchedMetricName(String topic) { + return "topic." 
+ topic + ".records-fetched"; + } + + private static String partitionRecordsLeadMetricName(TopicPartition tp) { + return tp + ".records-lead"; + } + + private static String partitionRecordsLagMetricName(TopicPartition tp) { + return tp + ".records-lag"; + } + + private MetricName partitionPreferredReadReplicaMetricName(TopicPartition tp) { + Map<String, String> metricTags = topicPartitionTags(tp); + return this.metrics.metricInstance(metricsRegistry.partitionPreferredReadReplica, metricTags); + } + + static Map<String, String> topicTags(String topic) { + Map<String, String> metricTags = new HashMap<>(1); + metricTags.put("topic", topic.replace('.', '_')); + return metricTags; + } + + static Map<String, String> topicPartitionTags(TopicPartition tp) { + Map<String, String> metricTags = new HashMap<>(2); + metricTags.put("topic", tp.topic().replace('.', '_')); + metricTags.put("partition", String.valueOf(tp.partition())); + return metricTags; + } + +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsRegistry.java similarity index 97% rename from clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java rename to clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsRegistry.java index f76a92462d5..fc4ac1d665e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsRegistry.java @@ -24,7 +24,7 @@ import java.util.Set; import org.apache.kafka.common.MetricNameTemplate; -public class FetcherMetricsRegistry { +public class FetchMetricsRegistry { public MetricNameTemplate fetchSizeAvg; public MetricNameTemplate fetchSizeMax; @@ -56,15 +56,15 @@ public class FetcherMetricsRegistry { public MetricNameTemplate 
partitionRecordsLeadAvg; public MetricNameTemplate partitionPreferredReadReplica; - public FetcherMetricsRegistry() { + public FetchMetricsRegistry() { this(new HashSet<String>(), ""); } - public FetcherMetricsRegistry(String metricGrpPrefix) { + public FetchMetricsRegistry(String metricGrpPrefix) { this(new HashSet<String>(), metricGrpPrefix); } - public FetcherMetricsRegistry(Set<String> tags, String metricGrpPrefix) { + public FetchMetricsRegistry(Set<String> tags, String metricGrpPrefix) { /***** Client level *****/ String groupName = metricGrpPrefix + "-fetch-manager-metrics"; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index ae7973aa808..29cd7972cc6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -25,38 +25,23 @@ import org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPositi import org.apache.kafka.common.Cluster; import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.errors.CorruptRecordException; -import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.errors.RecordTooLargeException; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.errors.TopicAuthorizationException; -import org.apache.kafka.common.header.Headers; -import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.message.FetchResponseData; -import org.apache.kafka.common.metrics.Gauge; import org.apache.kafka.common.metrics.Metrics; import 
org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Max; -import org.apache.kafka.common.metrics.stats.Meter; -import org.apache.kafka.common.metrics.stats.Min; -import org.apache.kafka.common.metrics.stats.Value; -import org.apache.kafka.common.metrics.stats.WindowedCount; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.BufferSupplier; -import org.apache.kafka.common.record.ControlRecordType; -import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.RecordBatch; -import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.FetchResponse; import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.utils.CloseableIterator; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; @@ -65,12 +50,10 @@ import org.slf4j.Logger; import org.slf4j.helpers.MessageFormatter; import java.io.Closeable; -import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -78,7 +61,6 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.PriorityQueue; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; @@ -116,9 +98,9 @@ public class Fetcher<K, V> implements Closeable { private final boolean checkCrcs; private final String clientRackId; private final ConsumerMetadata metadata; - private final FetchManagerMetrics sensors; + private final FetchMetricsManager metricsManager; private final 
SubscriptionState subscriptions; - private final ConcurrentLinkedQueue<CompletedFetch> completedFetches; + private final ConcurrentLinkedQueue<CompletedFetch<K, V>> completedFetches; private final BufferSupplier decompressionBufferSupplier = BufferSupplier.create(); private final Deserializer<K> keyDeserializer; private final Deserializer<V> valueDeserializer; @@ -126,7 +108,7 @@ public class Fetcher<K, V> implements Closeable { private final Map<Integer, FetchSessionHandler> sessionHandlers; private final Set<Integer> nodesWithPendingFetchRequests; private final AtomicBoolean isClosed = new AtomicBoolean(false); - private CompletedFetch nextInLineFetch = null; + private CompletedFetch<K, V> nextInLineFetch = null; public Fetcher(LogContext logContext, ConsumerNetworkClient client, @@ -142,7 +124,7 @@ public class Fetcher<K, V> implements Closeable { ConsumerMetadata metadata, SubscriptionState subscriptions, Metrics metrics, - FetcherMetricsRegistry metricsRegistry, + FetchMetricsRegistry metricsRegistry, Time time, IsolationLevel isolationLevel) { this.log = logContext.logger(Fetcher.class); @@ -161,7 +143,7 @@ public class Fetcher<K, V> implements Closeable { this.keyDeserializer = keyDeserializer; this.valueDeserializer = valueDeserializer; this.completedFetches = new ConcurrentLinkedQueue<>(); - this.sensors = new FetchManagerMetrics(metrics, metricsRegistry); + this.metricsManager = new FetchMetricsManager(metrics, metricsRegistry); this.isolationLevel = isolationLevel; this.sessionHandlers = new HashMap<>(); this.nodesWithPendingFetchRequests = new HashSet<>(); @@ -191,7 +173,7 @@ public class Fetcher<K, V> implements Closeable { */ public synchronized int sendFetches() { // Update metrics in case there was an assignment change - sensors.maybeUpdateAssignment(subscriptions); + metricsManager.maybeUpdateAssignment(subscriptions); Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests(); for (Map.Entry<Node, 
FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) { @@ -224,7 +206,7 @@ public class Fetcher<K, V> implements Closeable { Map<TopicPartition, FetchResponseData.PartitionData> responseData = response.responseData(handler.sessionTopicNames(), resp.requestHeader().apiVersion()); Set<TopicPartition> partitions = new HashSet<>(responseData.keySet()); - FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions); + FetchMetricsAggregator metricAggregator = new FetchMetricsAggregator(metricsManager, partitions); for (Map.Entry<TopicPartition, FetchResponseData.PartitionData> entry : responseData.entrySet()) { TopicPartition partition = entry.getKey(); @@ -245,20 +227,29 @@ public class Fetcher<K, V> implements Closeable { throw new IllegalStateException(message); } else { long fetchOffset = requestData.fetchOffset; + short requestVersion = resp.requestHeader().apiVersion(); FetchResponseData.PartitionData partitionData = entry.getValue(); log.debug("Fetch {} at offset {} for partition {} returned fetch data {}", isolationLevel, fetchOffset, partition, partitionData); - Iterator<? 
extends RecordBatch> batches = FetchResponse.recordsOrFail(partitionData).batches().iterator(); - short responseVersion = resp.requestHeader().apiVersion(); - - completedFetches.add(new CompletedFetch(partition, partitionData, - metricAggregator, batches, fetchOffset, responseVersion)); + CompletedFetch<K, V> completedFetch = new CompletedFetch<>(logContext, + subscriptions, + checkCrcs, + decompressionBufferSupplier, + keyDeserializer, + valueDeserializer, + isolationLevel, + partition, + partitionData, + metricAggregator, + fetchOffset, + requestVersion); + completedFetches.add(completedFetch); } } - sensors.fetchLatency.record(resp.requestLatencyMs()); + metricsManager.recordLatency(resp.requestLatencyMs()); } finally { nodesWithPendingFetchRequests.remove(fetchTarget.id()); } @@ -328,16 +319,16 @@ public class Fetcher<K, V> implements Closeable { */ public Fetch<K, V> collectFetch() { Fetch<K, V> fetch = Fetch.empty(); - Queue<CompletedFetch> pausedCompletedFetches = new ArrayDeque<>(); + Queue<CompletedFetch<K, V>> pausedCompletedFetches = new ArrayDeque<>(); int recordsRemaining = maxPollRecords; try { while (recordsRemaining > 0) { if (nextInLineFetch == null || nextInLineFetch.isConsumed) { - CompletedFetch records = completedFetches.peek(); + CompletedFetch<K, V> records = completedFetches.peek(); if (records == null) break; - if (records.notInitialized()) { + if (!records.initialized) { try { nextInLineFetch = initializeCompletedFetch(records); } catch (Exception e) { @@ -380,7 +371,7 @@ public class Fetcher<K, V> implements Closeable { return fetch; } - private Fetch<K, V> fetchRecords(CompletedFetch completedFetch, int maxRecords) { + private Fetch<K, V> fetchRecords(CompletedFetch<K, V> completedFetch, int maxRecords) { if (!subscriptions.isAssigned(completedFetch.partition)) { // this can happen when a rebalance happened before fetched records are returned to the consumer's poll call log.debug("Not returning fetched records for partition {} since it 
is no longer assigned", @@ -417,11 +408,11 @@ public class Fetcher<K, V> implements Closeable { Long partitionLag = subscriptions.partitionLag(completedFetch.partition, isolationLevel); if (partitionLag != null) - this.sensors.recordPartitionLag(completedFetch.partition, partitionLag); + this.metricsManager.recordPartitionLag(completedFetch.partition, partitionLag); Long lead = subscriptions.partitionLead(completedFetch.partition); if (lead != null) { - this.sensors.recordPartitionLead(completedFetch.partition, lead); + this.metricsManager.recordPartitionLead(completedFetch.partition, lead); } return Fetch.forPartition(completedFetch.partition, partRecords, positionAdvanced); @@ -444,7 +435,7 @@ public class Fetcher<K, V> implements Closeable { if (nextInLineFetch != null && !nextInLineFetch.isConsumed) { exclude.add(nextInLineFetch.partition); } - for (CompletedFetch completedFetch : completedFetches) { + for (CompletedFetch<K, V> completedFetch : completedFetches) { exclude.add(completedFetch.partition); } return subscriptions.fetchablePartitions(tp -> !exclude.contains(tp)); @@ -537,11 +528,11 @@ public class Fetcher<K, V> implements Closeable { /** * Initialize a CompletedFetch object. 
*/ - private CompletedFetch initializeCompletedFetch(CompletedFetch nextCompletedFetch) { + private CompletedFetch<K, V> initializeCompletedFetch(CompletedFetch<K, V> nextCompletedFetch) { TopicPartition tp = nextCompletedFetch.partition; FetchResponseData.PartitionData partition = nextCompletedFetch.partitionData; long fetchOffset = nextCompletedFetch.nextFetchOffset; - CompletedFetch completedFetch = null; + CompletedFetch<K, V> completedFetch = null; Errors error = Errors.forCode(partition.errorCode()); try { @@ -564,7 +555,7 @@ public class Fetcher<K, V> implements Closeable { completedFetch = nextCompletedFetch; if (!batches.hasNext() && FetchResponse.recordsSize(partition) > 0) { - if (completedFetch.responseVersion < 3) { + if (completedFetch.requestVersion < 3) { // Implement the pre KIP-74 behavior of throwing a RecordTooLargeException. Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset); throw new RecordTooLargeException("There are some messages at [Partition=Offset]: " + @@ -660,7 +651,7 @@ public class Fetcher<K, V> implements Closeable { } } finally { if (completedFetch == null) - nextCompletedFetch.metricAggregator.record(tp, 0, 0); + nextCompletedFetch.recordAggregatedMetrics(0, 0); if (error != Errors.NONE) // we move the partition to the end if there was an error. 
This way, it's more likely that partitions for @@ -683,49 +674,15 @@ public class Fetcher<K, V> implements Closeable { } } - /** - * Parse the record entry, deserializing the key / value fields if necessary - */ - private ConsumerRecord<K, V> parseRecord(TopicPartition partition, - RecordBatch batch, - Record record) { - try { - long offset = record.offset(); - long timestamp = record.timestamp(); - Optional<Integer> leaderEpoch = maybeLeaderEpoch(batch.partitionLeaderEpoch()); - TimestampType timestampType = batch.timestampType(); - Headers headers = new RecordHeaders(record.headers()); - ByteBuffer keyBytes = record.key(); - byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes); - K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray); - ByteBuffer valueBytes = record.value(); - byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes); - V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray); - return new ConsumerRecord<>(partition.topic(), partition.partition(), offset, - timestamp, timestampType, - keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length, - valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length, - key, value, headers, leaderEpoch); - } catch (RuntimeException e) { - throw new RecordDeserializationException(partition, record.offset(), - "Error deserializing key/value for partition " + partition + - " at offset " + record.offset() + ". If needed, please seek past the record to continue consumption.", e); - } - } - - private Optional<Integer> maybeLeaderEpoch(int leaderEpoch) { - return leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH ? 
Optional.empty() : Optional.of(leaderEpoch); - } - /** * Clear the buffered data which are not a part of newly assigned partitions * * @param assignedPartitions newly assigned {@link TopicPartition} */ public void clearBufferedDataForUnassignedPartitions(Collection<TopicPartition> assignedPartitions) { - Iterator<CompletedFetch> completedFetchesItr = completedFetches.iterator(); + Iterator<CompletedFetch<K, V>> completedFetchesItr = completedFetches.iterator(); while (completedFetchesItr.hasNext()) { - CompletedFetch records = completedFetchesItr.next(); + CompletedFetch<K, V> records = completedFetchesItr.next(); TopicPartition tp = records.partition; if (!assignedPartitions.contains(tp)) { records.drain(); @@ -759,7 +716,7 @@ public class Fetcher<K, V> implements Closeable { return sessionHandlers.get(node); } - public static Sensor throttleTimeSensor(Metrics metrics, FetcherMetricsRegistry metricsRegistry) { + public static Sensor throttleTimeSensor(Metrics metrics, FetchMetricsRegistry metricsRegistry) { Sensor fetchThrottleTimeSensor = metrics.sensor("fetch-throttle-time"); fetchThrottleTimeSensor.add(metrics.metricInstance(metricsRegistry.fetchThrottleTimeAvg), new Avg()); @@ -768,448 +725,6 @@ public class Fetcher<K, V> implements Closeable { return fetchThrottleTimeSensor; } - private class CompletedFetch { - private final TopicPartition partition; - private final Iterator<? 
extends RecordBatch> batches; - private final Set<Long> abortedProducerIds; - private final PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactions; - private final FetchResponseData.PartitionData partitionData; - private final FetchResponseMetricAggregator metricAggregator; - private final short responseVersion; - - private int recordsRead; - private int bytesRead; - private RecordBatch currentBatch; - private Record lastRecord; - private CloseableIterator<Record> records; - private long nextFetchOffset; - private Optional<Integer> lastEpoch; - private boolean isConsumed = false; - private Exception cachedRecordException = null; - private boolean corruptLastRecord = false; - private boolean initialized = false; - - private CompletedFetch(TopicPartition partition, - FetchResponseData.PartitionData partitionData, - FetchResponseMetricAggregator metricAggregator, - Iterator<? extends RecordBatch> batches, - Long fetchOffset, - short responseVersion) { - this.partition = partition; - this.partitionData = partitionData; - this.metricAggregator = metricAggregator; - this.batches = batches; - this.nextFetchOffset = fetchOffset; - this.responseVersion = responseVersion; - this.lastEpoch = Optional.empty(); - this.abortedProducerIds = new HashSet<>(); - this.abortedTransactions = abortedTransactions(partitionData); - } - - private void drain() { - if (!isConsumed) { - maybeCloseRecordStream(); - cachedRecordException = null; - this.isConsumed = true; - this.metricAggregator.record(partition, bytesRead, recordsRead); - - // we move the partition to the end if we received some bytes. This way, it's more likely that partitions - // for the same topic can remain together (allowing for more efficient serialization). 
- if (bytesRead > 0) - subscriptions.movePartitionToEnd(partition); - } - } - - private void maybeEnsureValid(RecordBatch batch) { - if (checkCrcs && currentBatch.magic() >= RecordBatch.MAGIC_VALUE_V2) { - try { - batch.ensureValid(); - } catch (CorruptRecordException e) { - throw new KafkaException("Record batch for partition " + partition + " at offset " + - batch.baseOffset() + " is invalid, cause: " + e.getMessage()); - } - } - } - - private void maybeEnsureValid(Record record) { - if (checkCrcs) { - try { - record.ensureValid(); - } catch (CorruptRecordException e) { - throw new KafkaException("Record for partition " + partition + " at offset " + record.offset() - + " is invalid, cause: " + e.getMessage()); - } - } - } - - private void maybeCloseRecordStream() { - if (records != null) { - records.close(); - records = null; - } - } - - private Record nextFetchedRecord() { - while (true) { - if (records == null || !records.hasNext()) { - maybeCloseRecordStream(); - - if (!batches.hasNext()) { - // Message format v2 preserves the last offset in a batch even if the last record is removed - // through compaction. By using the next offset computed from the last offset in the batch, - // we ensure that the offset of the next fetch will point to the next batch, which avoids - // unnecessary re-fetching of the same batch (in the worst case, the consumer could get stuck - // fetching the same batch repeatedly). - if (currentBatch != null) - nextFetchOffset = currentBatch.nextOffset(); - drain(); - return null; - } - - currentBatch = batches.next(); - lastEpoch = currentBatch.partitionLeaderEpoch() == RecordBatch.NO_PARTITION_LEADER_EPOCH ? 
- Optional.empty() : Optional.of(currentBatch.partitionLeaderEpoch()); - - maybeEnsureValid(currentBatch); - - if (isolationLevel == IsolationLevel.READ_COMMITTED && currentBatch.hasProducerId()) { - // remove from the aborted transaction queue all aborted transactions which have begun - // before the current batch's last offset and add the associated producerIds to the - // aborted producer set - consumeAbortedTransactionsUpTo(currentBatch.lastOffset()); - - long producerId = currentBatch.producerId(); - if (containsAbortMarker(currentBatch)) { - abortedProducerIds.remove(producerId); - } else if (isBatchAborted(currentBatch)) { - log.debug("Skipping aborted record batch from partition {} with producerId {} and " + - "offsets {} to {}", - partition, producerId, currentBatch.baseOffset(), currentBatch.lastOffset()); - nextFetchOffset = currentBatch.nextOffset(); - continue; - } - } - - records = currentBatch.streamingIterator(decompressionBufferSupplier); - } else { - Record record = records.next(); - // skip any records out of range - if (record.offset() >= nextFetchOffset) { - // we only do validation when the message should not be skipped. - maybeEnsureValid(record); - - // control records are not returned to the user - if (!currentBatch.isControlBatch()) { - return record; - } else { - // Increment the next fetch offset when we skip a control batch. - nextFetchOffset = record.offset() + 1; - } - } - } - } - } - - private List<ConsumerRecord<K, V>> fetchRecords(int maxRecords) { - // Error when fetching the next record before deserialization. - if (corruptLastRecord) - throw new KafkaException("Received exception when fetching the next record from " + partition - + ". 
If needed, please seek past the record to " - + "continue consumption.", cachedRecordException); - - if (isConsumed) - return Collections.emptyList(); - - List<ConsumerRecord<K, V>> records = new ArrayList<>(); - try { - for (int i = 0; i < maxRecords; i++) { - // Only move to next record if there was no exception in the last fetch. Otherwise we should - // use the last record to do deserialization again. - if (cachedRecordException == null) { - corruptLastRecord = true; - lastRecord = nextFetchedRecord(); - corruptLastRecord = false; - } - if (lastRecord == null) - break; - records.add(parseRecord(partition, currentBatch, lastRecord)); - recordsRead++; - bytesRead += lastRecord.sizeInBytes(); - nextFetchOffset = lastRecord.offset() + 1; - // In some cases, the deserialization may have thrown an exception and the retry may succeed, - // we allow user to move forward in this case. - cachedRecordException = null; - } - } catch (SerializationException se) { - cachedRecordException = se; - if (records.isEmpty()) - throw se; - } catch (KafkaException e) { - cachedRecordException = e; - if (records.isEmpty()) - throw new KafkaException("Received exception when fetching the next record from " + partition - + ". 
If needed, please seek past the record to " - + "continue consumption.", e); - } - return records; - } - - private void consumeAbortedTransactionsUpTo(long offset) { - if (abortedTransactions == null) - return; - - while (!abortedTransactions.isEmpty() && abortedTransactions.peek().firstOffset() <= offset) { - FetchResponseData.AbortedTransaction abortedTransaction = abortedTransactions.poll(); - abortedProducerIds.add(abortedTransaction.producerId()); - } - } - - private boolean isBatchAborted(RecordBatch batch) { - return batch.isTransactional() && abortedProducerIds.contains(batch.producerId()); - } - - private PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactions(FetchResponseData.PartitionData partition) { - if (partition.abortedTransactions() == null || partition.abortedTransactions().isEmpty()) - return null; - - PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactions = new PriorityQueue<>( - partition.abortedTransactions().size(), Comparator.comparingLong(FetchResponseData.AbortedTransaction::firstOffset) - ); - abortedTransactions.addAll(partition.abortedTransactions()); - return abortedTransactions; - } - - private boolean containsAbortMarker(RecordBatch batch) { - if (!batch.isControlBatch()) - return false; - - Iterator<Record> batchIterator = batch.iterator(); - if (!batchIterator.hasNext()) - return false; - - Record firstRecord = batchIterator.next(); - return ControlRecordType.ABORT == ControlRecordType.parse(firstRecord.key()); - } - - private boolean notInitialized() { - return !this.initialized; - } - } - - /** - * Since we parse the message data for each partition from each fetch response lazily, fetch-level - * metrics need to be aggregated as the messages from each partition are parsed. This class is used - * to facilitate this incremental aggregation. 
- */ - private static class FetchResponseMetricAggregator { - private final FetchManagerMetrics sensors; - private final Set<TopicPartition> unrecordedPartitions; - - private final FetchMetrics fetchMetrics = new FetchMetrics(); - private final Map<String, FetchMetrics> topicFetchMetrics = new HashMap<>(); - - private FetchResponseMetricAggregator(FetchManagerMetrics sensors, - Set<TopicPartition> partitions) { - this.sensors = sensors; - this.unrecordedPartitions = partitions; - } - - /** - * After each partition is parsed, we update the current metric totals with the total bytes - * and number of records parsed. After all partitions have reported, we write the metric. - */ - public void record(TopicPartition partition, int bytes, int records) { - this.unrecordedPartitions.remove(partition); - this.fetchMetrics.increment(bytes, records); - - // collect and aggregate per-topic metrics - String topic = partition.topic(); - FetchMetrics topicFetchMetric = this.topicFetchMetrics.get(topic); - if (topicFetchMetric == null) { - topicFetchMetric = new FetchMetrics(); - this.topicFetchMetrics.put(topic, topicFetchMetric); - } - topicFetchMetric.increment(bytes, records); - - if (this.unrecordedPartitions.isEmpty()) { - // once all expected partitions from the fetch have reported in, record the metrics - this.sensors.bytesFetched.record(this.fetchMetrics.fetchBytes); - this.sensors.recordsFetched.record(this.fetchMetrics.fetchRecords); - - // also record per-topic metrics - for (Map.Entry<String, FetchMetrics> entry: this.topicFetchMetrics.entrySet()) { - FetchMetrics metric = entry.getValue(); - this.sensors.recordTopicFetchMetrics(entry.getKey(), metric.fetchBytes, metric.fetchRecords); - } - } - } - - private static class FetchMetrics { - private int fetchBytes; - private int fetchRecords; - - protected void increment(int bytes, int records) { - this.fetchBytes += bytes; - this.fetchRecords += records; - } - } - } - - private static class FetchManagerMetrics { - private 
final Metrics metrics; - private final FetcherMetricsRegistry metricsRegistry; - private final Sensor bytesFetched; - private final Sensor recordsFetched; - private final Sensor fetchLatency; - private final Sensor recordsFetchLag; - private final Sensor recordsFetchLead; - - private int assignmentId = 0; - private Set<TopicPartition> assignedPartitions = Collections.emptySet(); - - private FetchManagerMetrics(Metrics metrics, FetcherMetricsRegistry metricsRegistry) { - this.metrics = metrics; - this.metricsRegistry = metricsRegistry; - - this.bytesFetched = metrics.sensor("bytes-fetched"); - this.bytesFetched.add(metrics.metricInstance(metricsRegistry.fetchSizeAvg), new Avg()); - this.bytesFetched.add(metrics.metricInstance(metricsRegistry.fetchSizeMax), new Max()); - this.bytesFetched.add(new Meter(metrics.metricInstance(metricsRegistry.bytesConsumedRate), - metrics.metricInstance(metricsRegistry.bytesConsumedTotal))); - - this.recordsFetched = metrics.sensor("records-fetched"); - this.recordsFetched.add(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg), new Avg()); - this.recordsFetched.add(new Meter(metrics.metricInstance(metricsRegistry.recordsConsumedRate), - metrics.metricInstance(metricsRegistry.recordsConsumedTotal))); - - this.fetchLatency = metrics.sensor("fetch-latency"); - this.fetchLatency.add(metrics.metricInstance(metricsRegistry.fetchLatencyAvg), new Avg()); - this.fetchLatency.add(metrics.metricInstance(metricsRegistry.fetchLatencyMax), new Max()); - this.fetchLatency.add(new Meter(new WindowedCount(), metrics.metricInstance(metricsRegistry.fetchRequestRate), - metrics.metricInstance(metricsRegistry.fetchRequestTotal))); - - this.recordsFetchLag = metrics.sensor("records-lag"); - this.recordsFetchLag.add(metrics.metricInstance(metricsRegistry.recordsLagMax), new Max()); - - this.recordsFetchLead = metrics.sensor("records-lead"); - this.recordsFetchLead.add(metrics.metricInstance(metricsRegistry.recordsLeadMin), new Min()); - } - - 
private void recordTopicFetchMetrics(String topic, int bytes, int records) { - // record bytes fetched - String name = "topic." + topic + ".bytes-fetched"; - Sensor bytesFetched = this.metrics.getSensor(name); - if (bytesFetched == null) { - Map<String, String> metricTags = Collections.singletonMap("topic", topic.replace('.', '_')); - - bytesFetched = this.metrics.sensor(name); - bytesFetched.add(this.metrics.metricInstance(metricsRegistry.topicFetchSizeAvg, - metricTags), new Avg()); - bytesFetched.add(this.metrics.metricInstance(metricsRegistry.topicFetchSizeMax, - metricTags), new Max()); - bytesFetched.add(new Meter(this.metrics.metricInstance(metricsRegistry.topicBytesConsumedRate, metricTags), - this.metrics.metricInstance(metricsRegistry.topicBytesConsumedTotal, metricTags))); - } - bytesFetched.record(bytes); - - // record records fetched - name = "topic." + topic + ".records-fetched"; - Sensor recordsFetched = this.metrics.getSensor(name); - if (recordsFetched == null) { - Map<String, String> metricTags = new HashMap<>(1); - metricTags.put("topic", topic.replace('.', '_')); - - recordsFetched = this.metrics.sensor(name); - recordsFetched.add(this.metrics.metricInstance(metricsRegistry.topicRecordsPerRequestAvg, - metricTags), new Avg()); - recordsFetched.add(new Meter(this.metrics.metricInstance(metricsRegistry.topicRecordsConsumedRate, metricTags), - this.metrics.metricInstance(metricsRegistry.topicRecordsConsumedTotal, metricTags))); - } - recordsFetched.record(records); - } - - private void maybeUpdateAssignment(SubscriptionState subscription) { - int newAssignmentId = subscription.assignmentId(); - if (this.assignmentId != newAssignmentId) { - Set<TopicPartition> newAssignedPartitions = subscription.assignedPartitions(); - for (TopicPartition tp : this.assignedPartitions) { - if (!newAssignedPartitions.contains(tp)) { - metrics.removeSensor(partitionLagMetricName(tp)); - metrics.removeSensor(partitionLeadMetricName(tp)); - 
metrics.removeMetric(partitionPreferredReadReplicaMetricName(tp)); - } - } - - for (TopicPartition tp : newAssignedPartitions) { - if (!this.assignedPartitions.contains(tp)) { - MetricName metricName = partitionPreferredReadReplicaMetricName(tp); - metrics.addMetricIfAbsent( - metricName, - null, - (Gauge<Integer>) (config, now) -> subscription.preferredReadReplica(tp, 0L).orElse(-1) - ); - } - } - - this.assignedPartitions = newAssignedPartitions; - this.assignmentId = newAssignmentId; - } - } - - private void recordPartitionLead(TopicPartition tp, long lead) { - this.recordsFetchLead.record(lead); - - String name = partitionLeadMetricName(tp); - Sensor recordsLead = this.metrics.getSensor(name); - if (recordsLead == null) { - Map<String, String> metricTags = topicPartitionTags(tp); - - recordsLead = this.metrics.sensor(name); - - recordsLead.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLead, metricTags), new Value()); - recordsLead.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLeadMin, metricTags), new Min()); - recordsLead.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLeadAvg, metricTags), new Avg()); - } - recordsLead.record(lead); - } - - private void recordPartitionLag(TopicPartition tp, long lag) { - this.recordsFetchLag.record(lag); - - String name = partitionLagMetricName(tp); - Sensor recordsLag = this.metrics.getSensor(name); - if (recordsLag == null) { - Map<String, String> metricTags = topicPartitionTags(tp); - recordsLag = this.metrics.sensor(name); - - recordsLag.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLag, metricTags), new Value()); - recordsLag.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLagMax, metricTags), new Max()); - recordsLag.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLagAvg, metricTags), new Avg()); - } - recordsLag.record(lag); - } - - private static String partitionLagMetricName(TopicPartition tp) { - return tp + 
".records-lag"; - } - - private static String partitionLeadMetricName(TopicPartition tp) { - return tp + ".records-lead"; - } - - private MetricName partitionPreferredReadReplicaMetricName(TopicPartition tp) { - Map<String, String> metricTags = topicPartitionTags(tp); - return this.metrics.metricInstance(metricsRegistry.partitionPreferredReadReplica, metricTags); - } - - private Map<String, String> topicPartitionTags(TopicPartition tp) { - Map<String, String> metricTags = new HashMap<>(2); - metricTags.put("topic", tp.topic().replace('.', '_')); - metricTags.put("partition", String.valueOf(tp.partition())); - return metricTags; - } - } - // Visible for testing void maybeCloseFetchSessions(final Timer timer) { final Cluster cluster = metadata.fetch(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SensorBuilder.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SensorBuilder.java new file mode 100644 index 00000000000..2272ee5c0a3 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SensorBuilder.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Meter; +import org.apache.kafka.common.metrics.stats.Min; +import org.apache.kafka.common.metrics.stats.SampledStat; +import org.apache.kafka.common.metrics.stats.Value; + +import java.util.Collections; +import java.util.Map; +import java.util.function.Supplier; + +/** + * {@code SensorBuilder} takes a bit of the boilerplate out of creating {@link Sensor sensors} for recording + * {@link Metric metrics}. + */ +public class SensorBuilder { + + private final Metrics metrics; + + private final Sensor sensor; + + private final boolean prexisting; + + private final Map<String, String> tags; + + public SensorBuilder(Metrics metrics, String name) { + this(metrics, name, Collections::emptyMap); + } + + public SensorBuilder(Metrics metrics, String name, Supplier<Map<String, String>> tagsSupplier) { + this.metrics = metrics; + Sensor s = metrics.getSensor(name); + + if (s != null) { + sensor = s; + tags = Collections.emptyMap(); + prexisting = true; + } else { + sensor = metrics.sensor(name); + tags = tagsSupplier.get(); + prexisting = false; + } + } + + SensorBuilder withAvg(MetricNameTemplate name) { + if (!prexisting) + sensor.add(metrics.metricInstance(name, tags), new Avg()); + + return this; + } + + SensorBuilder withMin(MetricNameTemplate name) { + if (!prexisting) + sensor.add(metrics.metricInstance(name, tags), new Min()); + + return this; + } + + SensorBuilder withMax(MetricNameTemplate name) { + if (!prexisting) + sensor.add(metrics.metricInstance(name, tags), new Max()); + + return this; + } + + SensorBuilder withValue(MetricNameTemplate name) { + if (!prexisting) + 
sensor.add(metrics.metricInstance(name, tags), new Value()); + + return this; + } + + SensorBuilder withMeter(MetricNameTemplate rateName, MetricNameTemplate totalName) { + if (!prexisting) { + sensor.add(new Meter(metrics.metricInstance(rateName, tags), metrics.metricInstance(totalName, tags))); + } + + return this; + } + + SensorBuilder withMeter(SampledStat sampledStat, MetricNameTemplate rateName, MetricNameTemplate totalName) { + if (!prexisting) { + sensor.add(new Meter(sampledStat, metrics.metricInstance(rateName, tags), metrics.metricInstance(totalName, tags))); + } + + return this; + } + + Sensor build() { + return sensor; + } + +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java new file mode 100644 index 00000000000..b420852852a --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.RecordDeserializationException; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.UUIDDeserializer; +import org.apache.kafka.common.serialization.UUIDSerializer; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class CompletedFetchTest { + + private final static String TOPIC_NAME = "test"; + private final static TopicPartition TP = new TopicPartition(TOPIC_NAME, 0); + private final static 
long PRODUCER_ID = 1000L; + private final static short PRODUCER_EPOCH = 0; + + private BufferSupplier bufferSupplier; + + @BeforeEach + public void setup() { + bufferSupplier = BufferSupplier.create(); + } + + @AfterEach + public void tearDown() { + if (bufferSupplier != null) + bufferSupplier.close(); + } + + @Test + public void testSimple() { + long fetchOffset = 5; + int startingOffset = 10; + int numRecords = 11; // Records for 10-20 + FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData() + .setRecords(newRecords(startingOffset, numRecords, fetchOffset)); + + CompletedFetch<String, String> completedFetch = newCompletedFetch(fetchOffset, partitionData); + + List<ConsumerRecord<String, String>> records = completedFetch.fetchRecords(10); + assertEquals(10, records.size()); + ConsumerRecord<String, String> record = records.get(0); + assertEquals(10, record.offset()); + + records = completedFetch.fetchRecords(10); + assertEquals(1, records.size()); + record = records.get(0); + assertEquals(20, record.offset()); + + records = completedFetch.fetchRecords(10); + assertEquals(0, records.size()); + } + + @Test + public void testAbortedTransactionRecordsRemoved() { + int numRecords = 10; + Records rawRecords = newTranscactionalRecords(ControlRecordType.ABORT, numRecords); + + FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData() + .setRecords(rawRecords) + .setAbortedTransactions(newAbortedTransactions()); + + CompletedFetch<String, String> completedFetch = newCompletedFetch(IsolationLevel.READ_COMMITTED, + OffsetResetStrategy.NONE, + true, + 0, + partitionData); + List<ConsumerRecord<String, String>> records = completedFetch.fetchRecords(10); + assertEquals(0, records.size()); + + completedFetch = newCompletedFetch(IsolationLevel.READ_UNCOMMITTED, + OffsetResetStrategy.NONE, + true, + 0, + partitionData); + records = completedFetch.fetchRecords(10); + assertEquals(numRecords, records.size()); + } + + @Test + 
public void testCommittedTransactionRecordsIncluded() { + int numRecords = 10; + Records rawRecords = newTranscactionalRecords(ControlRecordType.COMMIT, numRecords); + FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData() + .setRecords(rawRecords); + CompletedFetch<String, String> completedFetch = newCompletedFetch(IsolationLevel.READ_COMMITTED, + OffsetResetStrategy.NONE, + true, + 0, + partitionData); + List<ConsumerRecord<String, String>> records = completedFetch.fetchRecords(10); + assertEquals(10, records.size()); + } + + @Test + public void testNegativeFetchCount() { + long fetchOffset = 0; + int startingOffset = 0; + int numRecords = 10; + FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData() + .setRecords(newRecords(startingOffset, numRecords, fetchOffset)); + + CompletedFetch<String, String> completedFetch = newCompletedFetch(fetchOffset, partitionData); + + List<ConsumerRecord<String, String>> records = completedFetch.fetchRecords(-10); + assertEquals(0, records.size()); + } + + @Test + public void testNoRecordsInFetch() { + FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData() + .setPartitionIndex(0) + .setHighWatermark(10) + .setLastStableOffset(20) + .setLogStartOffset(0); + + CompletedFetch<String, String> completedFetch = newCompletedFetch(IsolationLevel.READ_UNCOMMITTED, + OffsetResetStrategy.NONE, + false, + 1, + partitionData); + + List<ConsumerRecord<String, String>> records = completedFetch.fetchRecords(10); + assertEquals(0, records.size()); + } + + @Test + public void testCorruptedMessage() { + // Create one good record and then one "corrupted" record. 
+ MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0); + builder.append(new SimpleRecord(new UUIDSerializer().serialize(TOPIC_NAME, UUID.randomUUID()))); + builder.append(0L, "key".getBytes(), "value".getBytes()); + Records records = builder.build(); + + FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData() + .setPartitionIndex(0) + .setHighWatermark(10) + .setLastStableOffset(20) + .setLogStartOffset(0) + .setRecords(records); + + CompletedFetch<UUID, UUID> completedFetch = newCompletedFetch(new UUIDDeserializer(), + new UUIDDeserializer(), + IsolationLevel.READ_COMMITTED, + OffsetResetStrategy.NONE, + false, + 0, + partitionData); + + completedFetch.fetchRecords(10); + + assertThrows(RecordDeserializationException.class, () -> completedFetch.fetchRecords(10)); + } + + private CompletedFetch<String, String> newCompletedFetch(long fetchOffset, + FetchResponseData.PartitionData partitionData) { + return newCompletedFetch( + IsolationLevel.READ_UNCOMMITTED, + OffsetResetStrategy.NONE, + true, + fetchOffset, + partitionData); + } + + private CompletedFetch<String, String> newCompletedFetch(IsolationLevel isolationLevel, + OffsetResetStrategy offsetResetStrategy, + boolean checkCrcs, + long fetchOffset, + FetchResponseData.PartitionData partitionData) { + return newCompletedFetch(new StringDeserializer(), + new StringDeserializer(), + isolationLevel, + offsetResetStrategy, + checkCrcs, + fetchOffset, + partitionData); + } + + private <K, V> CompletedFetch<K, V> newCompletedFetch(Deserializer<K> keyDeserializer, + Deserializer<V> valueDeserializer, + IsolationLevel isolationLevel, + OffsetResetStrategy offsetResetStrategy, + boolean checkCrcs, + long fetchOffset, + FetchResponseData.PartitionData partitionData) { + LogContext logContext = new LogContext(); + SubscriptionState subscriptions = new SubscriptionState(logContext, offsetResetStrategy); + 
FetchMetricsRegistry metricsRegistry = new FetchMetricsRegistry(); + FetchMetricsManager metrics = new FetchMetricsManager(new Metrics(), metricsRegistry); + FetchMetricsAggregator metricAggregator = new FetchMetricsAggregator(metrics, Collections.singleton(TP)); + + return new CompletedFetch<>(logContext, + subscriptions, + checkCrcs, + bufferSupplier, + keyDeserializer, + valueDeserializer, + isolationLevel, + TP, + partitionData, + metricAggregator, + fetchOffset, + ApiKeys.FETCH.latestVersion()); + } + + private Records newRecords(long baseOffset, int count, long firstMessageId) { + MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, baseOffset); + for (int i = 0; i < count; i++) + builder.append(0L, "key".getBytes(), ("value-" + (firstMessageId + i)).getBytes()); + return builder.build(); + } + + private Records newTranscactionalRecords(ControlRecordType controlRecordType, int numRecords) { + Time time = new MockTime(); + ByteBuffer buffer = ByteBuffer.allocate(1024); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, + RecordBatch.CURRENT_MAGIC_VALUE, + CompressionType.NONE, + TimestampType.CREATE_TIME, + 0, + time.milliseconds(), + PRODUCER_ID, + PRODUCER_EPOCH, + 0, + true, + RecordBatch.NO_PARTITION_LEADER_EPOCH); + + for (int i = 0; i < numRecords; i++) + builder.append(new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes())); + + builder.build(); + writeTransactionMarker(buffer, controlRecordType, numRecords, time); + buffer.flip(); + + return MemoryRecords.readableRecords(buffer); + } + + private void writeTransactionMarker(ByteBuffer buffer, + ControlRecordType controlRecordType, + int offset, + Time time) { + MemoryRecords.writeEndTransactionalMarker(buffer, + offset, + time.milliseconds(), + 0, + PRODUCER_ID, + PRODUCER_EPOCH, + new EndTransactionMarker(controlRecordType, 0)); + } + + private List<FetchResponseData.AbortedTransaction> 
newAbortedTransactions() { + FetchResponseData.AbortedTransaction abortedTransaction = new FetchResponseData.AbortedTransaction(); + abortedTransaction.setFirstOffset(0); + abortedTransaction.setProducerId(PRODUCER_ID); + return Collections.singletonList(abortedTransaction); + } + +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManagerTest.java new file mode 100644 index 00000000000..dc00ce33366 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManagerTest.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.MetricNameTemplate; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static org.apache.kafka.clients.consumer.internals.FetchMetricsManager.topicPartitionTags; +import static org.apache.kafka.clients.consumer.internals.FetchMetricsManager.topicTags; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class FetchMetricsManagerTest { + + private static final double EPSILON = 0.0001; + + private final Time time = new MockTime(1, 0, 0); + private final static String TOPIC_NAME = "test"; + private final static TopicPartition TP = new TopicPartition(TOPIC_NAME, 0); + + private Metrics metrics; + private FetchMetricsRegistry metricsRegistry; + private FetchMetricsManager metricsManager; + + + @BeforeEach + public void setup() { + metrics = new Metrics(time); + metricsRegistry = new FetchMetricsRegistry(metrics.config().tags().keySet(), "test"); + metricsManager = new FetchMetricsManager(metrics, metricsRegistry); + } + + @AfterEach + public void tearDown() { + if (metrics != null) { + metrics.close(); + metrics = null; + } + + metricsManager = null; + } + + @Test + public void testLatency() { + metricsManager.recordLatency(123); + time.sleep(metrics.config().timeWindowMs() + 1); + metricsManager.recordLatency(456); + + assertEquals(289.5, metricValue(metricsRegistry.fetchLatencyAvg), EPSILON); + assertEquals(456, metricValue(metricsRegistry.fetchLatencyMax), EPSILON); + } + + @Test + public void testBytesFetched() { + metricsManager.recordBytesFetched(2); + 
time.sleep(metrics.config().timeWindowMs() + 1); + metricsManager.recordBytesFetched(10); + + assertEquals(6, metricValue(metricsRegistry.fetchSizeAvg), EPSILON); + assertEquals(10, metricValue(metricsRegistry.fetchSizeMax), EPSILON); + } + + @Test + public void testBytesFetchedTopic() { + String topicName1 = TOPIC_NAME; + String topicName2 = "another-topic"; + Map<String, String> tags1 = topicTags(topicName1); + Map<String, String> tags2 = topicTags(topicName2); + + metricsManager.recordBytesFetched(topicName1, 2); + metricsManager.recordBytesFetched(topicName2, 1); + time.sleep(metrics.config().timeWindowMs() + 1); + metricsManager.recordBytesFetched(topicName1, 10); + metricsManager.recordBytesFetched(topicName2, 5); + + assertEquals(6, metricValue(metricsRegistry.topicFetchSizeAvg, tags1), EPSILON); + assertEquals(10, metricValue(metricsRegistry.topicFetchSizeMax, tags1), EPSILON); + assertEquals(3, metricValue(metricsRegistry.topicFetchSizeAvg, tags2), EPSILON); + assertEquals(5, metricValue(metricsRegistry.topicFetchSizeMax, tags2), EPSILON); + } + + @Test + public void testRecordsFetched() { + metricsManager.recordRecordsFetched(3); + time.sleep(metrics.config().timeWindowMs() + 1); + metricsManager.recordRecordsFetched(15); + + assertEquals(9, metricValue(metricsRegistry.recordsPerRequestAvg), EPSILON); + } + + @Test + public void testRecordsFetchedTopic() { + String topicName1 = TOPIC_NAME; + String topicName2 = "another-topic"; + Map<String, String> tags1 = topicTags(topicName1); + Map<String, String> tags2 = topicTags(topicName2); + + metricsManager.recordRecordsFetched(topicName1, 2); + metricsManager.recordRecordsFetched(topicName2, 1); + time.sleep(metrics.config().timeWindowMs() + 1); + metricsManager.recordRecordsFetched(topicName1, 10); + metricsManager.recordRecordsFetched(topicName2, 5); + + assertEquals(6, metricValue(metricsRegistry.topicRecordsPerRequestAvg, tags1), EPSILON); + assertEquals(3, 
metricValue(metricsRegistry.topicRecordsPerRequestAvg, tags2), EPSILON); + } + + @Test + public void testPartitionLag() { + Map<String, String> tags = topicPartitionTags(TP); + metricsManager.recordPartitionLag(TP, 14); + metricsManager.recordPartitionLag(TP, 8); + time.sleep(metrics.config().timeWindowMs() + 1); + metricsManager.recordPartitionLag(TP, 5); + + assertEquals(14, metricValue(metricsRegistry.recordsLagMax), EPSILON); + assertEquals(5, metricValue(metricsRegistry.partitionRecordsLag, tags), EPSILON); + assertEquals(14, metricValue(metricsRegistry.partitionRecordsLagMax, tags), EPSILON); + assertEquals(9, metricValue(metricsRegistry.partitionRecordsLagAvg, tags), EPSILON); + } + + @Test + public void testPartitionLead() { + Map<String, String> tags = topicPartitionTags(TP); + metricsManager.recordPartitionLead(TP, 15); + metricsManager.recordPartitionLead(TP, 11); + time.sleep(metrics.config().timeWindowMs() + 1); + metricsManager.recordPartitionLead(TP, 13); + + assertEquals(11, metricValue(metricsRegistry.recordsLeadMin), EPSILON); + assertEquals(13, metricValue(metricsRegistry.partitionRecordsLead, tags), EPSILON); + assertEquals(11, metricValue(metricsRegistry.partitionRecordsLeadMin, tags), EPSILON); + assertEquals(13, metricValue(metricsRegistry.partitionRecordsLeadAvg, tags), EPSILON); + } + + private double metricValue(MetricNameTemplate name) { + MetricName metricName = metrics.metricInstance(name); + KafkaMetric metric = metrics.metric(metricName); + return (Double) metric.metricValue(); + } + + private double metricValue(MetricNameTemplate name, Map<String, String> tags) { + MetricName metricName = metrics.metricInstance(name, tags); + KafkaMetric metric = metrics.metric(metricName); + return (Double) metric.metricValue(); + } + +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index a22739e1413..e60edbfb6c1 
100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -175,7 +175,7 @@ public class FetcherTest { private MockTime time = new MockTime(1); private SubscriptionState subscriptions; private ConsumerMetadata metadata; - private FetcherMetricsRegistry metricsRegistry; + private FetchMetricsRegistry metricsRegistry; private MockClient client; private Metrics metrics; private ApiVersions apiVersions = new ApiVersions(); @@ -3686,7 +3686,7 @@ public class FetcherTest { metrics = new Metrics(metricConfig, time); consumerClient = spy(new ConsumerNetworkClient(logContext, client, metadata, time, 100, 1000, Integer.MAX_VALUE)); - metricsRegistry = new FetcherMetricsRegistry(metricConfig.tags().keySet(), "consumer" + groupId); + metricsRegistry = new FetchMetricsRegistry(metricConfig.tags().keySet(), "consumer" + groupId); } private <T> List<Long> collectRecordOffsets(List<ConsumerRecord<T, T>> records) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java index d57acc243e1..ffaeab4709d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java @@ -1244,7 +1244,7 @@ public class OffsetFetcherTest { buildFetcher(metricConfig, isolationLevel, metadataExpireMs, subscriptionState, logContext); - FetcherMetricsRegistry metricsRegistry = new FetcherMetricsRegistry(metricConfig.tags().keySet(), "consumertest-group"); + FetchMetricsRegistry metricsRegistry = new FetchMetricsRegistry(metricConfig.tags().keySet(), "consumertest-group"); Fetcher<byte[], byte[]> fetcher = new Fetcher<>( logContext, consumerClient,