This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new a3252629a37 KAFKA-14365: Extract common logic from Fetcher (#13425) a3252629a37 is described below commit a3252629a37344be1f1f86a77cf685f9c147be4d Author: Kirk True <k...@kirktrue.pro> AuthorDate: Fri Mar 24 14:33:13 2023 -0700 KAFKA-14365: Extract common logic from Fetcher (#13425) * KAFKA-14365: Extract common logic from Fetcher Extract logic from Fetcher into AbstractFetcher. Also introduce FetchConfig as a more concise way to delineate state from incoming configuration. Formalized the defaults in CommonClientConfigs and ConsumerConfig to be accessible elsewhere. * Removed overridden methods in favor of synchronizing where needed Reviewers: Guozhang Wang <wangg...@gmail.com> --- checkstyle/suppressions.xml | 14 +- .../apache/kafka/clients/CommonClientConfigs.java | 1 + .../kafka/clients/consumer/ConsumerConfig.java | 14 +- .../kafka/clients/consumer/KafkaConsumer.java | 25 +- .../clients/consumer/internals/AbstractFetch.java | 791 +++++++++++++++++++++ .../clients/consumer/internals/CompletedFetch.java | 25 +- .../clients/consumer/internals/FetchConfig.java | 124 ++++ .../consumer/internals/FetchMetricsManager.java | 15 +- .../kafka/clients/consumer/internals/Fetcher.java | 727 +------------------ .../kafka/clients/consumer/KafkaConsumerTest.java | 21 +- .../consumer/internals/CompletedFetchTest.java | 34 +- .../clients/consumer/internals/FetcherTest.java | 54 +- .../consumer/internals/OffsetFetcherTest.java | 18 +- 13 files changed, 1042 insertions(+), 821 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 93ae7c30d0f..5c95c8d4284 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -46,7 +46,7 @@ <suppress id="dontUseSystemExit" files="Exit.java"/> <suppress checks="ClassFanOutComplexity" - 
files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManager|TransactionManagerTest|KafkaAdminClient|NetworkClient|Admin|KafkaRaftClient|KafkaRaftClientTest|RaftClientTestContext).java"/> + files="(AbstractFetch|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManager|TransactionManagerTest|KafkaAdminClient|NetworkClient|Admin|KafkaRaftClient|KafkaRaftClientTest|RaftClientTestContext).java"/> <suppress checks="ClassFanOutComplexity" files="(SaslServerAuthenticator|SaslAuthenticatorTest).java"/> <suppress checks="NPath" @@ -67,8 +67,6 @@ files="(NetworkClient|FieldSpec|KafkaRaftClient).java"/> <suppress checks="ParameterNumber" files="(KafkaConsumer|ConsumerCoordinator).java"/> - <suppress checks="ParameterNumber" - files="Fetcher.java"/> <suppress checks="ParameterNumber" files="Sender.java"/> <suppress checks="ParameterNumber" @@ -79,7 +77,7 @@ files="MemoryRecordsBuilder.java"/> <suppress checks="ClassDataAbstractionCoupling" - files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaRaftClient|KafkaRaftClientTest).java"/> + files="(KafkaConsumer|ConsumerCoordinator|AbstractFetch|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaRaftClient|KafkaRaftClientTest).java"/> <suppress checks="ClassDataAbstractionCoupling" files="(Errors|SaslAuthenticatorTest|AgentTest|CoordinatorTest).java"/> @@ -87,13 +85,13 @@ files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/> <suppress checks="CyclomaticComplexity" - 
files="(ConsumerCoordinator|Fetcher|KafkaProducer|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords|FetchSessionHandler).java"/> + files="(AbstractFetch|ConsumerCoordinator|OffsetFetcher|KafkaProducer|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords|FetchSessionHandler).java"/> <suppress checks="JavaNCSS" files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest|KafkaRaftClientTest).java"/> <suppress checks="NPathComplexity" - files="(ConsumerCoordinator|BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|KafkaRaftClient|Authorizer|FetchSessionHandler).java"/> + files="(ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|KafkaRaftClient|Authorizer|FetchSessionHandler).java"/> <suppress checks="(JavaNCSS|CyclomaticComplexity|MethodLength)" files="CoordinatorClient.java"/> @@ -108,7 +106,7 @@ <!-- Clients tests --> <suppress checks="ClassDataAbstractionCoupling" - 
files="(Sender|Fetcher|KafkaConsumer|Metrics|RequestResponse|TransactionManager|KafkaAdminClient|Message|KafkaProducer)Test.java"/> + files="(Sender|Fetcher|OffsetFetcher|KafkaConsumer|Metrics|RequestResponse|TransactionManager|KafkaAdminClient|Message|KafkaProducer)Test.java"/> <suppress checks="ClassFanOutComplexity" files="(ConsumerCoordinator|KafkaConsumer|RequestResponse|Fetcher|KafkaAdminClient|Message|KafkaProducer)Test.java"/> @@ -117,7 +115,7 @@ files="MockAdminClient.java"/> <suppress checks="CyclomaticComplexity" - files="RequestResponseTest.java"/> + files="(OffsetFetcher|RequestResponse)Test.java"/> <suppress checks="JavaNCSS" files="RequestResponseTest.java|FetcherTest.java|KafkaAdminClientTest.java"/> diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java index d88aa0a6a1c..ee190df50e1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java +++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java @@ -74,6 +74,7 @@ public class CommonClientConfigs { public static final String CLIENT_RACK_CONFIG = "client.rack"; public static final String CLIENT_RACK_DOC = "A rack identifier for this client. This can be any string value which indicates where this client is physically located. It corresponds with the broker config 'broker.rack'"; + public static final String DEFAULT_CLIENT_RACK = ""; public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms"; public static final String RECONNECT_BACKOFF_MS_DOC = "The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. 
This backoff applies to all connection attempts by the client to a broker."; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index f8bc97ec8a3..51c5a35bcf8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -88,6 +88,7 @@ public class ConsumerConfig extends AbstractConfig { private static final String MAX_POLL_RECORDS_DOC = "The maximum number of records returned in a single call to poll()." + " Note, that <code>" + MAX_POLL_RECORDS_CONFIG + "</code> does not impact the underlying fetching behavior." + " The consumer will cache the records from each fetch request and returns them incrementally from each poll."; + public static final int DEFAULT_MAX_POLL_RECORDS = 500; /** <code>max.poll.interval.ms</code> */ public static final String MAX_POLL_INTERVAL_MS_CONFIG = CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG; @@ -154,7 +155,8 @@ public class ConsumerConfig extends AbstractConfig { * <code>fetch.min.bytes</code> */ public static final String FETCH_MIN_BYTES_CONFIG = "fetch.min.bytes"; - private static final String FETCH_MIN_BYTES_DOC = "The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. Setting this to something greater than 1 will cause the server to wait [...] + public static final int DEFAULT_FETCH_MIN_BYTES = 1; + private static final String FETCH_MIN_BYTES_DOC = "The minimum amount of data the server should return for a fetch request. 
If insufficient data is available the request will wait for that much data to accumulate before answering the request. The default setting of " + DEFAULT_FETCH_MIN_BYTES + " byte means that fetch requests are answered as soon as that many byte(s) of data is available or the fetch request times out waiting for data to arrive. Setting this to a larger value will c [...] /** * <code>fetch.max.bytes</code> @@ -172,6 +174,7 @@ public class ConsumerConfig extends AbstractConfig { */ public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms"; private static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes."; + public static final int DEFAULT_FETCH_MAX_WAIT_MS = 500; /** <code>metadata.max.age.ms</code> */ public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG; @@ -203,6 +206,7 @@ public class ConsumerConfig extends AbstractConfig { * <code>client.rack</code> */ public static final String CLIENT_RACK_CONFIG = CommonClientConfigs.CLIENT_RACK_CONFIG; + public static final String DEFAULT_CLIENT_RACK = CommonClientConfigs.DEFAULT_CLIENT_RACK; /** * <code>reconnect.backoff.ms</code> @@ -402,7 +406,7 @@ public class ConsumerConfig extends AbstractConfig { CommonClientConfigs.CLIENT_ID_DOC) .define(CLIENT_RACK_CONFIG, Type.STRING, - "", + DEFAULT_CLIENT_RACK, Importance.LOW, CommonClientConfigs.CLIENT_RACK_DOC) .define(MAX_PARTITION_FETCH_BYTES_CONFIG, @@ -425,7 +429,7 @@ public class ConsumerConfig extends AbstractConfig { CommonClientConfigs.RECEIVE_BUFFER_DOC) .define(FETCH_MIN_BYTES_CONFIG, Type.INT, - 1, + DEFAULT_FETCH_MIN_BYTES, atLeast(0), Importance.HIGH, FETCH_MIN_BYTES_DOC) @@ -437,7 +441,7 @@ public class ConsumerConfig extends AbstractConfig { FETCH_MAX_BYTES_DOC) .define(FETCH_MAX_WAIT_MS_CONFIG, Type.INT, - 500, + DEFAULT_FETCH_MAX_WAIT_MS, 
atLeast(0), Importance.LOW, FETCH_MAX_WAIT_MS_DOC) @@ -543,7 +547,7 @@ public class ConsumerConfig extends AbstractConfig { INTERCEPTOR_CLASSES_DOC) .define(MAX_POLL_RECORDS_CONFIG, Type.INT, - 500, + DEFAULT_MAX_POLL_RECORDS, atLeast(1), Importance.MEDIUM, MAX_POLL_RECORDS_DOC) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 11ac675cb4b..723cc00ca2f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -27,6 +27,8 @@ import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors; import org.apache.kafka.clients.consumer.internals.ConsumerMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient; import org.apache.kafka.clients.consumer.internals.Fetch; +import org.apache.kafka.clients.consumer.internals.FetchConfig; +import org.apache.kafka.clients.consumer.internals.FetchMetricsManager; import org.apache.kafka.clients.consumer.internals.Fetcher; import org.apache.kafka.clients.consumer.internals.FetchMetricsRegistry; import org.apache.kafka.clients.consumer.internals.KafkaConsumerMetrics; @@ -590,7 +592,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { private final long requestTimeoutMs; private final int defaultApiTimeoutMs; private volatile boolean closed = false; - private List<ConsumerPartitionAssignor> assignors; + private final List<ConsumerPartitionAssignor> assignors; // currentThread holds the threadId of the current thread accessing KafkaConsumer // and is used to prevent multi-threaded access @@ -738,10 +740,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { String metricGrpPrefix = "consumer"; FetchMetricsRegistry metricsRegistry = new FetchMetricsRegistry(Collections.singleton(CLIENT_ID_METRIC_TAG), metricGrpPrefix); + FetchMetricsManager 
fetchMetricsManager = new FetchMetricsManager(metrics, metricsRegistry); ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext); this.isolationLevel = IsolationLevel.valueOf( config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT)); - Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricsRegistry); int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG); ApiVersions apiVersions = new ApiVersions(); @@ -760,7 +762,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { time, true, apiVersions, - throttleTimeSensor, + fetchMetricsManager.throttleTimeSensor(), logContext); this.client = new ConsumerNetworkClient( logContext, @@ -797,24 +799,15 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { config.getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED), config.getString(ConsumerConfig.CLIENT_RACK_CONFIG)); } + FetchConfig<K, V> fetchConfig = new FetchConfig<>(config, keyDeserializer, valueDeserializer, isolationLevel); this.fetcher = new Fetcher<>( logContext, this.client, - config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG), - config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG), - config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG), - config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), - config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), - config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG), - config.getString(ConsumerConfig.CLIENT_RACK_CONFIG), - this.keyDeserializer, - this.valueDeserializer, this.metadata, this.subscriptions, - metrics, - metricsRegistry, - this.time, - isolationLevel); + fetchConfig, + fetchMetricsManager, + this.time); this.offsetFetcher = new OffsetFetcher(logContext, client, metadata, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java new file mode 
100644 index 00000000000..13a45fc4569 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java @@ -0,0 +1,791 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.FetchSessionHandler; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchResponse; +import 
org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.helpers.MessageFormatter; + +import java.io.Closeable; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * {@code AbstractFetch} represents the basic state and logic for record fetching processing. + * @param <K> Type for the message key + * @param <V> Type for the message value + */ +public abstract class AbstractFetch<K, V> implements Closeable { + + private final Logger log; + protected final LogContext logContext; + protected final ConsumerNetworkClient client; + protected final ConsumerMetadata metadata; + protected final SubscriptionState subscriptions; + protected final FetchConfig<K, V> fetchConfig; + protected final Time time; + protected final FetchMetricsManager metricsManager; + + private final BufferSupplier decompressionBufferSupplier; + private final ConcurrentLinkedQueue<CompletedFetch<K, V>> completedFetches; + private final Map<Integer, FetchSessionHandler> sessionHandlers; + private final Set<Integer> nodesWithPendingFetchRequests; + + private CompletedFetch<K, V> nextInLineFetch; + + public AbstractFetch(final LogContext logContext, + final ConsumerNetworkClient client, + final ConsumerMetadata metadata, + final SubscriptionState subscriptions, + final FetchConfig<K, V> fetchConfig, + final FetchMetricsManager metricsManager, + final Time time) { + this.log = logContext.logger(AbstractFetch.class); + 
this.logContext = logContext; + this.client = client; + this.metadata = metadata; + this.subscriptions = subscriptions; + this.fetchConfig = fetchConfig; + this.decompressionBufferSupplier = BufferSupplier.create(); + this.completedFetches = new ConcurrentLinkedQueue<>(); + this.sessionHandlers = new HashMap<>(); + this.nodesWithPendingFetchRequests = new HashSet<>(); + this.metricsManager = metricsManager; + this.time = time; + } + + /** + * Return whether we have any completed fetches pending return to the user. This method is thread-safe. Has + * visibility for testing. + * + * @return true if there are completed fetches, false otherwise + */ + boolean hasCompletedFetches() { + return !completedFetches.isEmpty(); + } + + /** + * Return whether we have any completed fetches that are fetchable. This method is thread-safe. + * @return true if there are completed fetches that can be returned, false otherwise + */ + public boolean hasAvailableFetches() { + return completedFetches.stream().anyMatch(fetch -> subscriptions.isFetchable(fetch.partition)); + } + + /** + * Implements the core logic for a successful fetch request/response. + * + * @param fetchTarget {@link Node} from which the fetch data was requested + * @param data {@link FetchSessionHandler.FetchRequestData} that represents the session data + * @param resp {@link ClientResponse} from which the {@link FetchResponse} will be retrieved + */ + protected void handleFetchResponse(final Node fetchTarget, + final FetchSessionHandler.FetchRequestData data, + final ClientResponse resp) { + try { + final FetchResponse response = (FetchResponse) resp.responseBody(); + final FetchSessionHandler handler = sessionHandler(fetchTarget.id()); + + if (handler == null) { + log.error("Unable to find FetchSessionHandler for node {}. 
Ignoring fetch response.", + fetchTarget.id()); + return; + } + + final short requestVersion = resp.requestHeader().apiVersion(); + + if (!handler.handleResponse(response, requestVersion)) { + if (response.error() == Errors.FETCH_SESSION_TOPIC_ID_ERROR) { + metadata.requestUpdate(); + } + + return; + } + + final Map<TopicPartition, FetchResponseData.PartitionData> responseData = response.responseData(handler.sessionTopicNames(), requestVersion); + final Set<TopicPartition> partitions = new HashSet<>(responseData.keySet()); + final FetchMetricsAggregator metricAggregator = new FetchMetricsAggregator(metricsManager, partitions); + + for (Map.Entry<TopicPartition, FetchResponseData.PartitionData> entry : responseData.entrySet()) { + TopicPartition partition = entry.getKey(); + FetchRequest.PartitionData requestData = data.sessionPartitions().get(partition); + + if (requestData == null) { + String message; + + if (data.metadata().isFull()) { + message = MessageFormatter.arrayFormat( + "Response for missing full request partition: partition={}; metadata={}", + new Object[]{partition, data.metadata()}).getMessage(); + } else { + message = MessageFormatter.arrayFormat( + "Response for missing session request partition: partition={}; metadata={}; toSend={}; toForget={}; toReplace={}", + new Object[]{partition, data.metadata(), data.toSend(), data.toForget(), data.toReplace()}).getMessage(); + } + + // Received fetch response for missing session partition + throw new IllegalStateException(message); + } + + long fetchOffset = requestData.fetchOffset; + FetchResponseData.PartitionData partitionData = entry.getValue(); + + log.debug("Fetch {} at offset {} for partition {} returned fetch data {}", + fetchConfig.isolationLevel, fetchOffset, partition, partitionData); + + CompletedFetch<K, V> completedFetch = new CompletedFetch<>( + logContext, + subscriptions, + fetchConfig, + decompressionBufferSupplier, + partition, + partitionData, + metricAggregator, + fetchOffset, + 
requestVersion); + completedFetches.add(completedFetch); + } + + metricsManager.recordLatency(resp.requestLatencyMs()); + } finally { + log.debug("Removing pending request for node {}", fetchTarget); + nodesWithPendingFetchRequests.remove(fetchTarget.id()); + } + } + + /** + * Implements the core logic for a failed fetch request/response. + * + * @param fetchTarget {@link Node} from which the fetch data was requested + * @param e {@link RuntimeException} representing the error that resulted in the failure + */ + protected void handleFetchResponse(final Node fetchTarget, final RuntimeException e) { + try { + final FetchSessionHandler handler = sessionHandler(fetchTarget.id()); + + if (handler != null) { + handler.handleError(e); + handler.sessionTopicPartitions().forEach(subscriptions::clearPreferredReadReplica); + } + } finally { + log.debug("Removing pending request for node {}", fetchTarget); + nodesWithPendingFetchRequests.remove(fetchTarget.id()); + } + } + + /** + * Creates a new {@link FetchRequest fetch request} in preparation for sending to the Kafka cluster. + * + * @param fetchTarget {@link Node} from which the fetch data will be requested + * @param requestData {@link FetchSessionHandler.FetchRequestData} that represents the session data + * @return {@link FetchRequest.Builder} that can be submitted to the broker + */ + protected FetchRequest.Builder createFetchRequest(final Node fetchTarget, + final FetchSessionHandler.FetchRequestData requestData) { + // Version 12 is the maximum version that could be used without topic IDs. See FetchRequest.json for schema + // changelog. + final short maxVersion = requestData.canUseTopicIds() ? 
ApiKeys.FETCH.latestVersion() : (short) 12; + + final FetchRequest.Builder request = FetchRequest.Builder + .forConsumer(maxVersion, fetchConfig.maxWaitMs, fetchConfig.minBytes, requestData.toSend()) + .isolationLevel(fetchConfig.isolationLevel) + .setMaxBytes(fetchConfig.maxBytes) + .metadata(requestData.metadata()) + .removed(requestData.toForget()) + .replaced(requestData.toReplace()) + .rackId(fetchConfig.clientRackId); + + log.debug("Sending {} {} to broker {}", fetchConfig.isolationLevel, requestData, fetchTarget); + + // We add the node to the set of nodes with pending fetch requests before adding the + // listener because the future may have been fulfilled on another thread (e.g. during a + // disconnection being handled by the heartbeat thread) which will mean the listener + // will be invoked synchronously. + log.debug("Adding pending request for node {}", fetchTarget); + nodesWithPendingFetchRequests.add(fetchTarget.id()); + + return request; + } + + /** + * Return the fetched records, empty the record buffer and update the consumed position. + * + * <p> + * + * NOTE: returning an {@link Fetch#isEmpty empty} fetch guarantees the consumed position is not updated. + * + * @return A {@link Fetch} for the requested partitions + * @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and + * the defaultResetPolicy is NONE + * @throws TopicAuthorizationException If there is TopicAuthorization error in fetchResponse. 
+ */ + public Fetch<K, V> collectFetch() { + Fetch<K, V> fetch = Fetch.empty(); + Queue<CompletedFetch<K, V>> pausedCompletedFetches = new ArrayDeque<>(); + int recordsRemaining = fetchConfig.maxPollRecords; + + try { + while (recordsRemaining > 0) { + if (nextInLineFetch == null || nextInLineFetch.isConsumed) { + CompletedFetch<K, V> records = completedFetches.peek(); + if (records == null) break; + + if (!records.initialized) { + try { + nextInLineFetch = initializeCompletedFetch(records); + } catch (Exception e) { + // Remove a completedFetch upon a parse with exception if (1) it contains no records, and + // (2) there are no fetched records with actual content preceding this exception. + // The first condition ensures that the completedFetches is not stuck with the same completedFetch + // in cases such as the TopicAuthorizationException, and the second condition ensures that no + // potential data loss due to an exception in a following record. + if (fetch.isEmpty() && FetchResponse.recordsOrFail(records.partitionData).sizeInBytes() == 0) { + completedFetches.poll(); + } + throw e; + } + } else { + nextInLineFetch = records; + } + completedFetches.poll(); + } else if (subscriptions.isPaused(nextInLineFetch.partition)) { + // when the partition is paused we add the records back to the completedFetches queue instead of draining + // them so that they can be returned on a subsequent poll if the partition is resumed at that time + log.debug("Skipping fetching records for assigned partition {} because it is paused", nextInLineFetch.partition); + pausedCompletedFetches.add(nextInLineFetch); + nextInLineFetch = null; + } else { + Fetch<K, V> nextFetch = fetchRecords(recordsRemaining); + recordsRemaining -= nextFetch.numRecords(); + fetch.add(nextFetch); + } + } + } catch (KafkaException e) { + if (fetch.isEmpty()) + throw e; + } finally { + // add any polled completed fetches for paused partitions back to the completed fetches queue to be + // re-evaluated in the 
next poll + completedFetches.addAll(pausedCompletedFetches); + } + + return fetch; + } + + private Fetch<K, V> fetchRecords(final int maxRecords) { + if (!subscriptions.isAssigned(nextInLineFetch.partition)) { + // this can happen when a rebalance happened before fetched records are returned to the consumer's poll call + log.debug("Not returning fetched records for partition {} since it is no longer assigned", + nextInLineFetch.partition); + } else if (!subscriptions.isFetchable(nextInLineFetch.partition)) { + // this can happen when a partition is paused before fetched records are returned to the consumer's + // poll call or if the offset is being reset + log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", + nextInLineFetch.partition); + } else { + SubscriptionState.FetchPosition position = subscriptions.position(nextInLineFetch.partition); + if (position == null) { + throw new IllegalStateException("Missing position for fetchable partition " + nextInLineFetch.partition); + } + + if (nextInLineFetch.nextFetchOffset == position.offset) { + List<ConsumerRecord<K, V>> partRecords = nextInLineFetch.fetchRecords(maxRecords); + + log.trace("Returning {} fetched records at offset {} for assigned partition {}", + partRecords.size(), position, nextInLineFetch.partition); + + boolean positionAdvanced = false; + + if (nextInLineFetch.nextFetchOffset > position.offset) { + SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition( + nextInLineFetch.nextFetchOffset, + nextInLineFetch.lastEpoch, + position.currentLeader); + log.trace("Updating fetch position from {} to {} for partition {} and returning {} records from `poll()`", + position, nextPosition, nextInLineFetch.partition, partRecords.size()); + subscriptions.position(nextInLineFetch.partition, nextPosition); + positionAdvanced = true; + } + + Long partitionLag = subscriptions.partitionLag(nextInLineFetch.partition, fetchConfig.isolationLevel); + 
if (partitionLag != null) + metricsManager.recordPartitionLag(nextInLineFetch.partition, partitionLag); + + Long lead = subscriptions.partitionLead(nextInLineFetch.partition); + if (lead != null) { + metricsManager.recordPartitionLead(nextInLineFetch.partition, lead); + } + + return Fetch.forPartition(nextInLineFetch.partition, partRecords, positionAdvanced); + } else { + // these records aren't next in line based on the last consumed position, ignore them + // they must be from an obsolete request + log.debug("Ignoring fetched records for {} at offset {} since the current position is {}", + nextInLineFetch.partition, nextInLineFetch.nextFetchOffset, position); + } + } + + log.trace("Draining fetched records for partition {}", nextInLineFetch.partition); + nextInLineFetch.drain(); + + return Fetch.empty(); + } + + private List<TopicPartition> fetchablePartitions() { + Set<TopicPartition> exclude = new HashSet<>(); + if (nextInLineFetch != null && !nextInLineFetch.isConsumed) { + exclude.add(nextInLineFetch.partition); + } + for (CompletedFetch<K, V> completedFetch : completedFetches) { + exclude.add(completedFetch.partition); + } + return subscriptions.fetchablePartitions(tp -> !exclude.contains(tp)); + } + + /** + * Determine from which replica to read: the <i>preferred</i> or the <i>leader</i>. The preferred replica is used + * iff: + * + * <ul> + * <li>A preferred replica was previously set</li> + * <li>We're still within the lease time for the preferred replica</li> + * <li>The replica is still online/available</li> + * </ul> + * + * If any of the above are not met, the leader node is returned. 
+ * + * @param partition {@link TopicPartition} for which we want to fetch data + * @param leaderReplica {@link Node} for the leader of the given partition + * @param currentTimeMs Current time in milliseconds; used to determine if we're within the optional lease window + * @return Replica {@link Node node} from which to request the data + * @see SubscriptionState#preferredReadReplica + * @see SubscriptionState#updatePreferredReadReplica + */ + Node selectReadReplica(final TopicPartition partition, final Node leaderReplica, final long currentTimeMs) { + Optional<Integer> nodeId = subscriptions.preferredReadReplica(partition, currentTimeMs); + + if (nodeId.isPresent()) { + Optional<Node> node = nodeId.flatMap(id -> metadata.fetch().nodeIfOnline(partition, id)); + if (node.isPresent()) { + return node.get(); + } else { + log.trace("Not fetching from {} for partition {} since it is marked offline or is missing from our metadata," + + " using the leader instead.", nodeId, partition); + // Note that this condition may happen due to stale metadata, so we clear preferred replica and + // refresh metadata. + requestMetadataUpdate(partition); + return leaderReplica; + } + } else { + return leaderReplica; + } + } + + /** + * Create fetch requests for all nodes for which we have assigned partitions + * that have no existing requests in flight. 
+ */ + protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() { + // Update metrics in case there was an assignment change + metricsManager.maybeUpdateAssignment(subscriptions); + + Map<Node, FetchSessionHandler.Builder> fetchable = new LinkedHashMap<>(); + long currentTimeMs = time.milliseconds(); + Map<String, Uuid> topicIds = metadata.topicIds(); + + for (TopicPartition partition : fetchablePartitions()) { + SubscriptionState.FetchPosition position = subscriptions.position(partition); + + if (position == null) + throw new IllegalStateException("Missing position for fetchable partition " + partition); + + Optional<Node> leaderOpt = position.currentLeader.leader; + + if (!leaderOpt.isPresent()) { + log.debug("Requesting metadata update for partition {} since the position {} is missing the current leader node", partition, position); + metadata.requestUpdate(); + continue; + } + + // Use the preferred read replica if set, otherwise the partition's leader + Node node = selectReadReplica(partition, leaderOpt.get(), currentTimeMs); + + if (client.isUnavailable(node)) { + client.maybeThrowAuthFailure(node); + + // If we try to send during the reconnect backoff window, then the request is just + // going to be failed anyway before being sent, so skip sending the request for now + log.trace("Skipping fetch for partition {} because node {} is awaiting reconnect backoff", partition, node); + } else if (nodesWithPendingFetchRequests.contains(node.id())) { + log.trace("Skipping fetch for partition {} because previous request to {} has not been processed", partition, node); + } else { + // if there is a leader and no in-flight requests, issue a new fetch + FetchSessionHandler.Builder builder = fetchable.computeIfAbsent(node, k -> { + FetchSessionHandler fetchSessionHandler = sessionHandlers.computeIfAbsent(node.id(), n -> new FetchSessionHandler(logContext, n)); + return fetchSessionHandler.newBuilder(); + }); + Uuid topicId = 
topicIds.getOrDefault(partition.topic(), Uuid.ZERO_UUID); + FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(topicId, + position.offset, + FetchRequest.INVALID_LOG_START_OFFSET, + fetchConfig.fetchSize, + position.currentLeader.epoch, + Optional.empty()); + builder.add(partition, partitionData); + + log.debug("Added {} fetch request for partition {} at position {} to node {}", fetchConfig.isolationLevel, + partition, position, node); + } + } + + Map<Node, FetchSessionHandler.FetchRequestData> reqs = new LinkedHashMap<>(); + for (Map.Entry<Node, FetchSessionHandler.Builder> entry : fetchable.entrySet()) { + reqs.put(entry.getKey(), entry.getValue().build()); + } + return reqs; + } + + /** + * Initialize a CompletedFetch object. + */ + private CompletedFetch<K, V> initializeCompletedFetch(final CompletedFetch<K, V> completedFetch) { + final TopicPartition tp = completedFetch.partition; + final Errors error = Errors.forCode(completedFetch.partitionData.errorCode()); + boolean recordMetrics = true; + + try { + if (!subscriptions.hasValidPosition(tp)) { + // this can happen when a rebalance happened while fetch is still in-flight + log.debug("Ignoring fetched records for partition {} since it no longer has valid position", tp); + return null; + } else if (error == Errors.NONE) { + final CompletedFetch<K, V> ret = handleInitializeCompletedFetchSuccess(completedFetch); + recordMetrics = ret == null; + return ret; + } else { + handleInitializeCompletedFetchErrors(completedFetch, error); + return null; + } + } finally { + if (recordMetrics) { + completedFetch.recordAggregatedMetrics(0, 0); + } + + if (error != Errors.NONE) + // we move the partition to the end if there was an error. This way, it's more likely that partitions for + // the same topic can remain together (allowing for more efficient serialization). 
+ subscriptions.movePartitionToEnd(tp); + } + } + + private CompletedFetch<K, V> handleInitializeCompletedFetchSuccess(final CompletedFetch<K, V> completedFetch) { + final TopicPartition tp = completedFetch.partition; + final long fetchOffset = completedFetch.nextFetchOffset; + + // we are interested in this fetch only if the beginning offset matches the + // current consumed position + SubscriptionState.FetchPosition position = subscriptions.position(tp); + if (position == null || position.offset != fetchOffset) { + log.debug("Discarding stale fetch response for partition {} since its offset {} does not match " + + "the expected offset {}", tp, fetchOffset, position); + return null; + } + + final FetchResponseData.PartitionData partition = completedFetch.partitionData; + log.trace("Preparing to read {} bytes of data for partition {} with offset {}", + FetchResponse.recordsSize(partition), tp, position); + Iterator<? extends RecordBatch> batches = FetchResponse.recordsOrFail(partition).batches().iterator(); + + if (!batches.hasNext() && FetchResponse.recordsSize(partition) > 0) { + if (completedFetch.requestVersion < 3) { + // Implement the pre KIP-74 behavior of throwing a RecordTooLargeException. + Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset); + throw new RecordTooLargeException("There are some messages at [Partition=Offset]: " + + recordTooLargePartitions + " whose size is larger than the fetch size " + fetchConfig.fetchSize + + " and hence cannot be returned. Please considering upgrading your broker to 0.10.1.0 or " + + "newer to avoid this issue. Alternately, increase the fetch size on the client (using " + + ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG + ")", + recordTooLargePartitions); + } else { + // This should not happen with brokers that support FetchRequest/Response V3 or higher (i.e. KIP-74) + throw new KafkaException("Failed to make progress reading messages at " + tp + "=" + + fetchOffset + ". 
Received a non-empty fetch response from the server, but no " + + "complete records were found."); + } + } + + if (partition.highWatermark() >= 0) { + log.trace("Updating high watermark for partition {} to {}", tp, partition.highWatermark()); + subscriptions.updateHighWatermark(tp, partition.highWatermark()); + } + + if (partition.logStartOffset() >= 0) { + log.trace("Updating log start offset for partition {} to {}", tp, partition.logStartOffset()); + subscriptions.updateLogStartOffset(tp, partition.logStartOffset()); + } + + if (partition.lastStableOffset() >= 0) { + log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset()); + subscriptions.updateLastStableOffset(tp, partition.lastStableOffset()); + } + + if (FetchResponse.isPreferredReplica(partition)) { + subscriptions.updatePreferredReadReplica(completedFetch.partition, partition.preferredReadReplica(), () -> { + long expireTimeMs = time.milliseconds() + metadata.metadataExpireMs(); + log.debug("Updating preferred read replica for partition {} to {}, set to expire at {}", + tp, partition.preferredReadReplica(), expireTimeMs); + return expireTimeMs; + }); + } + + completedFetch.initialized = true; + return completedFetch; + } + + private void handleInitializeCompletedFetchErrors(final CompletedFetch<K, V> completedFetch, + final Errors error) { + final TopicPartition tp = completedFetch.partition; + final long fetchOffset = completedFetch.nextFetchOffset; + + if (error == Errors.NOT_LEADER_OR_FOLLOWER || + error == Errors.REPLICA_NOT_AVAILABLE || + error == Errors.KAFKA_STORAGE_ERROR || + error == Errors.FENCED_LEADER_EPOCH || + error == Errors.OFFSET_NOT_AVAILABLE) { + log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName()); + requestMetadataUpdate(tp); + } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { + log.warn("Received unknown topic or partition error in fetch for partition {}", tp); + requestMetadataUpdate(tp); + } else if (error == 
Errors.UNKNOWN_TOPIC_ID) { + log.warn("Received unknown topic ID error in fetch for partition {}", tp); + requestMetadataUpdate(tp); + } else if (error == Errors.INCONSISTENT_TOPIC_ID) { + log.warn("Received inconsistent topic ID error in fetch for partition {}", tp); + requestMetadataUpdate(tp); + } else if (error == Errors.OFFSET_OUT_OF_RANGE) { + Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(tp); + + if (!clearedReplicaId.isPresent()) { + // If there's no preferred replica to clear, we're fetching from the leader so handle this error normally + SubscriptionState.FetchPosition position = subscriptions.position(tp); + + if (position == null || fetchOffset != position.offset) { + log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " + + "does not match the current offset {}", tp, fetchOffset, position); + } else { + handleOffsetOutOfRange(position, tp); + } + } else { + log.debug("Unset the preferred read replica {} for partition {} since we got {} when fetching {}", + clearedReplicaId.get(), tp, error, fetchOffset); + } + } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) { + //we log the actual partition and not just the topic to help with ACL propagation issues in large clusters + log.warn("Not authorized to read from partition {}.", tp); + throw new TopicAuthorizationException(Collections.singleton(tp.topic())); + } else if (error == Errors.UNKNOWN_LEADER_EPOCH) { + log.debug("Received unknown leader epoch error in fetch for partition {}", tp); + } else if (error == Errors.UNKNOWN_SERVER_ERROR) { + log.warn("Unknown server error while fetching offset {} for topic-partition {}", + fetchOffset, tp); + } else if (error == Errors.CORRUPT_MESSAGE) { + throw new KafkaException("Encountered corrupt message when fetching offset " + + fetchOffset + + " for topic-partition " + + tp); + } else { + throw new IllegalStateException("Unexpected error code " + + error.code() + + " while fetching at offset " 
+ + fetchOffset + + " from topic-partition " + tp); + } + } + + private void handleOffsetOutOfRange(final SubscriptionState.FetchPosition fetchPosition, + final TopicPartition topicPartition) { + String errorMessage = "Fetch position " + fetchPosition + " is out of range for partition " + topicPartition; + + if (subscriptions.hasDefaultOffsetResetPolicy()) { + log.info("{}, resetting offset", errorMessage); + subscriptions.requestOffsetReset(topicPartition); + } else { + log.info("{}, raising error to the application since no reset policy is configured", errorMessage); + throw new OffsetOutOfRangeException(errorMessage, + Collections.singletonMap(topicPartition, fetchPosition.offset)); + } + } + + /** + * Clear the buffered data which are not a part of newly assigned partitions. Any previously + * {@link CompletedFetch fetched data} is dropped if it is for a partition that is no longer in + * {@code assignedPartitions}. + * + * @param assignedPartitions Newly-assigned {@link TopicPartition} + */ + public void clearBufferedDataForUnassignedPartitions(final Collection<TopicPartition> assignedPartitions) { + final Iterator<CompletedFetch<K, V>> completedFetchesItr = completedFetches.iterator(); + + while (completedFetchesItr.hasNext()) { + final CompletedFetch<K, V> completedFetch = completedFetchesItr.next(); + final TopicPartition tp = completedFetch.partition; + + if (!assignedPartitions.contains(tp)) { + log.debug("Removing {} from buffered data as it is no longer an assigned partition", tp); + completedFetch.drain(); + completedFetchesItr.remove(); + } + } + + if (nextInLineFetch != null && !assignedPartitions.contains(nextInLineFetch.partition)) { + nextInLineFetch.drain(); + nextInLineFetch = null; + } + } + + /** + * Clear the buffered data which are not a part of newly assigned topics + * + * @param assignedTopics newly assigned topics + */ + public void clearBufferedDataForUnassignedTopics(Collection<String> assignedTopics) { + final Set<TopicPartition> 
currentTopicPartitions = new HashSet<>(); + + for (TopicPartition tp : subscriptions.assignedPartitions()) { + if (assignedTopics.contains(tp.topic())) { + currentTopicPartitions.add(tp); + } + } + + clearBufferedDataForUnassignedPartitions(currentTopicPartitions); + } + + protected FetchSessionHandler sessionHandler(int node) { + return sessionHandlers.get(node); + } + + // Visible for testing + void maybeCloseFetchSessions(final Timer timer) { + final Cluster cluster = metadata.fetch(); + final List<RequestFuture<ClientResponse>> requestFutures = new ArrayList<>(); + + sessionHandlers.forEach((fetchTargetNodeId, sessionHandler) -> { + // set the session handler to notify close. This will set the next metadata request to send close message. + sessionHandler.notifyClose(); + + final int sessionId = sessionHandler.sessionId(); + // FetchTargetNode may not be available as it may have disconnected the connection. In such cases, we will + // skip sending the close request. + final Node fetchTarget = cluster.nodeById(fetchTargetNodeId); + if (fetchTarget == null || client.isUnavailable(fetchTarget)) { + log.debug("Skip sending close session request to broker {} since it is not reachable", fetchTarget); + return; + } + + final FetchRequest.Builder request = createFetchRequest(fetchTarget, sessionHandler.newBuilder().build()); + final RequestFuture<ClientResponse> responseFuture = client.send(fetchTarget, request); + + responseFuture.addListener(new RequestFutureListener<ClientResponse>() { + @Override + public void onSuccess(ClientResponse value) { + log.debug("Successfully sent a close message for fetch session: {} to node: {}", sessionId, fetchTarget); + } + + @Override + public void onFailure(RuntimeException e) { + log.debug("Unable to a close message for fetch session: {} to node: {}. 
" + + "This may result in unnecessary fetch sessions at the broker.", sessionId, fetchTarget, e); + } + }); + + requestFutures.add(responseFuture); + }); + + // Poll to ensure that request has been written to the socket. Wait until either the timer has expired or until + // all requests have received a response. + while (timer.notExpired() && !requestFutures.stream().allMatch(RequestFuture::isDone)) { + client.poll(timer, null, true); + } + + if (!requestFutures.stream().allMatch(RequestFuture::isDone)) { + // we ran out of time before completing all futures. It is ok since we don't want to block the shutdown + // here. + log.debug("All requests couldn't be sent in the specific timeout period {}ms. " + + "This may result in unnecessary fetch sessions at the broker. Consider increasing the timeout passed for " + + "KafkaConsumer.close(Duration timeout)", timer.timeoutMs()); + } + } + + public void close(final Timer timer) { + // we do not need to re-enable wakeups since we are closing already + client.disableWakeups(); + + if (nextInLineFetch != null) { + nextInLineFetch.drain(); + nextInLineFetch = null; + } + + maybeCloseFetchSessions(timer); + Utils.closeQuietly(decompressionBufferSupplier, "decompressionBufferSupplier"); + sessionHandlers.clear(); + } + + @Override + public void close() { + close(time.timer(0)); + } + + private void requestMetadataUpdate(final TopicPartition topicPartition) { + metadata.requestUpdate(); + subscriptions.clearPreferredReadReplica(topicPartition); + } +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java index 6a11b846810..a67516ecc73 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java @@ -71,11 +71,8 @@ class CompletedFetch<K, V> { 
private final Logger log; private final SubscriptionState subscriptions; - private final boolean checkCrcs; + private final FetchConfig<K, V> fetchConfig; private final BufferSupplier decompressionBufferSupplier; - private final Deserializer<K> keyDeserializer; - private final Deserializer<V> valueDeserializer; - private final IsolationLevel isolationLevel; private final Iterator<? extends RecordBatch> batches; private final Set<Long> abortedProducerIds; private final PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactions; @@ -91,11 +88,8 @@ class CompletedFetch<K, V> { CompletedFetch(LogContext logContext, SubscriptionState subscriptions, - boolean checkCrcs, + FetchConfig<K, V> fetchConfig, BufferSupplier decompressionBufferSupplier, - Deserializer<K> keyDeserializer, - Deserializer<V> valueDeserializer, - IsolationLevel isolationLevel, TopicPartition partition, FetchResponseData.PartitionData partitionData, FetchMetricsAggregator metricAggregator, @@ -103,11 +97,8 @@ class CompletedFetch<K, V> { short requestVersion) { this.log = logContext.logger(CompletedFetch.class); this.subscriptions = subscriptions; - this.checkCrcs = checkCrcs; + this.fetchConfig = fetchConfig; this.decompressionBufferSupplier = decompressionBufferSupplier; - this.keyDeserializer = keyDeserializer; - this.valueDeserializer = valueDeserializer; - this.isolationLevel = isolationLevel; this.partition = partition; this.partitionData = partitionData; this.metricAggregator = metricAggregator; @@ -147,7 +138,7 @@ class CompletedFetch<K, V> { } private void maybeEnsureValid(RecordBatch batch) { - if (checkCrcs && batch.magic() >= RecordBatch.MAGIC_VALUE_V2) { + if (fetchConfig.checkCrcs && batch.magic() >= RecordBatch.MAGIC_VALUE_V2) { try { batch.ensureValid(); } catch (CorruptRecordException e) { @@ -158,7 +149,7 @@ class CompletedFetch<K, V> { } private void maybeEnsureValid(Record record) { - if (checkCrcs) { + if (fetchConfig.checkCrcs) { try { record.ensureValid(); } catch 
(CorruptRecordException e) { @@ -196,7 +187,7 @@ class CompletedFetch<K, V> { lastEpoch = maybeLeaderEpoch(currentBatch.partitionLeaderEpoch()); maybeEnsureValid(currentBatch); - if (isolationLevel == IsolationLevel.READ_COMMITTED && currentBatch.hasProducerId()) { + if (fetchConfig.isolationLevel == IsolationLevel.READ_COMMITTED && currentBatch.hasProducerId()) { // remove from the aborted transaction queue all aborted transactions which have begun // before the current batch's last offset and add the associated producerIds to the // aborted producer set @@ -306,10 +297,10 @@ class CompletedFetch<K, V> { Headers headers = new RecordHeaders(record.headers()); ByteBuffer keyBytes = record.key(); byte[] keyByteArray = keyBytes == null ? null : org.apache.kafka.common.utils.Utils.toArray(keyBytes); - K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray); + K key = keyBytes == null ? null : fetchConfig.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray); ByteBuffer valueBytes = record.value(); byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes); - V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray); + V value = valueBytes == null ? null : fetchConfig.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray); return new ConsumerRecord<>(partition.topic(), partition.partition(), offset, timestamp, timestampType, keyByteArray == null ? 
ConsumerRecord.NULL_SIZE : keyByteArray.length, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchConfig.java new file mode 100644 index 00000000000..1557ce980b4 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchConfig.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.serialization.Deserializer; + +/** + * {@link FetchConfig} represents the static configuration for fetching records from Kafka. It is simply a way + * to bundle the immutable settings that were presented at the time the {@link Consumer} was created for later use by + * classes like {@link Fetcher}, {@link CompletedFetch}, etc. 
+ * + * <p/> + * + * In most cases, the values stored and returned by {@link FetchConfig} will be those stored in the following + * {@link ConsumerConfig consumer configuration} settings: + * + * <ul> + * <li>{@link #minBytes}: {@link ConsumerConfig#FETCH_MIN_BYTES_CONFIG}</li> + * <li>{@link #maxBytes}: {@link ConsumerConfig#FETCH_MAX_BYTES_CONFIG}</li> + * <li>{@link #maxWaitMs}: {@link ConsumerConfig#FETCH_MAX_WAIT_MS_CONFIG}</li> + * <li>{@link #fetchSize}: {@link ConsumerConfig#MAX_PARTITION_FETCH_BYTES_CONFIG}</li> + * <li>{@link #maxPollRecords}: {@link ConsumerConfig#MAX_POLL_RECORDS_CONFIG}</li> + * <li>{@link #checkCrcs}: {@link ConsumerConfig#CHECK_CRCS_CONFIG}</li> + * <li>{@link #clientRackId}: {@link ConsumerConfig#CLIENT_RACK_CONFIG}</li> + * <li>{@link #keyDeserializer}: {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG}</li> + * <li>{@link #valueDeserializer}: {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}</li> + * <li>{@link #isolationLevel}: {@link ConsumerConfig#ISOLATION_LEVEL_CONFIG}</li> + * </ul> + * + * However, there are places in the code where additional logic is used to determine these fetch-related configuration + * values. In those cases, the values are calculated outside of this class and simply passed in when constructed. + * + * <p/> + * + * Note: the {@link Deserializer deserializers} used for the key and value are not closed by this class. They should be + * closed by the creator of the {@link FetchConfig}. 
+ * + * @param <K> Type used to {@link Deserializer deserialize} the message/record key + * @param <V> Type used to {@link Deserializer deserialize} the message/record value + */ +public class FetchConfig<K, V> { + + final int minBytes; + final int maxBytes; + final int maxWaitMs; + final int fetchSize; + final int maxPollRecords; + final boolean checkCrcs; + final String clientRackId; + final Deserializer<K> keyDeserializer; + final Deserializer<V> valueDeserializer; + final IsolationLevel isolationLevel; + + public FetchConfig(int minBytes, + int maxBytes, + int maxWaitMs, + int fetchSize, + int maxPollRecords, + boolean checkCrcs, + String clientRackId, + Deserializer<K> keyDeserializer, + Deserializer<V> valueDeserializer, + IsolationLevel isolationLevel) { + this.minBytes = minBytes; + this.maxBytes = maxBytes; + this.maxWaitMs = maxWaitMs; + this.fetchSize = fetchSize; + this.maxPollRecords = maxPollRecords; + this.checkCrcs = checkCrcs; + this.clientRackId = clientRackId; + this.keyDeserializer = keyDeserializer; + this.valueDeserializer = valueDeserializer; + this.isolationLevel = isolationLevel; + } + + public FetchConfig(ConsumerConfig config, + Deserializer<K> keyDeserializer, + Deserializer<V> valueDeserializer, + IsolationLevel isolationLevel) { + this.minBytes = config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG); + this.maxBytes = config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG); + this.maxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG); + this.fetchSize = config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG); + this.maxPollRecords = config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG); + this.checkCrcs = config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG); + this.clientRackId = config.getString(ConsumerConfig.CLIENT_RACK_CONFIG); + this.keyDeserializer = keyDeserializer; + this.valueDeserializer = valueDeserializer; + this.isolationLevel = isolationLevel; + } + + @Override + public String toString() { + return 
"FetchConfig{" + + "minBytes=" + minBytes + + ", maxBytes=" + maxBytes + + ", maxWaitMs=" + maxWaitMs + + ", fetchSize=" + fetchSize + + ", maxPollRecords=" + maxPollRecords + + ", checkCrcs=" + checkCrcs + + ", clientRackId='" + clientRackId + '\'' + + ", keyDeserializer=" + keyDeserializer + + ", valueDeserializer=" + valueDeserializer + + ", isolationLevel=" + isolationLevel + + '}'; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java index 63bd7650701..49801da3b7c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java @@ -33,10 +33,11 @@ import java.util.Set; * It keeps an internal ID of the assigned set of partitions which is updated to ensure the set of metrics it * records matches up with the topic-partitions in use. 
*/ -class FetchMetricsManager { +public class FetchMetricsManager { private final Metrics metrics; private final FetchMetricsRegistry metricsRegistry; + private final Sensor throttleTime; private final Sensor bytesFetched; private final Sensor recordsFetched; private final Sensor fetchLatency; @@ -46,10 +47,14 @@ class FetchMetricsManager { private int assignmentId = 0; private Set<TopicPartition> assignedPartitions = Collections.emptySet(); - FetchMetricsManager(Metrics metrics, FetchMetricsRegistry metricsRegistry) { + public FetchMetricsManager(Metrics metrics, FetchMetricsRegistry metricsRegistry) { this.metrics = metrics; this.metricsRegistry = metricsRegistry; + this.throttleTime = new SensorBuilder(metrics, "fetch-throttle-time") + .withAvg(metricsRegistry.fetchThrottleTimeAvg) + .withMax(metricsRegistry.fetchThrottleTimeMax) + .build(); this.bytesFetched = new SensorBuilder(metrics, "bytes-fetched") .withAvg(metricsRegistry.fetchSizeAvg) .withMax(metricsRegistry.fetchSizeMax) @@ -72,6 +77,10 @@ class FetchMetricsManager { .build(); } + public Sensor throttleTimeSensor() { + return throttleTime; + } + void recordLatency(long requestLatencyMs) { fetchLatency.record(requestLatencyMs); } @@ -166,7 +175,7 @@ class FetchMetricsManager { } } - static String topicBytesFetchedMetricName(String topic) { + private static String topicBytesFetchedMetricName(String topic) { return "topic." 
+ topic + ".bytes-fetched"; } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 29cd7972cc6..c46f0c53cce 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -18,52 +18,14 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.FetchSessionHandler; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; -import org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPosition; -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.IsolationLevel; -import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.errors.RecordTooLargeException; -import org.apache.kafka.common.errors.TopicAuthorizationException; -import org.apache.kafka.common.message.FetchResponseData; -import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.metrics.stats.Avg; -import org.apache.kafka.common.metrics.stats.Max; -import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.utils.BufferSupplier; -import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.requests.FetchRequest; -import org.apache.kafka.common.requests.FetchResponse; -import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import 
org.apache.kafka.common.utils.Timer; -import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; -import org.slf4j.helpers.MessageFormatter; -import java.io.Closeable; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; -import java.util.Optional; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -85,85 +47,20 @@ import java.util.concurrent.atomic.AtomicBoolean; * on a different thread.</li> * </ul> */ -public class Fetcher<K, V> implements Closeable { +public class Fetcher<K, V> extends AbstractFetch<K, V> { + private final Logger log; - private final LogContext logContext; - private final ConsumerNetworkClient client; - private final Time time; - private final int minBytes; - private final int maxBytes; - private final int maxWaitMs; - private final int fetchSize; - private final int maxPollRecords; - private final boolean checkCrcs; - private final String clientRackId; - private final ConsumerMetadata metadata; - private final FetchMetricsManager metricsManager; - private final SubscriptionState subscriptions; - private final ConcurrentLinkedQueue<CompletedFetch<K, V>> completedFetches; - private final BufferSupplier decompressionBufferSupplier = BufferSupplier.create(); - private final Deserializer<K> keyDeserializer; - private final Deserializer<V> valueDeserializer; - private final IsolationLevel isolationLevel; - private final Map<Integer, FetchSessionHandler> sessionHandlers; - private final Set<Integer> nodesWithPendingFetchRequests; private final AtomicBoolean isClosed = new AtomicBoolean(false); - private CompletedFetch<K, V> nextInLineFetch = null; public Fetcher(LogContext logContext, ConsumerNetworkClient 
client, - int minBytes, - int maxBytes, - int maxWaitMs, - int fetchSize, - int maxPollRecords, - boolean checkCrcs, - String clientRackId, - Deserializer<K> keyDeserializer, - Deserializer<V> valueDeserializer, ConsumerMetadata metadata, SubscriptionState subscriptions, - Metrics metrics, - FetchMetricsRegistry metricsRegistry, - Time time, - IsolationLevel isolationLevel) { + FetchConfig<K, V> fetchConfig, + FetchMetricsManager metricsManager, + Time time) { + super(logContext, client, metadata, subscriptions, fetchConfig, metricsManager, time); this.log = logContext.logger(Fetcher.class); - this.logContext = logContext; - this.time = time; - this.client = client; - this.metadata = metadata; - this.subscriptions = subscriptions; - this.minBytes = minBytes; - this.maxBytes = maxBytes; - this.maxWaitMs = maxWaitMs; - this.fetchSize = fetchSize; - this.maxPollRecords = maxPollRecords; - this.checkCrcs = checkCrcs; - this.clientRackId = clientRackId; - this.keyDeserializer = keyDeserializer; - this.valueDeserializer = valueDeserializer; - this.completedFetches = new ConcurrentLinkedQueue<>(); - this.metricsManager = new FetchMetricsManager(metrics, metricsRegistry); - this.isolationLevel = isolationLevel; - this.sessionHandlers = new HashMap<>(); - this.nodesWithPendingFetchRequests = new HashSet<>(); - } - - /** - * Return whether we have any completed fetches pending return to the user. This method is thread-safe. Has - * visibility for testing. - * @return true if there are completed fetches, false otherwise - */ - protected boolean hasCompletedFetches() { - return !completedFetches.isEmpty(); - } - - /** - * Return whether we have any completed fetches that are fetchable. This method is thread-safe. 
- * @return true if there are completed fetches that can be returned, false otherwise - */ - public boolean hasAvailableFetches() { - return completedFetches.stream().anyMatch(fetch -> subscriptions.isFetchable(fetch.partition)); } /** @@ -172,606 +69,33 @@ public class Fetcher<K, V> implements Closeable { * @return number of fetches sent */ public synchronized int sendFetches() { - // Update metrics in case there was an assignment change - metricsManager.maybeUpdateAssignment(subscriptions); - Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests(); + for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) { final Node fetchTarget = entry.getKey(); final FetchSessionHandler.FetchRequestData data = entry.getValue(); - final RequestFuture<ClientResponse> future = sendFetchRequestToNode(data, fetchTarget); - // We add the node to the set of nodes with pending fetch requests before adding the - // listener because the future may have been fulfilled on another thread (e.g. during a - // disconnection being handled by the heartbeat thread) which will mean the listener - // will be invoked synchronously. - this.nodesWithPendingFetchRequests.add(entry.getKey().id()); - future.addListener(new RequestFutureListener<ClientResponse>() { + final FetchRequest.Builder request = createFetchRequest(fetchTarget, data); + RequestFutureListener<ClientResponse> listener = new RequestFutureListener<ClientResponse>() { @Override public void onSuccess(ClientResponse resp) { synchronized (Fetcher.this) { - try { - FetchResponse response = (FetchResponse) resp.responseBody(); - FetchSessionHandler handler = sessionHandler(fetchTarget.id()); - if (handler == null) { - log.error("Unable to find FetchSessionHandler for node {}. 
Ignoring fetch response.", - fetchTarget.id()); - return; - } - if (!handler.handleResponse(response, resp.requestHeader().apiVersion())) { - if (response.error() == Errors.FETCH_SESSION_TOPIC_ID_ERROR) { - metadata.requestUpdate(); - } - return; - } - - Map<TopicPartition, FetchResponseData.PartitionData> responseData = response.responseData(handler.sessionTopicNames(), resp.requestHeader().apiVersion()); - Set<TopicPartition> partitions = new HashSet<>(responseData.keySet()); - FetchMetricsAggregator metricAggregator = new FetchMetricsAggregator(metricsManager, partitions); - - for (Map.Entry<TopicPartition, FetchResponseData.PartitionData> entry : responseData.entrySet()) { - TopicPartition partition = entry.getKey(); - FetchRequest.PartitionData requestData = data.sessionPartitions().get(partition); - if (requestData == null) { - String message; - if (data.metadata().isFull()) { - message = MessageFormatter.arrayFormat( - "Response for missing full request partition: partition={}; metadata={}", - new Object[]{partition, data.metadata()}).getMessage(); - } else { - message = MessageFormatter.arrayFormat( - "Response for missing session request partition: partition={}; metadata={}; toSend={}; toForget={}; toReplace={}", - new Object[]{partition, data.metadata(), data.toSend(), data.toForget(), data.toReplace()}).getMessage(); - } - - // Received fetch response for missing session partition - throw new IllegalStateException(message); - } else { - long fetchOffset = requestData.fetchOffset; - short requestVersion = resp.requestHeader().apiVersion(); - FetchResponseData.PartitionData partitionData = entry.getValue(); - - log.debug("Fetch {} at offset {} for partition {} returned fetch data {}", - isolationLevel, fetchOffset, partition, partitionData); - - CompletedFetch<K, V> completedFetch = new CompletedFetch<>(logContext, - subscriptions, - checkCrcs, - decompressionBufferSupplier, - keyDeserializer, - valueDeserializer, - isolationLevel, - partition, - 
partitionData, - metricAggregator, - fetchOffset, - requestVersion); - completedFetches.add(completedFetch); - } - } - - metricsManager.recordLatency(resp.requestLatencyMs()); - } finally { - nodesWithPendingFetchRequests.remove(fetchTarget.id()); - } + handleFetchResponse(fetchTarget, data, resp); } } @Override public void onFailure(RuntimeException e) { synchronized (Fetcher.this) { - try { - FetchSessionHandler handler = sessionHandler(fetchTarget.id()); - if (handler != null) { - handler.handleError(e); - handler.sessionTopicPartitions().forEach(subscriptions::clearPreferredReadReplica); - } - } finally { - nodesWithPendingFetchRequests.remove(fetchTarget.id()); - } + handleFetchResponse(fetchTarget, e); } } - }); + }; + final RequestFuture<ClientResponse> future = client.send(fetchTarget, request); + future.addListener(listener); } - return fetchRequestMap.size(); - } - - /** - * Send Fetch Request to Kafka cluster asynchronously. - * - * </p> - * - * This method is visible for testing. - * - * @return A future that indicates result of sent Fetch request - */ - private RequestFuture<ClientResponse> sendFetchRequestToNode(final FetchSessionHandler.FetchRequestData requestData, - final Node fetchTarget) { - // Version 12 is the maximum version that could be used without topic IDs. See FetchRequest.json for schema - // changelog. - final short maxVersion = requestData.canUseTopicIds() ? 
ApiKeys.FETCH.latestVersion() : (short) 12; - - final FetchRequest.Builder request = FetchRequest.Builder - .forConsumer(maxVersion, this.maxWaitMs, this.minBytes, requestData.toSend()) - .isolationLevel(isolationLevel) - .setMaxBytes(this.maxBytes) - .metadata(requestData.metadata()) - .removed(requestData.toForget()) - .replaced(requestData.toReplace()) - .rackId(clientRackId); - log.debug("Sending {} {} to broker {}", isolationLevel, requestData, fetchTarget); - - return client.send(fetchTarget, request); - } - - /** - * Return the fetched records, empty the record buffer and update the consumed position. - * - * </p> - * - * NOTE: returning an {@link Fetch#isEmpty empty} fetch guarantees the consumed position is not updated. - * - * @return A {@link Fetch} for the requested partitions - * @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and - * the defaultResetPolicy is NONE - * @throws TopicAuthorizationException If there is TopicAuthorization error in fetchResponse. - */ - public Fetch<K, V> collectFetch() { - Fetch<K, V> fetch = Fetch.empty(); - Queue<CompletedFetch<K, V>> pausedCompletedFetches = new ArrayDeque<>(); - int recordsRemaining = maxPollRecords; - - try { - while (recordsRemaining > 0) { - if (nextInLineFetch == null || nextInLineFetch.isConsumed) { - CompletedFetch<K, V> records = completedFetches.peek(); - if (records == null) break; - - if (!records.initialized) { - try { - nextInLineFetch = initializeCompletedFetch(records); - } catch (Exception e) { - // Remove a completedFetch upon a parse with exception if (1) it contains no records, and - // (2) there are no fetched records with actual content preceding this exception. - // The first condition ensures that the completedFetches is not stuck with the same completedFetch - // in cases such as the TopicAuthorizationException, and the second condition ensures that no - // potential data loss due to an exception in a following record. 
- FetchResponseData.PartitionData partition = records.partitionData; - if (fetch.isEmpty() && FetchResponse.recordsOrFail(partition).sizeInBytes() == 0) { - completedFetches.poll(); - } - throw e; - } - } else { - nextInLineFetch = records; - } - completedFetches.poll(); - } else if (subscriptions.isPaused(nextInLineFetch.partition)) { - // when the partition is paused we add the records back to the completedFetches queue instead of draining - // them so that they can be returned on a subsequent poll if the partition is resumed at that time - log.debug("Skipping fetching records for assigned partition {} because it is paused", nextInLineFetch.partition); - pausedCompletedFetches.add(nextInLineFetch); - nextInLineFetch = null; - } else { - Fetch<K, V> nextFetch = fetchRecords(nextInLineFetch, recordsRemaining); - recordsRemaining -= nextFetch.numRecords(); - fetch.add(nextFetch); - } - } - } catch (KafkaException e) { - if (fetch.isEmpty()) - throw e; - } finally { - // add any polled completed fetches for paused partitions back to the completed fetches queue to be - // re-evaluated in the next poll - completedFetches.addAll(pausedCompletedFetches); - } - - return fetch; - } - - private Fetch<K, V> fetchRecords(CompletedFetch<K, V> completedFetch, int maxRecords) { - if (!subscriptions.isAssigned(completedFetch.partition)) { - // this can happen when a rebalance happened before fetched records are returned to the consumer's poll call - log.debug("Not returning fetched records for partition {} since it is no longer assigned", - completedFetch.partition); - } else if (!subscriptions.isFetchable(completedFetch.partition)) { - // this can happen when a partition is paused before fetched records are returned to the consumer's - // poll call or if the offset is being reset - log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", - completedFetch.partition); - } else { - FetchPosition position = 
subscriptions.position(completedFetch.partition); - if (position == null) { - throw new IllegalStateException("Missing position for fetchable partition " + completedFetch.partition); - } - - if (completedFetch.nextFetchOffset == position.offset) { - List<ConsumerRecord<K, V>> partRecords = completedFetch.fetchRecords(maxRecords); - - log.trace("Returning {} fetched records at offset {} for assigned partition {}", - partRecords.size(), position, completedFetch.partition); - - boolean positionAdvanced = false; - - if (completedFetch.nextFetchOffset > position.offset) { - FetchPosition nextPosition = new FetchPosition( - completedFetch.nextFetchOffset, - completedFetch.lastEpoch, - position.currentLeader); - log.trace("Updating fetch position from {} to {} for partition {} and returning {} records from `poll()`", - position, nextPosition, completedFetch.partition, partRecords.size()); - subscriptions.position(completedFetch.partition, nextPosition); - positionAdvanced = true; - } - - Long partitionLag = subscriptions.partitionLag(completedFetch.partition, isolationLevel); - if (partitionLag != null) - this.metricsManager.recordPartitionLag(completedFetch.partition, partitionLag); - - Long lead = subscriptions.partitionLead(completedFetch.partition); - if (lead != null) { - this.metricsManager.recordPartitionLead(completedFetch.partition, lead); - } - - return Fetch.forPartition(completedFetch.partition, partRecords, positionAdvanced); - } else { - // these records aren't next in line based on the last consumed position, ignore them - // they must be from an obsolete request - log.debug("Ignoring fetched records for {} at offset {} since the current position is {}", - completedFetch.partition, completedFetch.nextFetchOffset, position); - } - } - - log.trace("Draining fetched records for partition {}", completedFetch.partition); - completedFetch.drain(); - - return Fetch.empty(); - } - - private List<TopicPartition> fetchablePartitions() { - Set<TopicPartition> exclude 
= new HashSet<>(); - if (nextInLineFetch != null && !nextInLineFetch.isConsumed) { - exclude.add(nextInLineFetch.partition); - } - for (CompletedFetch<K, V> completedFetch : completedFetches) { - exclude.add(completedFetch.partition); - } - return subscriptions.fetchablePartitions(tp -> !exclude.contains(tp)); - } - - /** - * Determine which replica to read from. - */ - Node selectReadReplica(TopicPartition partition, Node leaderReplica, long currentTimeMs) { - Optional<Integer> nodeId = subscriptions.preferredReadReplica(partition, currentTimeMs); - if (nodeId.isPresent()) { - Optional<Node> node = nodeId.flatMap(id -> metadata.fetch().nodeIfOnline(partition, id)); - if (node.isPresent()) { - return node.get(); - } else { - log.trace("Not fetching from {} for partition {} since it is marked offline or is missing from our metadata," + - " using the leader instead.", nodeId, partition); - // Note that this condition may happen due to stale metadata, so we clear preferred replica and - // refresh metadata. - requestMetadataUpdate(partition); - return leaderReplica; - } - } else { - return leaderReplica; - } - } - - /** - * Create fetch requests for all nodes for which we have assigned partitions - * that have no existing requests in flight. 
- */ - private Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() { - Map<Node, FetchSessionHandler.Builder> fetchable = new LinkedHashMap<>(); - long currentTimeMs = time.milliseconds(); - Map<String, Uuid> topicIds = metadata.topicIds(); - - for (TopicPartition partition : fetchablePartitions()) { - FetchPosition position = this.subscriptions.position(partition); - if (position == null) { - throw new IllegalStateException("Missing position for fetchable partition " + partition); - } - - Optional<Node> leaderOpt = position.currentLeader.leader; - if (!leaderOpt.isPresent()) { - log.debug("Requesting metadata update for partition {} since the position {} is missing the current leader node", partition, position); - metadata.requestUpdate(); - continue; - } - - // Use the preferred read replica if set, otherwise the partition's leader - Node node = selectReadReplica(partition, leaderOpt.get(), currentTimeMs); - if (client.isUnavailable(node)) { - client.maybeThrowAuthFailure(node); - - // If we try to send during the reconnect backoff window, then the request is just - // going to be failed anyway before being sent, so skip the send for now - log.trace("Skipping fetch for partition {} because node {} is awaiting reconnect backoff", partition, node); - } else if (this.nodesWithPendingFetchRequests.contains(node.id())) { - log.trace("Skipping fetch for partition {} because previous request to {} has not been processed", partition, node); - } else { - // if there is a leader and no in-flight requests, issue a new fetch - FetchSessionHandler.Builder builder = fetchable.get(node); - if (builder == null) { - int id = node.id(); - FetchSessionHandler handler = sessionHandler(id); - if (handler == null) { - handler = new FetchSessionHandler(logContext, id); - sessionHandlers.put(id, handler); - } - builder = handler.newBuilder(); - fetchable.put(node, builder); - } - builder.add(partition, new FetchRequest.PartitionData( - 
topicIds.getOrDefault(partition.topic(), Uuid.ZERO_UUID), - position.offset, FetchRequest.INVALID_LOG_START_OFFSET, this.fetchSize, - position.currentLeader.epoch, Optional.empty())); - - log.debug("Added {} fetch request for partition {} at position {} to node {}", isolationLevel, - partition, position, node); - } - } - - Map<Node, FetchSessionHandler.FetchRequestData> reqs = new LinkedHashMap<>(); - for (Map.Entry<Node, FetchSessionHandler.Builder> entry : fetchable.entrySet()) { - reqs.put(entry.getKey(), entry.getValue().build()); - } - return reqs; - } - - /** - * Initialize a CompletedFetch object. - */ - private CompletedFetch<K, V> initializeCompletedFetch(CompletedFetch<K, V> nextCompletedFetch) { - TopicPartition tp = nextCompletedFetch.partition; - FetchResponseData.PartitionData partition = nextCompletedFetch.partitionData; - long fetchOffset = nextCompletedFetch.nextFetchOffset; - CompletedFetch<K, V> completedFetch = null; - Errors error = Errors.forCode(partition.errorCode()); - - try { - if (!subscriptions.hasValidPosition(tp)) { - // this can happen when a rebalance happened while fetch is still in-flight - log.debug("Ignoring fetched records for partition {} since it no longer has valid position", tp); - } else if (error == Errors.NONE) { - // we are interested in this fetch only if the beginning offset matches the - // current consumed position - FetchPosition position = subscriptions.position(tp); - if (position == null || position.offset != fetchOffset) { - log.debug("Discarding stale fetch response for partition {} since its offset {} does not match " + - "the expected offset {}", tp, fetchOffset, position); - return null; - } - - log.trace("Preparing to read {} bytes of data for partition {} with offset {}", - FetchResponse.recordsSize(partition), tp, position); - Iterator<? 
extends RecordBatch> batches = FetchResponse.recordsOrFail(partition).batches().iterator(); - completedFetch = nextCompletedFetch; - - if (!batches.hasNext() && FetchResponse.recordsSize(partition) > 0) { - if (completedFetch.requestVersion < 3) { - // Implement the pre KIP-74 behavior of throwing a RecordTooLargeException. - Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset); - throw new RecordTooLargeException("There are some messages at [Partition=Offset]: " + - recordTooLargePartitions + " whose size is larger than the fetch size " + this.fetchSize + - " and hence cannot be returned. Please considering upgrading your broker to 0.10.1.0 or " + - "newer to avoid this issue. Alternately, increase the fetch size on the client (using " + - ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG + ")", - recordTooLargePartitions); - } else { - // This should not happen with brokers that support FetchRequest/Response V3 or higher (i.e. KIP-74) - throw new KafkaException("Failed to make progress reading messages at " + tp + "=" + - fetchOffset + ". 
Received a non-empty fetch response from the server, but no " + - "complete records were found."); - } - } - - if (partition.highWatermark() >= 0) { - log.trace("Updating high watermark for partition {} to {}", tp, partition.highWatermark()); - subscriptions.updateHighWatermark(tp, partition.highWatermark()); - } - - if (partition.logStartOffset() >= 0) { - log.trace("Updating log start offset for partition {} to {}", tp, partition.logStartOffset()); - subscriptions.updateLogStartOffset(tp, partition.logStartOffset()); - } - - if (partition.lastStableOffset() >= 0) { - log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset()); - subscriptions.updateLastStableOffset(tp, partition.lastStableOffset()); - } - - if (FetchResponse.isPreferredReplica(partition)) { - subscriptions.updatePreferredReadReplica(completedFetch.partition, partition.preferredReadReplica(), () -> { - long expireTimeMs = time.milliseconds() + metadata.metadataExpireMs(); - log.debug("Updating preferred read replica for partition {} to {}, set to expire at {}", - tp, partition.preferredReadReplica(), expireTimeMs); - return expireTimeMs; - }); - } - - nextCompletedFetch.initialized = true; - } else if (error == Errors.NOT_LEADER_OR_FOLLOWER || - error == Errors.REPLICA_NOT_AVAILABLE || - error == Errors.KAFKA_STORAGE_ERROR || - error == Errors.FENCED_LEADER_EPOCH || - error == Errors.OFFSET_NOT_AVAILABLE) { - log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName()); - requestMetadataUpdate(tp); - } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { - log.warn("Received unknown topic or partition error in fetch for partition {}", tp); - requestMetadataUpdate(tp); - } else if (error == Errors.UNKNOWN_TOPIC_ID) { - log.warn("Received unknown topic ID error in fetch for partition {}", tp); - requestMetadataUpdate(tp); - } else if (error == Errors.INCONSISTENT_TOPIC_ID) { - log.warn("Received inconsistent topic ID error in fetch for partition 
{}", tp); - requestMetadataUpdate(tp); - } else if (error == Errors.OFFSET_OUT_OF_RANGE) { - Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(tp); - if (!clearedReplicaId.isPresent()) { - // If there's no preferred replica to clear, we're fetching from the leader so handle this error normally - FetchPosition position = subscriptions.position(tp); - if (position == null || fetchOffset != position.offset) { - log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " + - "does not match the current offset {}", tp, fetchOffset, position); - } else { - handleOffsetOutOfRange(position, tp); - } - } else { - log.debug("Unset the preferred read replica {} for partition {} since we got {} when fetching {}", - clearedReplicaId.get(), tp, error, fetchOffset); - } - } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) { - //we log the actual partition and not just the topic to help with ACL propagation issues in large clusters - log.warn("Not authorized to read from partition {}.", tp); - throw new TopicAuthorizationException(Collections.singleton(tp.topic())); - } else if (error == Errors.UNKNOWN_LEADER_EPOCH) { - log.debug("Received unknown leader epoch error in fetch for partition {}", tp); - } else if (error == Errors.UNKNOWN_SERVER_ERROR) { - log.warn("Unknown server error while fetching offset {} for topic-partition {}", - fetchOffset, tp); - } else if (error == Errors.CORRUPT_MESSAGE) { - throw new KafkaException("Encountered corrupt message when fetching offset " - + fetchOffset - + " for topic-partition " - + tp); - } else { - throw new IllegalStateException("Unexpected error code " - + error.code() - + " while fetching at offset " - + fetchOffset - + " from topic-partition " + tp); - } - } finally { - if (completedFetch == null) - nextCompletedFetch.recordAggregatedMetrics(0, 0); - - if (error != Errors.NONE) - // we move the partition to the end if there was an error. 
This way, it's more likely that partitions for - // the same topic can remain together (allowing for more efficient serialization). - subscriptions.movePartitionToEnd(tp); - } - - return completedFetch; - } - - private void handleOffsetOutOfRange(FetchPosition fetchPosition, TopicPartition topicPartition) { - String errorMessage = "Fetch position " + fetchPosition + " is out of range for partition " + topicPartition; - if (subscriptions.hasDefaultOffsetResetPolicy()) { - log.info("{}, resetting offset", errorMessage); - subscriptions.requestOffsetReset(topicPartition); - } else { - log.info("{}, raising error to the application since no reset policy is configured", errorMessage); - throw new OffsetOutOfRangeException(errorMessage, - Collections.singletonMap(topicPartition, fetchPosition.offset)); - } - } - - /** - * Clear the buffered data which are not a part of newly assigned partitions - * - * @param assignedPartitions newly assigned {@link TopicPartition} - */ - public void clearBufferedDataForUnassignedPartitions(Collection<TopicPartition> assignedPartitions) { - Iterator<CompletedFetch<K, V>> completedFetchesItr = completedFetches.iterator(); - while (completedFetchesItr.hasNext()) { - CompletedFetch<K, V> records = completedFetchesItr.next(); - TopicPartition tp = records.partition; - if (!assignedPartitions.contains(tp)) { - records.drain(); - completedFetchesItr.remove(); - } - } - - if (nextInLineFetch != null && !assignedPartitions.contains(nextInLineFetch.partition)) { - nextInLineFetch.drain(); - nextInLineFetch = null; - } - } - - /** - * Clear the buffered data which are not a part of newly assigned topics - * - * @param assignedTopics newly assigned topics - */ - public void clearBufferedDataForUnassignedTopics(Collection<String> assignedTopics) { - Set<TopicPartition> currentTopicPartitions = new HashSet<>(); - for (TopicPartition tp : subscriptions.assignedPartitions()) { - if (assignedTopics.contains(tp.topic())) { - 
currentTopicPartitions.add(tp); - } - } - clearBufferedDataForUnassignedPartitions(currentTopicPartitions); - } - - // Visible for testing - protected FetchSessionHandler sessionHandler(int node) { - return sessionHandlers.get(node); - } - - public static Sensor throttleTimeSensor(Metrics metrics, FetchMetricsRegistry metricsRegistry) { - Sensor fetchThrottleTimeSensor = metrics.sensor("fetch-throttle-time"); - fetchThrottleTimeSensor.add(metrics.metricInstance(metricsRegistry.fetchThrottleTimeAvg), new Avg()); - - fetchThrottleTimeSensor.add(metrics.metricInstance(metricsRegistry.fetchThrottleTimeMax), new Max()); - - return fetchThrottleTimeSensor; - } - - // Visible for testing - void maybeCloseFetchSessions(final Timer timer) { - final Cluster cluster = metadata.fetch(); - final List<RequestFuture<ClientResponse>> requestFutures = new ArrayList<>(); - sessionHandlers.forEach((fetchTargetNodeId, sessionHandler) -> { - // set the session handler to notify close. This will set the next metadata request to send close message. - sessionHandler.notifyClose(); - - final int sessionId = sessionHandler.sessionId(); - // FetchTargetNode may not be available as it may have disconnected the connection. In such cases, we will - // skip sending the close request. 
- final Node fetchTarget = cluster.nodeById(fetchTargetNodeId); - if (fetchTarget == null || client.isUnavailable(fetchTarget)) { - log.debug("Skip sending close session request to broker {} since it is not reachable", fetchTarget); - return; - } - - final RequestFuture<ClientResponse> responseFuture = sendFetchRequestToNode(sessionHandler.newBuilder().build(), fetchTarget); - responseFuture.addListener(new RequestFutureListener<ClientResponse>() { - @Override - public void onSuccess(ClientResponse value) { - log.debug("Successfully sent a close message for fetch session: {} to node: {}", sessionId, fetchTarget); - } - - @Override - public void onFailure(RuntimeException e) { - log.debug("Unable to a close message for fetch session: {} to node: {}. " + - "This may result in unnecessary fetch sessions at the broker.", sessionId, fetchTarget, e); - } - }); - - requestFutures.add(responseFuture); - }); - - // Poll to ensure that request has been written to the socket. Wait until either the timer has expired or until - // all requests have received a response. - while (timer.notExpired() && !requestFutures.stream().allMatch(RequestFuture::isDone)) { - client.poll(timer, null, true); - } - - if (!requestFutures.stream().allMatch(RequestFuture::isDone)) { - // we ran out of time before completing all futures. It is ok since we don't want to block the shutdown - // here. - log.debug("All requests couldn't be sent in the specific timeout period {}ms. " + - "This may result in unnecessary fetch sessions at the broker. Consider increasing the timeout passed for " + - "KafkaConsumer.close(Duration timeout)", timer.timeoutMs()); - } + return fetchRequestMap.size(); } public void close(final Timer timer) { @@ -782,25 +106,8 @@ public class Fetcher<K, V> implements Closeable { // Shared states (e.g. sessionHandlers) could be accessed by multiple threads (such as heartbeat thread), hence, // it is necessary to acquire a lock on the fetcher instance before modifying the states. 
- synchronized (Fetcher.this) { - // we do not need to re-enable wakeups since we are closing already - client.disableWakeups(); - if (nextInLineFetch != null) - nextInLineFetch.drain(); - maybeCloseFetchSessions(timer); - Utils.closeQuietly(decompressionBufferSupplier, "decompressionBufferSupplier"); - sessionHandlers.clear(); + synchronized (this) { + super.close(timer); } } - - @Override - public void close() { - close(time.timer(0)); - } - - private void requestMetadataUpdate(TopicPartition topicPartition) { - this.metadata.requestUpdate(); - this.subscriptions.clearPreferredReadReplica(topicPartition); - } - } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 98e6a85444d..b55c6822b0f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.GroupRebalanceConfig; import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.MockClient; @@ -28,6 +29,8 @@ import org.apache.kafka.clients.consumer.internals.ConsumerMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerMetrics; import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; +import org.apache.kafka.clients.consumer.internals.FetchConfig; +import org.apache.kafka.clients.consumer.internals.FetchMetricsManager; import org.apache.kafka.clients.consumer.internals.Fetcher; import org.apache.kafka.clients.consumer.internals.MockRebalanceListener; import org.apache.kafka.clients.consumer.internals.OffsetFetcher; @@ 
-2656,24 +2659,26 @@ public class KafkaConsumerTest { null); } IsolationLevel isolationLevel = IsolationLevel.READ_UNCOMMITTED; - Fetcher<String, String> fetcher = new Fetcher<>( - loggerFactory, - consumerClient, + FetchMetricsManager metricsManager = new FetchMetricsManager(metrics, metricsRegistry.fetcherMetrics); + FetchConfig<String, String> fetchConfig = new FetchConfig<>( minBytes, maxBytes, maxWaitMs, fetchSize, maxPollRecords, checkCrcs, - "", + CommonClientConfigs.DEFAULT_CLIENT_RACK, keyDeserializer, deserializer, + isolationLevel); + Fetcher<String, String> fetcher = new Fetcher<>( + loggerFactory, + consumerClient, metadata, subscription, - metrics, - metricsRegistry.fetcherMetrics, - time, - isolationLevel); + fetchConfig, + metricsManager, + time); OffsetFetcher offsetFetcher = new OffsetFetcher(loggerFactory, consumerClient, metadata, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java index b420852852a..895d40b85fc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.IsolationLevel; @@ -41,8 +42,6 @@ import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.nio.ByteBuffer; @@ -60,19 +59,6 @@ public class CompletedFetchTest { 
private final static long PRODUCER_ID = 1000L; private final static short PRODUCER_EPOCH = 0; - private BufferSupplier bufferSupplier; - - @BeforeEach - public void setup() { - bufferSupplier = BufferSupplier.create(); - } - - @AfterEach - public void tearDown() { - if (bufferSupplier != null) - bufferSupplier.close(); - } - @Test public void testSimple() { long fetchOffset = 5; @@ -235,13 +221,23 @@ public class CompletedFetchTest { FetchMetricsManager metrics = new FetchMetricsManager(new Metrics(), metricsRegistry); FetchMetricsAggregator metricAggregator = new FetchMetricsAggregator(metrics, Collections.singleton(TP)); - return new CompletedFetch<>(logContext, - subscriptions, + FetchConfig<K, V> fetchConfig = new FetchConfig<>( + ConsumerConfig.DEFAULT_FETCH_MIN_BYTES, + ConsumerConfig.DEFAULT_FETCH_MAX_BYTES, + ConsumerConfig.DEFAULT_FETCH_MAX_WAIT_MS, + ConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES, + ConsumerConfig.DEFAULT_MAX_POLL_RECORDS, checkCrcs, - bufferSupplier, + ConsumerConfig.DEFAULT_CLIENT_RACK, keyDeserializer, valueDeserializer, - isolationLevel, + isolationLevel + ); + return new CompletedFetch<>( + logContext, + subscriptions, + fetchConfig, + BufferSupplier.create(), TP, partitionData, metricAggregator, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index e60edbfb6c1..e4ddce247db 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.FetchSessionHandler; import org.apache.kafka.clients.Metadata; import 
org.apache.kafka.clients.MockClient; @@ -50,7 +51,6 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetFo import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; @@ -176,6 +176,7 @@ public class FetcherTest { private SubscriptionState subscriptions; private ConsumerMetadata metadata; private FetchMetricsRegistry metricsRegistry; + private FetchMetricsManager metricsManager; private MockClient client; private Metrics metrics; private ApiVersions apiVersions = new ApiVersions(); @@ -1902,12 +1903,11 @@ public class FetcherTest { buildFetcher(); MockSelector selector = new MockSelector(time); - Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricsRegistry); Cluster cluster = TestUtils.singletonCluster("test", 1); Node node = cluster.nodes().get(0); NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, 1000, 1000, 64 * 1024, 64 * 1024, 1000, 10 * 1000, 127 * 1000, - time, true, new ApiVersions(), throttleTimeSensor, new LogContext()); + time, true, new ApiVersions(), metricsManager.throttleTimeSensor(), new LogContext()); ApiVersionsResponse apiVersionsResponse = ApiVersionsResponse.defaultApiVersionsResponse( 400, ApiMessageType.ListenerType.ZK_BROKER); @@ -2262,9 +2262,6 @@ public class FetcherTest { Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchedRecords(); assertTrue(partitionRecords.containsKey(tp0)); - // Create throttle metrics - Fetcher.throttleTimeSensor(metrics, metricsRegistry); - // Verify that all metrics except metrics-count have registered templates Set<MetricNameTemplate> allMetrics = new HashSet<>(); for (MetricName n : 
metrics.metrics().keySet()) { @@ -2843,31 +2840,32 @@ public class FetcherTest { isolationLevel, apiVersions); - fetcher = new Fetcher<byte[], byte[]>( - new LogContext(), - consumerClient, + FetchConfig<byte[], byte[]> fetchConfig = new FetchConfig<>( minBytes, maxBytes, maxWaitMs, fetchSize, 2 * numPartitions, - true, - "", + true, // check crcs + CommonClientConfigs.DEFAULT_CLIENT_RACK, new ByteArrayDeserializer(), new ByteArrayDeserializer(), + isolationLevel); + fetcher = new Fetcher<byte[], byte[]>( + logContext, + consumerClient, metadata, subscriptions, - metrics, - metricsRegistry, - time, - isolationLevel) { + fetchConfig, + metricsManager, + time) { @Override - protected FetchSessionHandler sessionHandler(int id) { - final FetchSessionHandler handler = super.sessionHandler(id); + protected FetchSessionHandler sessionHandler(int node) { + final FetchSessionHandler handler = super.sessionHandler(node); if (handler == null) return null; else { - return new FetchSessionHandler(new LogContext(), id) { + return new FetchSessionHandler(new LogContext(), node) { @Override public Builder newBuilder() { verifySessionPartitions(); @@ -3645,24 +3643,25 @@ public class FetcherTest { SubscriptionState subscriptionState, LogContext logContext) { buildDependencies(metricConfig, metadataExpireMs, subscriptionState, logContext); - fetcher = spy(new Fetcher<>( - new LogContext(), - consumerClient, + FetchConfig<K, V> fetchConfig = new FetchConfig<>( minBytes, maxBytes, maxWaitMs, fetchSize, maxPollRecords, true, // check crc - "", + CommonClientConfigs.DEFAULT_CLIENT_RACK, keyDeserializer, valueDeserializer, + isolationLevel); + fetcher = spy(new Fetcher<>( + logContext, + consumerClient, metadata, - subscriptions, - metrics, - metricsRegistry, - time, - isolationLevel)); + subscriptionState, + fetchConfig, + metricsManager, + time)); offsetFetcher = new OffsetFetcher(logContext, consumerClient, metadata, @@ -3687,6 +3686,7 @@ public class FetcherTest { consumerClient = 
spy(new ConsumerNetworkClient(logContext, client, metadata, time, 100, 1000, Integer.MAX_VALUE)); metricsRegistry = new FetchMetricsRegistry(metricConfig.tags().keySet(), "consumer" + groupId); + metricsManager = new FetchMetricsManager(metrics, metricsRegistry); } private <T> List<Long> collectRecordOffsets(List<ConsumerRecord<T, T>> records) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java index ffaeab4709d..24aa9c8ad1e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.ClientDnsLookup; import org.apache.kafka.clients.ClientUtils; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.NodeApiVersions; @@ -1245,24 +1246,25 @@ public class OffsetFetcherTest { buildFetcher(metricConfig, isolationLevel, metadataExpireMs, subscriptionState, logContext); FetchMetricsRegistry metricsRegistry = new FetchMetricsRegistry(metricConfig.tags().keySet(), "consumertest-group"); - Fetcher<byte[], byte[]> fetcher = new Fetcher<>( - logContext, - consumerClient, + FetchConfig<byte[], byte[]> fetchConfig = new FetchConfig<>( minBytes, maxBytes, maxWaitMs, fetchSize, maxPollRecords, true, // check crc - "", + CommonClientConfigs.DEFAULT_CLIENT_RACK, new ByteArrayDeserializer(), new ByteArrayDeserializer(), + isolationLevel); + Fetcher<byte[], byte[]> fetcher = new Fetcher<>( + logContext, + consumerClient, metadata, subscriptions, - metrics, - metricsRegistry, - time, - isolationLevel); + fetchConfig, + new FetchMetricsManager(metrics, 
metricsRegistry), + time); assignFromUser(singleton(tp0));