[jira] [Comment Edited] (KAFKA-15169) Add tests for RemoteIndexCache
[ https://issues.apache.org/jira/browse/KAFKA-15169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769425#comment-17769425 ]

Arpit Goyal edited comment on KAFKA-15169 at 9/27/23 5:37 AM:
--
[~divijvaidya] As per the code, RemoteIndexCache never retries if a file gets corrupted after a remote storage fetch. I will create a separate ticket to track this enhancement. For the 1st test case, I am thinking of writing a test that expects a corruption exception to be thrown if files get corrupted during the remote fetch. The other test cases we discussed I will cover as part of this JIRA. WDYT?

was (Author: JIRAUSER301926):
[~divijvaidya] As per the code, RemoteIndexCache never retries if a file gets corrupted after a remote storage fetch. I will create a separate ticket to track this enhancement, which indirectly covers the 1st test case we discussed. The other test cases we discussed I will cover as part of this JIRA. WDYT?

> Add tests for RemoteIndexCache
> --
>
> Key: KAFKA-15169
> URL: https://issues.apache.org/jira/browse/KAFKA-15169
> Project: Kafka
> Issue Type: Test
> Reporter: Satish Duggana
> Assignee: Arpit Goyal
> Priority: Major
> Labels: KIP-405
> Fix For: 3.7.0
>
> Follow-up from
> https://github.com/apache/kafka/pull/13275#discussion_r1257490978
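A minimal JUnit sketch of the corruption test floated in the comment above. The exception type, the 8-byte entry size, and the sanityCheck helper are hypothetical stand-ins for illustration, not the actual RemoteIndexCache API:

{code:java}
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertThrows;

public class RemoteIndexCorruptionTest {

    // Hypothetical stand-in for the corruption error surfaced by index sanity checks.
    static class CorruptIndexException extends RuntimeException {
        CorruptIndexException(String message) {
            super(message);
        }
    }

    // Hypothetical sanity check: assume an offset index holds fixed-size 8-byte
    // entries, so a file whose length is not a multiple of 8 is corrupt.
    static void sanityCheck(Path indexFile) throws IOException {
        long size = Files.size(indexFile);
        if (size % 8 != 0)
            throw new CorruptIndexException("Index size " + size + " is not a multiple of the entry size");
    }

    @Test
    public void testCorruptionDuringRemoteFetchIsDetected() throws IOException {
        Path index = Files.createTempFile("remote-offset-index", ".index");
        Files.write(index, new byte[16]); // two intact entries
        sanityCheck(index);               // passes on the intact file

        // Simulate corruption during the remote storage fetch: truncate mid-entry.
        try (FileChannel channel = FileChannel.open(index, StandardOpenOption.WRITE)) {
            channel.truncate(13);
        }
        assertThrows(CorruptIndexException.class, () -> sanityCheck(index));
    }
}
{code}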
[jira] [Commented] (KAFKA-15169) Add tests for RemoteIndexCache
[ https://issues.apache.org/jira/browse/KAFKA-15169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769425#comment-17769425 ]

Arpit Goyal commented on KAFKA-15169:
-
[~divijvaidya] As per the code, RemoteIndexCache never retries if a file gets corrupted after a remote storage fetch. I will create a separate ticket to track this enhancement, which indirectly covers the 1st test case we discussed. The other test cases we discussed I will cover as part of this JIRA. WDYT?

> Add tests for RemoteIndexCache
> --
>
> Key: KAFKA-15169
> URL: https://issues.apache.org/jira/browse/KAFKA-15169
> Project: Kafka
> Issue Type: Test
> Reporter: Satish Duggana
> Assignee: Arpit Goyal
> Priority: Major
> Labels: KIP-405
> Fix For: 3.7.0
>
> Follow-up from
> https://github.com/apache/kafka/pull/13275#discussion_r1257490978
[jira] [Created] (KAFKA-15510) Follower's lastFetchedEpoch wrongly set when fetch response has no record
Chern Yih Cheah created KAFKA-15510:
---

Summary: Follower's lastFetchedEpoch wrongly set when fetch response has no record
Key: KAFKA-15510
URL: https://issues.apache.org/jira/browse/KAFKA-15510
Project: Kafka
Issue Type: Bug
Reporter: Chern Yih Cheah
Assignee: Chern Yih Cheah

A regression was introduced by https://github.com/apache/kafka/pull/13843/files#diff-508e9dc4d52744119dda36d69ce63a1901abfd3080ca72fc4554250b7e9f5242. When the fetch response has no record for a partition, validBytes is 0. In this case, we shouldn't set the last fetched epoch to logAppendInfo.lastLeaderEpoch.asScala, since there is no record and it is Optional.empty; we should use currentFetchState.lastFetchedEpoch instead. One effect of this is that fetch truncation might not work correctly.
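A minimal sketch of the intended guard (the real fix lives in the Scala fetcher code; the method and parameter names below are illustrative, mirroring the names in the report):

{code:java}
import java.util.Optional;

public final class FetchStateUpdate {

    // Keep the follower's current lastFetchedEpoch when the response carried no
    // records: with validBytes == 0, the epoch from the (empty) append is
    // Optional.empty and must not overwrite the previously fetched epoch.
    public static Optional<Integer> nextLastFetchedEpoch(long validBytes,
                                                         Optional<Integer> lastLeaderEpochFromAppend,
                                                         Optional<Integer> currentLastFetchedEpoch) {
        if (validBytes > 0 && lastLeaderEpochFromAppend.isPresent())
            return lastLeaderEpochFromAppend;
        return currentLastFetchedEpoch;
    }

    public static void main(String[] args) {
        // Empty fetch response: the current epoch (5) is retained.
        System.out.println(nextLastFetchedEpoch(0, Optional.empty(), Optional.of(5)));
        // Non-empty fetch response: the epoch from the appended records (6) wins.
        System.out.println(nextLastFetchedEpoch(100, Optional.of(6), Optional.of(5)));
    }
}
{code}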
[jira] [Commented] (KAFKA-15493) Ensure system tests work with Java 21
[ https://issues.apache.org/jira/browse/KAFKA-15493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769418#comment-17769418 ]

Ismael Juma commented on KAFKA-15493:
-
And you used Java 21 for the `ducker-ak up` step?

> Ensure system tests work with Java 21
> -
>
> Key: KAFKA-15493
> URL: https://issues.apache.org/jira/browse/KAFKA-15493
> Project: Kafka
> Issue Type: Improvement
> Reporter: Ismael Juma
> Assignee: Said BOUDJELDA
> Priority: Major
> Fix For: 3.7.0
>
> Run the system tests as described below with Java 21:
> [https://github.com/apache/kafka/tree/trunk/tests]
> One relevant portion:
> Run tests with a different JVM (it may be as easy as replacing 11 with 21)
> {code:java}
> bash tests/docker/ducker-ak up -j 'openjdk:11';
> tests/docker/run_tests.sh{code}
[GitHub] [kafka] kirktrue commented on a diff in pull request #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager
kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1337882369

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:

## @@ -259,17 +144,41 @@ long handlePollResult(NetworkClientDelegate.PollResult res) {
     }

     public boolean isRunning() {
-        return this.running;
+        return running;
     }

     public void wakeup() {
-        networkClientDelegate.wakeup();
+        if (networkClientDelegate != null)

Review Comment:
Added a brief comment to explain this case.
[GitHub] [kafka] kirktrue commented on a diff in pull request #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager
kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1337881557

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:

## @@ -217,29 +98,33 @@ public void run() {
             }
         } catch (final Throwable t) {
             log.error("The background thread failed due to unexpected error", t);
-            throw new RuntimeException(t);
+            throw new KafkaException(t);
         } finally {
             close();
-            log.debug("{} closed", getClass());
+            log.debug("Background thread closed");
         }
     }

+    void initializeResources() {
+        applicationEventProcessor = applicationEventProcessorSupplier.get();
+        networkClientDelegate = networkClientDelegateSupplier.get();
+        requestManagers = requestManagersSupplier.get();
+    }
+
     /**
      * Poll and process an {@link ApplicationEvent}. It performs the following tasks:
      * 1. Drains and try to process all the requests in the queue.
      * 2. Iterate through the registry, poll, and get the next poll time for the network poll
      * 3. Poll the networkClient to send and retrieve the response.
      */
     void runOnce() {
-        if (!applicationEventQueue.isEmpty()) {
-            LinkedList<ApplicationEvent> res = new LinkedList<>();
-            this.applicationEventQueue.drainTo(res);
+        LinkedList<ApplicationEvent> events = new LinkedList<>();
+        applicationEventQueue.drainTo(events);

-            for (ApplicationEvent event : res) {
-                log.debug("Consuming application event: {}", event);
-                Objects.requireNonNull(event);
-                applicationEventProcessor.process(event);
-            }
+        for (ApplicationEvent event : events) {
+            log.trace("Dequeued event: {}", event);
+            Objects.requireNonNull(event);

Review Comment:
This code has been refactored, but it still has the check, just to be paranoid.
[GitHub] [kafka] kirktrue commented on a diff in pull request #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager
kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1337881130

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventProcessor.java:

## @@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.LinkedList;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+
+public class BackgroundEventProcessor {
+
+    private final Logger log;
+    private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
+
+    public BackgroundEventProcessor(final LogContext logContext,
+                                    final BlockingQueue<BackgroundEvent> backgroundEventQueue) {
+        this.log = logContext.logger(BackgroundEventProcessor.class);
+        this.backgroundEventQueue = backgroundEventQueue;
+    }
+
+    /**
+     * Drains all available {@link BackgroundEvent}s, and then processes them in order. If any
+     * errors are thrown as a result of a {@link ErrorBackgroundEvent} or an error occurs while processing
+     * another type of {@link BackgroundEvent}, only the first exception will be thrown, all
+     * subsequent errors will simply be logged at WARN level.
+     *
+     * @throws RuntimeException or subclass
+     */
+    public void process() {
+        LinkedList<BackgroundEvent> events = new LinkedList<>();
+        backgroundEventQueue.drainTo(events);
+
+        RuntimeException first = null;
+        int errorCount = 0;
+
+        for (BackgroundEvent event : events) {
+            log.debug("Consuming background event: {}", event);
+
+            try {
+                process(event);
+            } catch (RuntimeException e) {
+                errorCount++;
+
+                if (first == null) {
+                    first = e;
+                    log.warn("Error #{} from background thread (will be logged and thrown): {}", errorCount, e.getMessage(), e);

Review Comment:
In the refactored version, the logging is not as verbose.
[GitHub] [kafka] kirktrue commented on a diff in pull request #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager
kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1337849711

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java:

## @@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.function.BiConsumer;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.FetchSessionHandler;
+import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
+import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+/**
+ * {@code FetchRequestManager} is responsible for generating {@link FetchRequest} that represent the
+ * {@link SubscriptionState#fetchablePartitions(Predicate)} based on the user's topic subscription/partition
+ * assignment.
+ */
+public class FetchRequestManager extends AbstractFetch implements RequestManager {
+
+    private final Logger log;
+    private final ErrorEventHandler errorEventHandler;
+    private final NetworkClientDelegate networkClientDelegate;
+
+    FetchRequestManager(final LogContext logContext,
+                        final Time time,
+                        final ErrorEventHandler errorEventHandler,
+                        final ConsumerMetadata metadata,
+                        final SubscriptionState subscriptions,
+                        final FetchConfig fetchConfig,
+                        final FetchMetricsManager metricsManager,
+                        final NetworkClientDelegate networkClientDelegate) {
+        super(logContext, metadata, subscriptions, fetchConfig, metricsManager, time);
+        this.log = logContext.logger(FetchRequestManager.class);
+        this.errorEventHandler = errorEventHandler;
+        this.networkClientDelegate = networkClientDelegate;
+    }
+
+    @Override
+    protected boolean isUnavailable(Node node) {
+        return networkClientDelegate.isUnavailable(node);
+    }
+
+    @Override
+    protected void maybeThrowAuthFailure(Node node) {
+        networkClientDelegate.maybeThrowAuthFailure(node);
+    }
+
+    @Override
+    public PollResult poll(long currentTimeMs) {
+        List<UnsentRequest> requests;
+
+        if (!idempotentCloser.isClosed()) {
+            // If the fetcher is open (i.e. not closed), we will issue the normal fetch requests
+            requests = prepareFetchRequests().entrySet().stream().map(entry -> {
+                final Node fetchTarget = entry.getKey();
+                final FetchSessionHandler.FetchRequestData data = entry.getValue();
+                final FetchRequest.Builder request = createFetchRequest(fetchTarget, data);
+                final BiConsumer<ClientResponse, Throwable> responseHandler = (clientResponse, t) -> {
+                    if (t != null) {
+                        handleFetchResponse(fetchTarget, t);
+                        log.warn("Attempt to fetch data from node {} failed due to fatal exception", fetchTarget, t);

Review Comment:
It looks like the other call sites that add error events to the handler also log them 🤷‍♂️
[GitHub] [kafka] showuon commented on pull request #14434: KAFKA-15498: bump snappy-java version to 1.1.10.4
showuon commented on PR #14434:
URL: https://github.com/apache/kafka/pull/14434#issuecomment-1736393174

Thanks all. (Sorry, I pressed the wrong button on my phone.) I will verify today that the decompressed results can be read correctly.
[GitHub] [kafka] showuon closed pull request #14434: KAFKA-15498: bump snappy-java version to 1.1.10.4
showuon closed pull request #14434: KAFKA-15498: bump snappy-java version to 1.1.10.4
URL: https://github.com/apache/kafka/pull/14434
[GitHub] [kafka] showuon commented on pull request #14434: KAFKA-15498: bump snappy-java version to 1.1.10.4
showuon commented on PR #14434:
URL: https://github.com/apache/kafka/pull/14434#issuecomment-1736386116

One question I'd like to get your thoughts on. This test does the following:
1. Generate a 2-byte key and a 128-byte value.
2. Create a record compressed with the snappy codec.
3. Get the size of the record, which is 197.
4. Add a hardcoded 5 bytes for decompression, so the max message size is set to 202.
5. After decompression, the record size is 203, so an exception is thrown, whereas before bumping the snappy version it was always 202.

Here's my question: if the decompressed records can be read correctly on the consumer side (I haven't tested it yet), should we still worry about the additional 1 byte after decompression? @divijvaidya @ijuma @jlprat, thoughts?
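A back-of-the-envelope sketch of the size check being discussed; the constants come from the steps above, and the comparison is a simplified stand-in for the broker's max-message-size validation, not the actual code:

```java
public class SnappySizeCheck {
    public static void main(String[] args) {
        int compressedRecordSize = 197; // record size with the snappy codec, per step 3
        int decompressionSlack = 5;     // hardcoded allowance, per step 4
        int maxMessageSize = compressedRecordSize + decompressionSlack; // 202

        int oldDecompressedSize = 202;  // before the snappy-java bump
        int newDecompressedSize = 203;  // after bumping to 1.1.10.4

        System.out.println("old size ok: " + (oldDecompressedSize <= maxMessageSize)); // true
        System.out.println("new size ok: " + (newDecompressedSize <= maxMessageSize)); // false -> exception
    }
}
```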
[jira] [Commented] (KAFKA-15493) Ensure system tests work with Java 21
[ https://issues.apache.org/jira/browse/KAFKA-15493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769355#comment-17769355 ]

Said BOUDJELDA commented on KAFKA-15493:

`bash tests/docker/run_tests.sh` passes successfully

> Ensure system tests work with Java 21
> -
>
> Key: KAFKA-15493
> URL: https://issues.apache.org/jira/browse/KAFKA-15493
> Project: Kafka
> Issue Type: Improvement
> Reporter: Ismael Juma
> Assignee: Said BOUDJELDA
> Priority: Major
> Fix For: 3.7.0
>
> Run the system tests as described below with Java 21:
> [https://github.com/apache/kafka/tree/trunk/tests]
> One relevant portion:
> Run tests with a different JVM (it may be as easy as replacing 11 with 21)
> {code:java}
> bash tests/docker/ducker-ak up -j 'openjdk:11';
> tests/docker/run_tests.sh{code}
[GitHub] [kafka] cmccabe merged pull request #14399: KAFKA-15466: Add KIP-919 support for some admin APIs
cmccabe merged PR #14399:
URL: https://github.com/apache/kafka/pull/14399
[jira] [Commented] (KAFKA-15493) Ensure system tests work with Java 21
[ https://issues.apache.org/jira/browse/KAFKA-15493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769348#comment-17769348 ]

Said BOUDJELDA commented on KAFKA-15493:

`./gradlew clean systemTestLibs` passes successfully

> Ensure system tests work with Java 21
> -
>
> Key: KAFKA-15493
> URL: https://issues.apache.org/jira/browse/KAFKA-15493
> Project: Kafka
> Issue Type: Improvement
> Reporter: Ismael Juma
> Assignee: Said BOUDJELDA
> Priority: Major
> Fix For: 3.7.0
>
> Run the system tests as described below with Java 21:
> [https://github.com/apache/kafka/tree/trunk/tests]
> One relevant portion:
> Run tests with a different JVM (it may be as easy as replacing 11 with 21)
> {code:java}
> bash tests/docker/ducker-ak up -j 'openjdk:11';
> tests/docker/run_tests.sh{code}
[GitHub] [kafka] divijvaidya commented on pull request #14434: KAFKA-15498: bump snappy-java version to 1.1.10.4
divijvaidya commented on PR #14434:
URL: https://github.com/apache/kafka/pull/14434#issuecomment-1736256796

I have run the test ~200 times locally using the command below and it succeeded every time. Also, the CI build on Jenkins has succeeded twice: [1] and [2]. IMO, we can safely merge this one.

```
I=0; while ./gradlew :core:test --tests kafka.log.LogCleanerParameterizedIntegrationTest.testCleanerWithMessageFormatV0 --rerun --fail-fast; do (( I=$I+1 )); echo "Completed run: $I"; sleep 1; done
```

[1] https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14434/3/pipeline/
[2] https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14434/4/
[GitHub] [kafka] bmscomp commented on pull request #14449: MINOR: Upgrade version of zstd-jni to the latest stable version 1.5.5-5
bmscomp commented on PR #14449:
URL: https://github.com/apache/kafka/pull/14449#issuecomment-1736244500

@divijvaidya Done
[GitHub] [kafka] lucasbru commented on pull request #14281: KAFKA-15326: [9/N] Start and stop executors and cornercases
lucasbru commented on PR #14281:
URL: https://github.com/apache/kafka/pull/14281#issuecomment-1736221407

Test failures are unrelated.
[GitHub] [kafka] nizhikov opened a new pull request, #14456: KAFKA-14595 ReassignPartitionsIntegrationTest rewritten in java
nizhikov opened a new pull request, #14456:
URL: https://github.com/apache/kafka/pull/14456

This PR is part of #13247. It contains `ReassignPartitionsIntegrationTest` rewritten in Java. The goal of this PR is to reduce the size of the changes in the main PR.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] kirktrue commented on a diff in pull request #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager
kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1337661714

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:

## @@ -636,42 +857,148 @@ public void assign(Collection partitions) {
     }

     @Override
-    public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
-        throw new KafkaException("method not implemented");
+    public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
+        maybeThrowInvalidGroupIdException();
+        if (pattern == null || pattern.toString().equals(""))
+            throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ?
+                "null" : "empty"));
+
+        throwIfNoAssignorsConfigured();
+        log.info("Subscribed to pattern: '{}'", pattern);
+        this.subscriptions.subscribe(pattern, listener);
+        this.updatePatternSubscription(metadata.fetch());
+        this.metadata.requestUpdateForNewTopics();
+    }
+
+    /**
+     * TODO: remove this when we implement the KIP-848 protocol.
+     *
+     *
+     * The contents of this method are shamelessly stolen from
+     * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are used here because we won't have access
+     * to a {@link ConsumerCoordinator} in this code. Perhaps it could be moved to a ConsumerUtils class?
+     *
+     * @param cluster Cluster from which we get the topics
+     */
+    private void updatePatternSubscription(Cluster cluster) {
+        final Set<String> topicsToSubscribe = cluster.topics().stream()
+            .filter(subscriptions::matchesSubscribedPattern)
+            .collect(Collectors.toSet());
+        if (subscriptions.subscribeFromPattern(topicsToSubscribe))
+            metadata.requestUpdateForNewTopics();
     }

     @Override
     public void subscribe(Pattern pattern) {
-        throw new KafkaException("method not implemented");
+        subscribe(pattern, new NoOpConsumerRebalanceListener());
     }

     @Override
     public void unsubscribe() {
-        throw new KafkaException("method not implemented");
+        fetchBuffer.retainAll(Collections.emptySet());
+        this.subscriptions.unsubscribe();
     }

     @Override
     @Deprecated
-    public ConsumerRecords poll(long timeout) {
-        throw new KafkaException("method not implemented");
+    public ConsumerRecords poll(final long timeoutMs) {
+        return poll(Duration.ofMillis(timeoutMs));
     }

     // Visible for testing
     WakeupTrigger wakeupTrigger() {
         return wakeupTrigger;
     }

-    private static ClusterResourceListeners configureClusterResourceListeners(
-        final Deserializer keyDeserializer,
-        final Deserializer valueDeserializer,
-        final List... candidateLists) {
-        ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners();
-        for (List candidateList: candidateLists)
-            clusterResourceListeners.maybeAddAll(candidateList);
+    private void sendFetches() {
+        FetchEvent event = new FetchEvent();
+        eventHandler.add(event);
+
+        event.future().whenComplete((completedFetches, error) -> {
+            if (completedFetches != null && !completedFetches.isEmpty()) {
+                fetchBuffer.addAll(completedFetches);

Review Comment:
This has been reworked as part of a recent change. Not only are there still two `FetchBuffer`s, but I've _added_ a blocking queue between them, since the `FetchBuffer` should not be updated in the `Future` callback: that callback runs on another thread and `FetchBuffer` is not thread-safe.
[GitHub] [kafka] kirktrue commented on a diff in pull request #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager
kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1337660657

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:

## @@ -636,42 +857,148 @@ public void assign(Collection partitions) {
     }

     @Override
-    public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
-        throw new KafkaException("method not implemented");
+    public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
+        maybeThrowInvalidGroupIdException();
+        if (pattern == null || pattern.toString().equals(""))
+            throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ?
+                "null" : "empty"));
+
+        throwIfNoAssignorsConfigured();
+        log.info("Subscribed to pattern: '{}'", pattern);
+        this.subscriptions.subscribe(pattern, listener);
+        this.updatePatternSubscription(metadata.fetch());
+        this.metadata.requestUpdateForNewTopics();
+    }
+
+    /**
+     * TODO: remove this when we implement the KIP-848 protocol.
+     *
+     *
+     * The contents of this method are shamelessly stolen from
+     * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are used here because we won't have access
+     * to a {@link ConsumerCoordinator} in this code. Perhaps it could be moved to a ConsumerUtils class?
+     *
+     * @param cluster Cluster from which we get the topics
+     */
+    private void updatePatternSubscription(Cluster cluster) {
+        final Set<String> topicsToSubscribe = cluster.topics().stream()
+            .filter(subscriptions::matchesSubscribedPattern)
+            .collect(Collectors.toSet());
+        if (subscriptions.subscribeFromPattern(topicsToSubscribe))
+            metadata.requestUpdateForNewTopics();
     }

     @Override
     public void subscribe(Pattern pattern) {
-        throw new KafkaException("method not implemented");
+        subscribe(pattern, new NoOpConsumerRebalanceListener());
     }

     @Override
     public void unsubscribe() {
-        throw new KafkaException("method not implemented");
+        fetchBuffer.retainAll(Collections.emptySet());
+        this.subscriptions.unsubscribe();
     }

     @Override
     @Deprecated
-    public ConsumerRecords poll(long timeout) {
-        throw new KafkaException("method not implemented");
+    public ConsumerRecords poll(final long timeoutMs) {
+        return poll(Duration.ofMillis(timeoutMs));
     }

     // Visible for testing
     WakeupTrigger wakeupTrigger() {
         return wakeupTrigger;
     }

-    private static ClusterResourceListeners configureClusterResourceListeners(
-        final Deserializer keyDeserializer,
-        final Deserializer valueDeserializer,
-        final List... candidateLists) {
-        ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners();
-        for (List candidateList: candidateLists)
-            clusterResourceListeners.maybeAddAll(candidateList);
+    private void sendFetches() {
+        FetchEvent event = new FetchEvent();
+        eventHandler.add(event);
+
+        event.future().whenComplete((completedFetches, error) -> {
+            if (completedFetches != null && !completedFetches.isEmpty()) {
+                fetchBuffer.addAll(completedFetches);
+            }
+        });
+    }
+
+    /**
+     * @throws KafkaException if the rebalance callback throws exception
+     */
+    private Fetch pollForFetches(Timer timer) {
+        long pollTimeout = timer.remainingMs();
+
+        // if data is available already, return it immediately
+        final Fetch fetch = fetchCollector.collectFetch(fetchBuffer);
+        if (!fetch.isEmpty()) {
+            return fetch;
+        }
+
+        // send any new fetches (won't resend pending fetches)
+        sendFetches();
+
+        // We do not want to be stuck blocking in poll if we are missing some positions
+        // since the offset lookup may be backing off after a failure
+
+        // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call
+        // updateAssignmentMetadataIfNeeded before this method.
+        if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) {
+            pollTimeout = retryBackoffMs;
+        }
+
+        log.trace("Polling for fetches with timeout {}", pollTimeout);
+
+        Timer pollTimer = time.timer(pollTimeout);
+
+        // Attempt to fetch any data. It's OK if we time out here; it's a best case effort. The
+        // data may not be immediately available, but the calling method (poll) will correctly
+        // handle the overall timeout.
+        try {
+            Queue completedFetches = eventHandler.addAndGet(new FetchEvent(), pollTimer);

Review Comment:
I've changed the mechanism to handle the responses from fetch
[GitHub] [kafka] ahuang98 commented on a diff in pull request #14428: KAFKA-15489: resign leadership when no fetch from majority voters
ahuang98 commented on code in PR #14428:
URL: https://github.com/apache/kafka/pull/14428#discussion_r1337653712

## raft/src/main/java/org/apache/kafka/raft/LeaderState.java:

## @@ -76,9 +85,37 @@ protected LeaderState(
         boolean hasAcknowledgedLeader = voterId == localId;
         this.voterStates.put(voterId, new ReplicaState(voterId, hasAcknowledgedLeader));
     }
+    this.majority = voters.size() / 2;
     this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters));
     this.log = logContext.logger(LeaderState.class);
     this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null");
+    this.fetchTimeoutMs = fetchTimeoutMs;
+    this.fetchTimer = time.timer(fetchTimeoutMs);
+}
+
+public boolean hasMajorityFollowerFetchTimeoutExpired(long currentTimeMs) {

Review Comment:
nit: these are pretty lengthy and arguably unintuitive method names, could we add a short comment on what each method does?
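To make the intent behind those method names concrete, here is a standalone sketch of the check this PR adds: a leader resigns if it has not seen fetches from a majority of voters within `fetchTimeoutMs`. The bookkeeping and names below are illustrative assumptions, not the actual `LeaderState` API:

```java
import java.util.HashMap;
import java.util.Map;

final class MajorityFetchCheck {
    private final Map<Integer, Long> lastFetchTimeMs = new HashMap<>();
    private final int localId;
    private final long fetchTimeoutMs;

    MajorityFetchCheck(int localId, Iterable<Integer> voterIds, long fetchTimeoutMs, long nowMs) {
        this.localId = localId;
        this.fetchTimeoutMs = fetchTimeoutMs;
        for (int voterId : voterIds)
            lastFetchTimeMs.put(voterId, nowMs); // start the clock when leadership begins
    }

    void recordFetch(int voterId, long nowMs) {
        lastFetchTimeMs.put(voterId, nowMs);
    }

    // True if fewer than a majority of voters (counting the leader itself) have
    // fetched within the timeout, i.e. the leader should consider resigning.
    boolean hasMajorityFetchTimeoutExpired(long nowMs) {
        int recentlyFetched = 0;
        for (Map.Entry<Integer, Long> e : lastFetchTimeMs.entrySet()) {
            if (e.getKey() == localId)
                continue; // the leader trivially counts toward the quorum
            if (nowMs - e.getValue() < fetchTimeoutMs)
                recentlyFetched++;
        }
        // With n voters, the leader plus floor(n/2) recent followers form a majority,
        // mirroring the `majority = voters.size() / 2` line in the diff above.
        int needed = lastFetchTimeMs.size() / 2;
        return recentlyFetched < needed;
    }
}
```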
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337639317

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:

## @@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueues it to the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors.
+ *
+ * The manager only emits a heartbeat when the member is in a group, tries to join a group, or tries to rejoin the group.
+ * If the member does not have a groupId configured, has left the group, or has encountered fatal exceptions, the heartbeat will
+ * not be sent. If the coordinator is not found, we will skip sending the heartbeat and try to find a coordinator first.
+ *
+ * If the heartbeat fails due to a retriable error, such as TimeoutException, the subsequent attempt will be backed off
+ * exponentially.
+ *
+ * If the member completes the partition revocation process, a heartbeat request will be sent in the next event loop.
+ *
+ * See {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+    private final Logger logger;
+
+    private final int rebalanceTimeoutMs;
+
+    private final CoordinatorRequestManager coordinatorRequestManager;
+    private final SubscriptionState subscriptions;
+    private final HeartbeatRequestState heartbeatRequestState;
+    private final MembershipManager membershipManager;
+    private final ErrorEventHandler nonRetriableErrorHandler;
+
+    public HeartbeatRequestManager(
+        final Time time,
+        final LogContext logContext,
+        final ConsumerConfig config,
+        final CoordinatorRequestManager coordinatorRequestManager,
+        final SubscriptionState subscriptions,
+        final MembershipManager membershipManager,
+        final ErrorEventHandler nonRetriableErrorHandler) {
+        this.coordinatorRequestManager = coordinatorRequestManager;
+        this.logger = logContext.logger(getClass());
+        this.subscriptions = subscriptions;
+        this.membershipManager = membershipManager;
+        this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+        this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+        long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+        long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+        this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs,
+            retryBackoffMaxMs, rebalanceTimeoutMs);
+    }
+
+    // Visible for testing
+    HeartbeatRequestManager(
+        final Time time,
+        final LogContext logContext,
+        final ConsumerConfig config,
+        final CoordinatorRequestManager coordinatorRequestManager,
+        final SubscriptionState subscriptions,
+        final MembershipManager membershipManager,
+        final HeartbeatRequestState heartbeatRequestState,
+        final ErrorEventHandler nonRetriableErrorHandler) {
+        this.logger = logContext.logger(this.getClass());
+
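The javadoc above describes exponentially backed-off retries after retriable heartbeat errors. A minimal sketch of that behavior follows; the real logic lives in `HeartbeatRequestState`, and the base-2 growth and ±20% jitter here are illustrative assumptions, not the actual implementation:

```java
import java.util.concurrent.ThreadLocalRandom;

final class HeartbeatBackoff {
    private final long retryBackoffMs;     // e.g. the retry.backoff.ms config
    private final long retryBackoffMaxMs;  // e.g. the retry.backoff.max.ms config
    private int failedAttempts = 0;

    HeartbeatBackoff(long retryBackoffMs, long retryBackoffMaxMs) {
        this.retryBackoffMs = retryBackoffMs;
        this.retryBackoffMaxMs = retryBackoffMaxMs;
    }

    void onFailure() { failedAttempts++; }
    void onSuccess() { failedAttempts = 0; }

    // Delay doubles per consecutive failure, capped at the max, with +/-20% jitter
    // so that many members do not retry in lockstep.
    long nextBackoffMs() {
        double exp = Math.min(retryBackoffMaxMs, retryBackoffMs * Math.pow(2, failedAttempts));
        double jitter = 1.0 + (ThreadLocalRandom.current().nextDouble() - 0.5) * 0.4;
        return (long) (exp * jitter);
    }
}
```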
[GitHub] [kafka] zhengyd2014 commented on a diff in pull request #14444: KIP-951: Server side and protocol changes for KIP-951
zhengyd2014 commented on code in PR #14444:
URL: https://github.com/apache/kafka/pull/14444#discussion_r1337630516

## clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java:

## @@ -276,11 +289,16 @@ private static FetchResponseData toMessage(Errors error,
                 .setPartitions(partitionResponses));
         }
     }
-
-    return new FetchResponseData()
-        .setThrottleTimeMs(throttleTimeMs)
-        .setErrorCode(error.code())
-        .setSessionId(sessionId)
-        .setResponses(topicResponseList);
+    data.setThrottleTimeMs(throttleTimeMs)
+        .setErrorCode(error.code())
+        .setSessionId(sessionId)
+        .setResponses(topicResponseList);
+    nodeEndpoints.forEach(endpoint -> data.nodeEndpoints().add(

Review Comment:
it makes sense, thanks
[GitHub] [kafka] nizhikov commented on pull request #14064: KAFKA-15030: Add connect-plugin-path command-line tool.
nizhikov commented on PR #14064:
URL: https://github.com/apache/kafka/pull/14064#issuecomment-1736054928

In case someone is interested: adding a `:connect:api` dependency to `tools` somehow made the `:storage:api` classes invisible. I fixed it by adding an explicit dependency on the `:storage:api` project.
[jira] [Created] (KAFKA-15509) KRaft Controller doesn't consider brokers that are shutting down during topic creation
José Armando García Sancio created KAFKA-15509:
--

Summary: KRaft Controller doesn't consider brokers that are shutting down during topic creation
Key: KAFKA-15509
URL: https://issues.apache.org/jira/browse/KAFKA-15509
Project: Kafka
Issue Type: Bug
Reporter: José Armando García Sancio
Assignee: José Armando García Sancio

The topic creation implementation in KRaft is able to use brokers that are fenced as replicas during topic creation. The restriction is that the controller won't assign leadership to those replicas. This feature makes it possible for users to roll clusters with 3 brokers without sacrificing topic creation availability. Looking at the code, it appears the KRaft controller doesn't consider brokers that are shutting down as eligible for replicas during topic creation.
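An illustrative sketch of the eligibility question raised above. The types and fields are hypothetical, not the actual KRaft controller API: fenced brokers may receive replicas (but not leadership), and the report argues brokers in controlled shutdown should be treated the same way rather than excluded outright:

{code:java}
import java.util.List;
import java.util.stream.Collectors;

final class ReplicaPlacement {
    record Broker(int id, boolean fenced, boolean shuttingDown) {}

    // Current behavior per the report: shutting-down brokers are excluded from
    // placement entirely, even though fenced brokers are allowed.
    static List<Integer> eligibleReplicas(List<Broker> brokers) {
        return brokers.stream()
                .filter(b -> !b.shuttingDown())
                .map(Broker::id)
                .collect(Collectors.toList());
    }

    // Leadership is never assigned to fenced or shutting-down brokers.
    static List<Integer> eligibleLeaders(List<Broker> brokers) {
        return brokers.stream()
                .filter(b -> !b.fenced() && !b.shuttingDown())
                .map(Broker::id)
                .collect(Collectors.toList());
    }
}
{code}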
[GitHub] [kafka] lucasbru merged pull request #14226: KAFKA-15326: [8/N] Move consumer interaction out of processing methods
lucasbru merged PR #14226:
URL: https://github.com/apache/kafka/pull/14226
[GitHub] [kafka] lucasbru commented on pull request #14226: KAFKA-15326: [8/N] Move consumer interaction out of processing methods
lucasbru commented on PR #14226:
URL: https://github.com/apache/kafka/pull/14226#issuecomment-1735852839

Failures are unrelated.
[GitHub] [kafka] kirktrue commented on a diff in pull request #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager
kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1337444877

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:

## @@ -526,13 +677,57 @@ public void close() {
     close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
 }

+private Timer createTimerForRequest(final Duration timeout) {
+    // this.time could be null if an exception occurs in constructor prior to setting the this.time field

Review Comment:
`createTimerForRequest()` is no longer used, so marking this as resolved.
[GitHub] [kafka] kirktrue commented on a diff in pull request #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager
kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r133756

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:

## @@ -526,13 +677,57 @@ public void close() {
     close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
 }

+private Timer createTimerForRequest(final Duration timeout) {
+    // this.time could be null if an exception occurs in constructor prior to setting the this.time field
+    final Time localTime = (time == null) ? Time.SYSTEM : time;
+    return localTime.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
+}
+
 @Override
 public void close(Duration timeout) {
+    if (timeout.toMillis() < 0)
+        throw new IllegalArgumentException("The timeout cannot be negative.");
+
+    try {
+        if (!closed) {
+            // need to close before setting the flag since the close function
+            // itself may trigger rebalance callback that needs the consumer to be open still
+            close(timeout, false);
+        }
+    } finally {
+        closed = true;
+    }
+}
+
+private void close(Duration timeout, boolean swallowException) {
+    log.trace("Closing the Kafka consumer");
+    AtomicReference firstException = new AtomicReference<>();
+
+    final Timer closeTimer = createTimerForRequest(timeout);
+    if (fetchBuffer != null) {
+        // the timeout for the session close is at-most the requestTimeoutMs
+        long remainingDurationInTimeout = Math.max(0, timeout.toMillis() - closeTimer.elapsedMs());
+        if (remainingDurationInTimeout > 0) {
+            remainingDurationInTimeout = Math.min(requestTimeoutMs, remainingDurationInTimeout);

Review Comment:
Removed the unnecessary timer.
[GitHub] [kafka] kirktrue commented on a diff in pull request #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager
kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1337444033

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:

## @@ -526,13 +677,57 @@ public void close() {
     close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
 }

+private Timer createTimerForRequest(final Duration timeout) {
+    // this.time could be null if an exception occurs in constructor prior to setting the this.time field
+    final Time localTime = (time == null) ? Time.SYSTEM : time;
+    return localTime.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
+}
+
 @Override
 public void close(Duration timeout) {
+    if (timeout.toMillis() < 0)
+        throw new IllegalArgumentException("The timeout cannot be negative.");
+
+    try {
+        if (!closed) {
+            // need to close before setting the flag since the close function
+            // itself may trigger rebalance callback that needs the consumer to be open still
+            close(timeout, false);
+        }
+    } finally {
+        closed = true;
+    }
+}
+
+private void close(Duration timeout, boolean swallowException) {
+    log.trace("Closing the Kafka consumer");
+    AtomicReference firstException = new AtomicReference<>();
+
+    final Timer closeTimer = createTimerForRequest(timeout);
+    if (fetchBuffer != null) {
+        // the timeout for the session close is at-most the requestTimeoutMs
+        long remainingDurationInTimeout = Math.max(0, timeout.toMillis() - closeTimer.elapsedMs());
+        if (remainingDurationInTimeout > 0) {
+            remainingDurationInTimeout = Math.min(requestTimeoutMs, remainingDurationInTimeout);
+        }
+
+        closeTimer.reset(remainingDurationInTimeout);
+
+        // This is a blocking call bound by the time remaining in closeTimer

Review Comment:
I've updated this so that the `close()` method is a lot cleaner.
[GitHub] [kafka] kirktrue commented on a diff in pull request #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager
kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1337443244

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:

## @@ -684,7 +1011,7 @@ private boolean isCommittedOffsetsManagementEnabled() {
 }

 /**
- * Refresh the committed offsets for partitions that require initialization.
+ * Refresh the committed offsets for provided partitions.

Review Comment:
I reverted the change to the comment. I don't remember changing it 🤷‍♂️
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337428627

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:

## @@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueues it to the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors.
+ *

Review Comment:
Ok, that's fine but not enough. I think here we also need the <p> tags. If you look at the javadoc, it shows as one giant block, which I expect is not what we want.
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337423659

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:

## @@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.internals.MemberState.FAILED;
+import static org.apache.kafka.clients.consumer.internals.MemberState.STABLE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class HeartbeatRequestManagerTest {
+
+    private final int heartbeatIntervalMs = 1000;
+    private final long retryBackoffMaxMs = 3000;
+    private final long retryBackoffMs = 100;
+    private final String groupId = "group-id";
+
+    private Time mockTime;
+    private LogContext mockLogContext;
+    private CoordinatorRequestManager mockCoordinatorRequestManager;
+    private SubscriptionState mockSubscriptionState;
+    private HeartbeatRequestManager heartbeatRequestManager;
+    private MembershipManager mockMembershipManager;
+    private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState;
+    private ConsumerConfig config;
+
+    private String memberId = "member-id";
+    private int memberEpoch = 1;

Review Comment:
we don't make these vars final in tests because we could change them later
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337420489

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:

## @@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueues it to the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors.
+ *
+ * The manager only emits a heartbeat when the member is in a group, tries to join a group, or tries to rejoin the group.
+ * If the member does not have a groupId configured, has been kicked out of the group, or has encountered fatal exceptions, the
+ * heartbeat will not be sent.
+ *
+ * If the coordinator is not found, we will skip sending the heartbeat and try to find a coordinator first.
+ *
+ * If the heartbeat fails due to a retriable error, such as TimeoutException, the subsequent attempt will be backed off
+ * exponentially.
+ *
+ * If the member completes the assignment changes, i.e. revocation and assignment, a heartbeat request will be sent in
+ * the next event loop.
+ * See {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+    private final Logger logger;
+    private final Set<Errors> fatalErrors = new HashSet<>(Arrays.asList(
+        Errors.GROUP_AUTHORIZATION_FAILED,
+        Errors.INVALID_REQUEST,
+        Errors.GROUP_MAX_SIZE_REACHED,
+        Errors.UNSUPPORTED_ASSIGNOR,
+        Errors.UNRELEASED_INSTANCE_ID));
+
+    private final int rebalanceTimeoutMs;
+
+    private final CoordinatorRequestManager coordinatorRequestManager;
+    private final SubscriptionState subscriptions;
+    private final HeartbeatRequestState heartbeatRequestState;
+    private final MembershipManager membershipManager;
+    private final ErrorEventHandler nonRetriableErrorHandler;
+
+    public HeartbeatRequestManager(
+        final Time time,
+        final LogContext logContext,
+        final ConsumerConfig config,
+        final CoordinatorRequestManager coordinatorRequestManager,
+        final SubscriptionState subscriptions,
+        final MembershipManager membershipManager,
+        final ErrorEventHandler nonRetriableErrorHandler) {
+        this.coordinatorRequestManager = coordinatorRequestManager;
+        this.logger = logContext.logger(getClass());
+        this.subscriptions = subscriptions;
+        this.membershipManager = membershipManager;
+        this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+        this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+        long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+        long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+        this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs,
+            retryBackoffMaxMs, rebalanceTimeoutMs);
+    }
+
+    // Visible for testing
+    HeartbeatRequestManager(
+        final LogContext logContext,
+        final ConsumerConfig
[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
philipnee commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1337418980 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * Review Comment: This is just for comment formatting. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lianetm commented on pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on PR #14364: URL: https://github.com/apache/kafka/pull/14364#issuecomment-1735694640 Thanks for the changes @philipnee, left a few other minor comments and questions. LGTM. As I see it, the main areas requiring follow-up in other PRs would be:
- fully integrate with the state defined in the membershipManager (getting rid of all the parallel `groupState` defined here)
- integrate with the assignment processing component, driving the logic to delegate callback execution and send HB on completion as required.
- extend the HB manager test to cover the successful path and timeout scenarios.

@dajac it would be helpful if you could take another look at it now, as it has evolved quite a bit. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on pull request #14434: KAFKA-15498: bump snappy-java version to 1.1.10.4
divijvaidya commented on PR #14434: URL: https://github.com/apache/kafka/pull/14434#issuecomment-1735667819 Since @showuon is asleep, I took the liberty of adding an extra byte to the assumed size of the compressed message produced by snappy. Let's see if the test passes now. Will run it multiple times. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1337309504 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -58,7 +59,7 @@ public class HeartbeatRequestManagerTest { Review Comment: High-level comment: I do see this test covering the timing logic for sending, and the response handling on error, but nothing for the successful HB response handling (important to ensure that it is updating the target assignment so that it can be processed by other components). Also, it would be helpful to have some tests around HB timeouts, mainly to validate the retry logic around that. (Just suggestions for better coverage of core actions; OK for me if we prefer to target that in a separate PR.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
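A rough sketch of the success-path test suggested above, reusing fixture names visible in the quoted test class (`heartbeatRequestManager`, `mockMembershipManager`, `memberAssignment`, `memberId`, `memberEpoch`, `heartbeatIntervalMs`). The `onResponse(...)` entry point is a hypothetical hook, not necessarily the PR's actual API, so treat this as a shape rather than a drop-in test:

```java
@Test
public void testSuccessfulHeartbeatUpdatesAssignment() {
    // Build a successful heartbeat response carrying a new target assignment.
    ConsumerGroupHeartbeatResponse response = new ConsumerGroupHeartbeatResponse(
        new ConsumerGroupHeartbeatResponseData()
            .setErrorCode(Errors.NONE.code())
            .setHeartbeatIntervalMs(heartbeatIntervalMs)
            .setMemberId(memberId)
            .setMemberEpoch(memberEpoch)
            .setAssignment(memberAssignment));

    // Hypothetical handler invocation; the real PR may route this through
    // a request completion callback instead.
    heartbeatRequestManager.onResponse(response);

    // The core assertion: the broker-provided assignment must reach the
    // membership manager so downstream components can process it.
    verify(mockMembershipManager).updateAssignment(memberAssignment);
}
```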
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1337295428 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -173,24 +194,22 @@ public void testHeartbeatResponseOnErrorHandling(final Errors error, final boole heartbeatRequestState = new HeartbeatRequestManager.HeartbeatRequestState( mockLogContext, mockTime, -heartbeatInterval, +heartbeatIntervalMs, retryBackoffMs, retryBackoffMaxMs, 0); -when(mockMembershipManager.state()).thenReturn(STABLE); heartbeatRequestManager = createManager(); -// Sending first heartbeat to set the state to STABLE +// Sending first heartbeat w/o assignment to set the state to STABLE ConsumerGroupHeartbeatResponse rs1 = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData() -.setHeartbeatIntervalMs(heartbeatInterval) +.setHeartbeatIntervalMs(heartbeatIntervalMs) .setMemberId(memberId) -.setMemberEpoch(memberEpoch) -.setAssignment(memberAssignment)); Review Comment: Seems we're not using `memberAssignment` in the test anymore? Let's remove it if unused -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on pull request #14434: KAFKA-15498: bump snappy-java version to 1.1.10.4
divijvaidya commented on PR #14434: URL: https://github.com/apache/kafka/pull/14434#issuecomment-1735632838 At https://github.com/apache/kafka/blob/65efb981347d6f81fb2713cd27cdfdfa9d8781b9/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala#L149 we assume that after compression, snappy will add 5 additional bytes. But that is an assumption dependent on Snappy's internal implementation. For the purpose of this test, we can set a reasonable value like `largeMessageSet.sizeInBytes + 50` (to be future-proof) and it should pass. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
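A quick way to see why the hardcoded overhead is fragile is a standalone probe against snappy-java; the 197-byte input size here is arbitrary, and the framing overhead it prints is exactly the implementation detail that can change between library versions:

```java
import java.util.Random;
import org.xerial.snappy.Snappy;

public class SnappyOverheadProbe {
    public static void main(String[] args) throws Exception {
        // Random bytes are essentially incompressible, so the compressed
        // output is the input plus snappy's framing overhead. That overhead
        // is a library implementation detail, which is why asserting a
        // hardcoded "input + 5" in a test breaks across version bumps.
        byte[] input = new byte[197];
        new Random(42).nextBytes(input);
        byte[] compressed = Snappy.compress(input);
        System.out.printf("input=%d compressed=%d overhead=%d%n",
            input.length, compressed.length, compressed.length - input.length);
    }
}
```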
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1337282860 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Optional; +import java.util.Properties; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.internals.MemberState.FAILED; +import static org.apache.kafka.clients.consumer.internals.MemberState.STABLE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class HeartbeatRequestManagerTest { + +private final int heartbeatIntervalMs = 1000; +private final long retryBackoffMaxMs = 3000; +private final long retryBackoffMs = 100; +private final String groupId = "group-id"; + +private Time mockTime; +private LogContext mockLogContext; +private CoordinatorRequestManager mockCoordinatorRequestManager; +private SubscriptionState 
mockSubscriptionState; +private HeartbeatRequestManager heartbeatRequestManager; +private MembershipManager mockMembershipManager; +private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState; +private ConsumerConfig config; + +private String memberId = "member-id"; +private int memberEpoch = 1; +private ConsumerGroupHeartbeatResponseData.Assignment memberAssignment = mockAssignment(); +private ErrorEventHandler errorEventHandler; + +private ConsumerGroupHeartbeatResponseData.Assignment mockAssignment() { +return new ConsumerGroupHeartbeatResponseData.Assignment() +.setAssignedTopicPartitions(Arrays.asList( +new ConsumerGroupHeartbeatResponseData.TopicPartitions() +.setTopicId(Uuid.randomUuid()) +.setPartitions(Arrays.asList(0, 1, 2)), +new ConsumerGroupHeartbeatResponseData.TopicPartitions() +.setTopicId(Uuid.randomUuid()) +.setPartitions(Arrays.asList(3, 4, 5)) +)); +} + +@BeforeEach +public void setUp() { +mockTime = new MockTime(); +mockLogContext = new LogContext(); +Properties properties = new Properties(); +properties.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:"); +properties.put(KEY_DESERIALIZER_CLASS_CONFIG,
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1337282271 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Optional; +import java.util.Properties; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.internals.MemberState.FAILED; +import static org.apache.kafka.clients.consumer.internals.MemberState.STABLE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class HeartbeatRequestManagerTest { + +private final int heartbeatIntervalMs = 1000; +private final long retryBackoffMaxMs = 3000; +private final long retryBackoffMs = 100; +private final String groupId = "group-id"; + +private Time mockTime; +private LogContext mockLogContext; +private CoordinatorRequestManager mockCoordinatorRequestManager; +private SubscriptionState 
mockSubscriptionState; +private HeartbeatRequestManager heartbeatRequestManager; +private MembershipManager mockMembershipManager; +private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState; +private ConsumerConfig config; + +private String memberId = "member-id"; +private int memberEpoch = 1; Review Comment: final -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] tinaselenge opened a new pull request, #14455: KAFKA-15507: Make AdminClient throw non-retriable exception for a new call while closing
tinaselenge opened a new pull request, #14455: URL: https://github.com/apache/kafka/pull/14455 AdminClient will throw IllegalStateException instead of TimeoutException if it receives new calls while closing down. This is more consistent with how the Consumer and Producer clients handle new calls after being closed. ### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
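A hedged sketch of the behavior change from the caller's point of view; whether the IllegalStateException is thrown directly or surfaces through the result future is an assumption here, not confirmed by the PR description:

```java
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class AdminAfterCloseSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        Admin admin = Admin.create(props);
        admin.close();

        try {
            // Before the change, a call issued after close() eventually failed
            // with a TimeoutException; with this PR it should fail fast with an
            // IllegalStateException, matching Consumer/Producer semantics.
            admin.listTopics().names().get();
        } catch (Exception e) {
            System.out.println("Call after close failed: " + e);
        }
    }
}
```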
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1337278130 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java: ## @@ -59,4 +59,20 @@ public interface MembershipManager { * current assignment. */ void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assignment); + +/** + * Transition the member to the FENCED state. This is only invoked when the heartbeat returns a Review Comment: Extra space after state. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java: ## @@ -59,4 +59,20 @@ public interface MembershipManager { * current assignment. */ void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assignment); + +/** + * Transition the member to the FENCED state. This is only invoked when the heartbeat returns a + * FENCED_MEMBER_EPOCH or UNKNOWN_MEMBER_ID error code. + */ +void fenceMember(); + +/** + * Transition the member to the FAILED state. This is invoked when the heartbeat returns a non-retriable error. Review Comment: ditto -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
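Taken together, the quoted MembershipManager javadoc and the fatal-error set from the manager describe a small state machine; a sketch of the dispatch it implies is below. `failMember()` is inferred from the truncated FAILED-state javadoc and may be named differently in the PR, so this is illustrative, not the PR's code:

```java
// Illustrative dispatch from heartbeat error codes onto the MembershipManager
// transitions described in the quoted javadoc.
void handleErrorCode(final Errors error, final MembershipManager membershipManager) {
    switch (error) {
        case FENCED_MEMBER_EPOCH:
        case UNKNOWN_MEMBER_ID:
            membershipManager.fenceMember();   // member must rejoin the group
            break;
        case GROUP_AUTHORIZATION_FAILED:
        case INVALID_REQUEST:
        case GROUP_MAX_SIZE_REACHED:
        case UNSUPPORTED_ASSIGNOR:
        case UNRELEASED_INSTANCE_ID:
            membershipManager.failMember();    // non-retriable: stop heartbeating
            break;
        default:
            // Retriable errors: back off exponentially and retry on the next poll.
            break;
    }
}
```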
[GitHub] [kafka] ijuma commented on pull request #14434: KAFKA-15498: bump snappy-java version to 1.1.10.4
ijuma commented on PR #14434: URL: https://github.com/apache/kafka/pull/14434#issuecomment-1735623002 We don't know, but it's the nature of compression libraries - there can be some variations as the algorithms are tweaked for performance. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1337275381 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group. + * If the member does not have groupId configured, got kicked out of the group, or encountering fatal exceptions, the + * heartbeat will not be sent. + * + * If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be backoff + * exponentially. + * + * If the member completes the assignment changes, i.e. revocation and assignment, a heartbeat request will be sent in + * the next event loop. + * {@link HeartbeatRequestState} for more details. 
+ */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; +private final Set fatalErrors = new HashSet<>(Arrays.asList( +Errors.GROUP_AUTHORIZATION_FAILED, +Errors.INVALID_REQUEST, +Errors.GROUP_MAX_SIZE_REACHED, +Errors.UNSUPPORTED_ASSIGNOR, +Errors.UNRELEASED_INSTANCE_ID)); + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final LogContext logContext, +final ConsumerConfig
[GitHub] [kafka] jlprat commented on pull request #14434: KAFKA-15498: bump snappy-java version to 1.1.10.4
jlprat commented on PR #14434: URL: https://github.com/apache/kafka/pull/14434#issuecomment-1735612270 Is it just 1 extra byte? Or are there cases where some more extra bytes are added? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1337269007 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * Review Comment: These empty lines won't show as such in the javadoc, so let's add `<p>` tags to ensure we have the separation we want -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
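For reference, a minimal illustration of the formatting point: the javadoc tool collapses blank comment lines, so paragraph separation needs explicit `<p>` tags. The interface name below is made up for the example:

```java
/**
 * Manages the request creation and response handling for the heartbeat.
 * <p>
 * Without the {@code <p>} tag above, the blank line before this sentence
 * would be collapsed by the javadoc tool and both sentences would render
 * as a single paragraph in the generated HTML.
 */
interface JavadocParagraphExample {
}
```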
[GitHub] [kafka] ijuma commented on pull request #14434: KAFKA-15498: bump snappy-java version to 1.1.10.4
ijuma commented on PR #14434: URL: https://github.com/apache/kafka/pull/14434#issuecomment-1735607326 The extra byte may be ok since the underlying snappy library was also upgraded in this new version https://github.com/xerial/snappy-java/commit/f2e97f27be0dc6c691369040ba8a673bface484c Our test looks to rely on implementation details of the compression library. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1337265790 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group. + * If the member does not have groupId configured, got kicked out of the group, or encountering fatal exceptions, the + * heartbeat will not be sent. + * + * If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be backoff + * exponentially. + * + * If the member completes the assignment changes, i.e. revocation and assignment, a heartbeat request will be sent in + * the next event loop. + * {@link HeartbeatRequestState} for more details. 
+ */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; +private final Set fatalErrors = new HashSet<>(Arrays.asList( +Errors.GROUP_AUTHORIZATION_FAILED, +Errors.INVALID_REQUEST, +Errors.GROUP_MAX_SIZE_REACHED, +Errors.UNSUPPORTED_ASSIGNOR, +Errors.UNRELEASED_INSTANCE_ID)); + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final LogContext logContext, +final ConsumerConfig
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1337263803 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group. + * If the member does not have groupId configured, got kicked out of the group, or encountering fatal exceptions, the + * heartbeat will not be sent. + * + * If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be backoff + * exponentially. + * + * If the member completes the assignment changes, i.e. revocation and assignment, a heartbeat request will be sent in + * the next event loop. + * {@link HeartbeatRequestState} for more details. 
+ */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; +private final Set fatalErrors = new HashSet<>(Arrays.asList( +Errors.GROUP_AUTHORIZATION_FAILED, +Errors.INVALID_REQUEST, +Errors.GROUP_MAX_SIZE_REACHED, +Errors.UNSUPPORTED_ASSIGNOR, +Errors.UNRELEASED_INSTANCE_ID)); + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final LogContext logContext, +final ConsumerConfig
[GitHub] [kafka] jlprat commented on pull request #14434: KAFKA-15498: bump snappy-java version to 1.1.10.4
jlprat commented on PR #14434: URL: https://github.com/apache/kafka/pull/14434#issuecomment-1735603472 I think the problem is in the details: why this extra byte? Also, 1 extra byte might not be relevant, but at some point the overhead of extra bytes might cause performance regressions (overhead on network traffic + increased memory footprint). I did a bit of debugging on [`UnifiedLog.scala` line 826](https://github.com/apache/kafka/blob/3.6/core/src/main/scala/kafka/log/UnifiedLog.scala#L826) and found out that all the `dups` are in theory written with the same Snappy codec, but they seem to have the same size with both the previous snappy version and the newer one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on pull request #14434: KAFKA-15498: bump snappy-java version to 1.1.10.4
divijvaidya commented on PR #14434: URL: https://github.com/apache/kafka/pull/14434#issuecomment-1735602069 Hey @showuon
> Add hardcoded 5 for decompression, so, set the max message size to 202. After decompressed, the record size is 203, throw exception, while before bumping snappy version, it'll always be 202.

If the size of the original uncompressed message is 197, how is the size of the message obtained after decompression 203? Doesn't this mean that the decompressed message is not equal to the original message? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1337258265 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group. + * If the member does not have groupId configured, got kicked out of the group, or encountering fatal exceptions, the + * heartbeat will not be sent. + * + * If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be backoff + * exponentially. + * + * If the member completes the assignment changes, i.e. revocation and assignment, a heartbeat request will be sent in + * the next event loop. + * {@link HeartbeatRequestState} for more details. 
+ */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; +private final Set fatalErrors = new HashSet<>(Arrays.asList( +Errors.GROUP_AUTHORIZATION_FAILED, +Errors.INVALID_REQUEST, +Errors.GROUP_MAX_SIZE_REACHED, +Errors.UNSUPPORTED_ASSIGNOR, +Errors.UNRELEASED_INSTANCE_ID)); + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final LogContext logContext, +final ConsumerConfig
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1337239851 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group. + * If the member does not have groupId configured, got kicked out of the group, or encountering fatal exceptions, the + * heartbeat will not be sent. + * + * If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first. Review Comment: and "try" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1337238424 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group. Review Comment: tries "to" rejoin -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1337235694 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group. + * If the member does not have groupId configured, got kicked out of the group, or encountering fatal exceptions, the + * heartbeat will not be sent. + * + * If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be backoff + * exponentially. + * + * If the member completes the assignment changes, i.e. revocation and assignment, a heartbeat request will be sent in + * the next event loop. + * {@link HeartbeatRequestState} for more details. 
+ */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; +private final Set fatalErrors = new HashSet<>(Arrays.asList( +Errors.GROUP_AUTHORIZATION_FAILED, +Errors.INVALID_REQUEST, +Errors.GROUP_MAX_SIZE_REACHED, +Errors.UNSUPPORTED_ASSIGNOR, +Errors.UNRELEASED_INSTANCE_ID)); + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final LogContext logContext, +final ConsumerConfig
[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests
lianetm commented on code in PR #14364: URL: https://github.com/apache/kafka/pull/14364#discussion_r1337234985 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Manages the request creation and response handling for the heartbeat. The module creates a {@link ConsumerGroupHeartbeatRequest} + * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once + * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors. + * + * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group. + * If the member does not have groupId configured, got kicked out of the group, or encountering fatal exceptions, the + * heartbeat will not be sent. + * + * If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first. + * + * If the heartbeat failed due to retriable errors, such as, TimeoutException. The subsequent attempt will be backoff + * exponentially. + * + * If the member completes the assignment changes, i.e. revocation and assignment, a heartbeat request will be sent in + * the next event loop. + * {@link HeartbeatRequestState} for more details. 
+ */ +public class HeartbeatRequestManager implements RequestManager { +private final Logger logger; +private final Set fatalErrors = new HashSet<>(Arrays.asList( +Errors.GROUP_AUTHORIZATION_FAILED, +Errors.INVALID_REQUEST, +Errors.GROUP_MAX_SIZE_REACHED, +Errors.UNSUPPORTED_ASSIGNOR, +Errors.UNRELEASED_INSTANCE_ID)); + +private final int rebalanceTimeoutMs; + +private final CoordinatorRequestManager coordinatorRequestManager; +private final SubscriptionState subscriptions; +private final HeartbeatRequestState heartbeatRequestState; +private final MembershipManager membershipManager; +private final ErrorEventHandler nonRetriableErrorHandler; + +public HeartbeatRequestManager( +final Time time, +final LogContext logContext, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final SubscriptionState subscriptions, +final MembershipManager membershipManager, +final ErrorEventHandler nonRetriableErrorHandler) { +this.coordinatorRequestManager = coordinatorRequestManager; +this.logger = logContext.logger(getClass()); +this.subscriptions = subscriptions; +this.membershipManager = membershipManager; +this.nonRetriableErrorHandler = nonRetriableErrorHandler; +this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); +long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); +this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs, +retryBackoffMaxMs, rebalanceTimeoutMs); +} + +// Visible for testing +HeartbeatRequestManager( +final LogContext logContext, +final ConsumerConfig
[GitHub] [kafka] cadonna commented on pull request #14265: KAFKA-10199: Do not process when in PARTITIONS_REVOKED
cadonna commented on PR #14265: URL: https://github.com/apache/kafka/pull/14265#issuecomment-173554 Build failures are unrelated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna merged pull request #14265: KAFKA-10199: Do not process when in PARTITIONS_REVOKED
cadonna merged PR #14265: URL: https://github.com/apache/kafka/pull/14265 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kamalcph commented on pull request #14439: KAFKA-15499: Fix the flaky DeleteSegmentsDueToLogStartOffsetBreach test.
kamalcph commented on PR #14439: URL: https://github.com/apache/kafka/pull/14439#issuecomment-1735500717 The test failures are unrelated. @showuon @divijvaidya Call for review -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #14434: KAFKA-15498: bump snappy-java version to 1.1.10.4
showuon commented on PR #14434: URL: https://github.com/apache/kafka/pull/14434#issuecomment-1735496347 One question I'd like to get your thoughts on. This test does the following:
1. Generate a 2-byte key and a 128-byte value.
2. Create a record compressed with the snappy codec.
3. Get the size of the record, which is 197.
4. Add a hardcoded 5 bytes for decompression headroom, i.e. set the max message size to 202.
5. After decompression, the record size is 203 and an exception is thrown, whereas before bumping the snappy version it was always 202.
Here's my question: if the decompressed records can be read correctly on the consumer side (I haven't tested it yet), should we still worry about the additional 1 byte after decompression? @divijvaidya @ijuma @jlprat , thoughts? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
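For anyone who wants to reproduce the size difference outside Kafka, a minimal standalone sketch using raw snappy-java (this ignores Kafka's record-batch framing, so the absolute numbers will differ from the 197/202/203 above):

```java
import org.xerial.snappy.Snappy;

public class SnappySizeProbe {
    public static void main(String[] args) throws Exception {
        // Roughly mirrors the test payload: a 128-byte value (key omitted for simplicity).
        byte[] value = new byte[128];
        byte[] compressed = Snappy.compress(value);
        byte[] roundTripped = Snappy.uncompress(compressed);
        // Running this against snappy-java 1.1.10.3 and 1.1.10.4 would show whether
        // the compressed output grew by the extra byte observed in the test.
        System.out.printf("compressed=%d bytes, uncompressed=%d bytes%n",
                compressed.length, roundTripped.length);
    }
}
```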
[GitHub] [kafka] lucasbru commented on a diff in pull request #14454: KAFKA-15344: Commit leader epoch where possible
lucasbru commented on code in PR #14454: URL: https://github.com/apache/kafka/pull/14454#discussion_r1337139205 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java: ## @@ -431,11 +433,20 @@ public Map prepareCommit() { } } -private Long findOffset(final TopicPartition partition) { +private OffsetAndMetadata findOffsetAndMetadata(final TopicPartition partition) { Long offset = partitionGroup.headRecordOffset(partition); +Optional leaderEpoch = partitionGroup.headRecordLeaderEpoch(partition); +final long partitionTime = partitionGroup.partitionTimestamp(partition); if (offset == null) { try { offset = mainConsumer.position(partition); +// If we happen to commit the next offset after the last consumed record, use it's +// leader epoch. Otherwise, we do not know the leader epoch. +if (consumedOffsets.containsKey(partition) && offset == consumedOffsets.get(partition) + 1) { +leaderEpoch = consumedLeaderEpochs.get(partition); +} else { +leaderEpoch = Optional.empty(); Review Comment: Here, we'd insert the code for fetching the leader epoch for the current consumer position, if we make it possible in the consumer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #14434: KAFKA-15498: bump snappy-java version to 1.1.10.4
showuon commented on PR #14434: URL: https://github.com/apache/kafka/pull/14434#issuecomment-1735456298 I'll try to investigate more tomorrow and write a simple test case in the snappy repo to simulate our test case. But again, if somebody in another timezone wants to take a stab at it, go ahead! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lucasbru opened a new pull request, #14454: KAFKA-15344: Commit leader epoch where possible
lucasbru opened a new pull request, #14454: URL: https://github.com/apache/kafka/pull/14454 Kafka Streams needs to include the leader epoch when committing offsets. The leader epoch is required to detect situations where a consumer with outdated metadata is trying to fetch the committed offset of a partition after being assigned that partition during a rebalance. The committed offset may be for a newer epoch than the consumer has in its metadata, leading to an OFFSET_OUT_OF_RANGE error and possible data loss. Without an extension of the consumer interface, it is not possible to set the correct leader epoch in all circumstances. In particular, when the offset we attempt to commit directly follows a batch of control records, the leader epoch of those control records is not exposed to Kafka Streams. This can happen primarily in EOS mode, whenever Kafka Streams' internal record buffers are depleted. This is a partial fix that avoids the situation described above in most cases - with the final fix for EOS still open. When committing an offset while our internal record queue is non-empty, we commit the leader epoch of the next record in the queue. We extend `StampedRecord`, `RecordQueue`, `RecordDeserializer` and `PartitionGroup` to expose and keep track of leader epochs. If our internal record queue is empty, we commit the position of the consumer. If the position of the consumer happens to be the last consumed offset + 1, we use the last consumed leader epoch in the commit; otherwise, we omit the leader epoch.
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
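For reference, committing an offset together with its leader epoch goes through the three-argument `OffsetAndMetadata` constructor from KIP-320. A minimal sketch, with the consumer, partition, offset and epoch assumed to be supplied by the caller:

```java
import java.util.Collections;
import java.util.Optional;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public final class EpochAwareCommit {
    // Commits `offset` for `partition`, attaching the leader epoch when it is known;
    // an empty Optional omits the epoch, which is what the PR falls back to.
    static void commit(Consumer<byte[], byte[]> consumer, TopicPartition partition,
                       long offset, Optional<Integer> leaderEpoch) {
        OffsetAndMetadata om = new OffsetAndMetadata(offset, leaderEpoch, "");
        consumer.commitSync(Collections.singletonMap(partition, om));
    }
}
```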
[jira] [Commented] (KAFKA-13882) Dockerfile for previewing website
[ https://issues.apache.org/jira/browse/KAFKA-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769160#comment-17769160 ] ASF GitHub Bot commented on KAFKA-13882: divijvaidya commented on PR #410: URL: https://github.com/apache/kafka-site/pull/410#issuecomment-1735454453 Dropped by to request an update to https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server and https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes (the latter is linked from https://kafka.apache.org/contributing.html ) after this PR is merged. > Dockerfile for previewing website > - > > Key: KAFKA-13882 > URL: https://issues.apache.org/jira/browse/KAFKA-13882 > Project: Kafka > Issue Type: Task > Components: docs, website >Reporter: Tom Bentley >Assignee: Lim Qing Wei >Priority: Trivial > Labels: newbie > > Previewing changes to the website/documentation is rather difficult because > you either have to [hack with the > HTML|https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes#ContributingWebsiteDocumentationChanges-KafkaWebsiteRepository] > or [install > httpd|https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server]. > This is a barrier to contribution. > Having a Dockerfile for previewing the Kafka website (i.e. with httpd > properly set up) would make it easier for people to contribute website/docs > changes. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] cadonna commented on pull request #14437: KAFKA-10199: Fix restoration behavior for paused tasks
cadonna commented on PR #14437: URL: https://github.com/apache/kafka/pull/14437#issuecomment-1735413305 Build failures are unrelated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna merged pull request #14437: KAFKA-10199: Fix restoration behavior for paused tasks
cadonna merged PR #14437: URL: https://github.com/apache/kafka/pull/14437 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15508) Method always return the same value ApplicationEventProcessor.java
Svyatoslav created KAFKA-15508: -- Summary: Method always return the same value ApplicationEventProcessor.java Key: KAFKA-15508 URL: https://issues.apache.org/jira/browse/KAFKA-15508 Project: Kafka Issue Type: Bug Components: clients, consumer Affects Versions: 3.5.1 Reporter: Svyatoslav I'm not sure, but I think this is a bug, because the method 'process' in ApplicationEventProcessor.java always returns true:
{code:java}
private boolean process(final PollApplicationEvent event) {
    Optional<RequestManager> commitRequestManger = registry.get(RequestManager.Type.COMMIT);
    if (!commitRequestManger.isPresent()) {
        return true;
    }
    CommitRequestManager manager = (CommitRequestManager) commitRequestManger.get();
    manager.updateAutoCommitTimer(event.pollTimeMs);
    return true;
}
{code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] divijvaidya commented on a diff in pull request #14453: MINOR: Close UnifiedLog created in tests to avoid resource leak
divijvaidya commented on code in PR #14453: URL: https://github.com/apache/kafka/pull/14453#discussion_r1337054159 ## core/src/test/scala/unit/kafka/log/LogLoaderTest.scala: ## @@ -1800,5 +1822,6 @@ class LogLoaderTest { isRemoteLogEnabled = isRemoteLogEnabled ).load() assertEquals(expectedLogStartOffset, offsets.logStartOffset) +log.close() Review Comment: What happens if the assertion on the line above fails? We will have a leak in that case. ## core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala: ## @@ -71,6 +72,13 @@ class UnifiedLogTest { @AfterEach def tearDown(): Unit = { brokerTopicStats.close() +try { + log.close() Review Comment: Perhaps worth checking that (log != null); otherwise we may get a NullPointerException here if the test exits before creating a log object. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
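A sketch of the shape these two suggestions point at (assuming `log` is the `UnifiedLog` under test; illustrative, not the PR's actual code):

```scala
// Guarantee the close even if the assertion throws, and guard against
// the log never having been created before the test exited:
try {
  assertEquals(expectedLogStartOffset, offsets.logStartOffset)
} finally {
  if (log != null) log.close()
}
```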
[jira] [Commented] (KAFKA-15169) Add tests for RemoteIndexCache
[ https://issues.apache.org/jira/browse/KAFKA-15169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769119#comment-17769119 ] Divij Vaidya commented on KAFKA-15169: -- First, some corrections to what I said above. > The case you mention assumes that file sitting on disk may get corrupted but > that is a risk we choose to accept in Kafka, Files sitting on disk do actually get corrupted. We know of such cases when the disk gets full and leaves the indexes in an inconsistent state. We perform a restart in the disk-full case, and hence we can assume that during the lifecycle of a broker, files sitting on disk will not get corrupted. But on restart, we should definitely perform a check. Next, test case 1 validates recovery when the index fetched from remote was corrupted during network transfer, i.e.:
1. We call getIndexEntry.
2. It throws a corrupt index exception (this exception will be thrown after fetching from remote storage) at "index.sanityCheck();" (line 361).
3. I haven't looked at how we are handling it, but ideally the system should retry the fetch from remote, and this time it should succeed (no corruption during transfer); the test should validate that a retry occurs and is successful.
Next, for test case 2, the test you mentioned sounds like a nice addition. It validates the situation where we have a file on disk but it's not in the cache. In such a case, we should add the cache entry from the file if it is correct, else try to fetch from remote. You are right in assuming that this code path should never occur (because ideally, if a file exists on disk, it should already have a corresponding entry in the cache), but this code is a fail-safe in case we are accidentally left with an inconsistency between the file on disk and the in-memory cache. > Add tests for RemoteIndexCache > -- > > Key: KAFKA-15169 > URL: https://issues.apache.org/jira/browse/KAFKA-15169 > Project: Kafka > Issue Type: Test >Reporter: Satish Duggana >Assignee: Arpit Goyal >Priority: Major > Labels: KIP-405 > Fix For: 3.7.0 > > > Follow-up from > https://github.com/apache/kafka/pull/13275#discussion_r1257490978 -- This message was sent by Atlassian Jira (v8.20.10#820010)
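A rough sketch of how the test for case 1 could look once retry support exists (Mockito-style; rsm, cache, metadata and the byte arrays are illustrative names, not existing fixtures, and the retry behaviour itself is the enhancement being tracked separately):

{code:java}
// First fetch returns corrupted bytes, the retry returns a valid index.
when(rsm.fetchIndex(eq(metadata), eq(IndexType.OFFSET)))
    .thenReturn(new ByteArrayInputStream(corruptedIndexBytes))
    .thenReturn(new ByteArrayInputStream(validIndexBytes));

// Today getIndexEntry surfaces the corruption; after the enhancement the
// cache should retry the remote fetch and succeed on the second attempt.
assertThrows(CorruptIndexException.class, () -> cache.getIndexEntry(metadata));
assertNotNull(cache.getIndexEntry(metadata));
{code}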
[jira] [Assigned] (KAFKA-15507) adminClient should not throw retriable exception when closing instance
[ https://issues.apache.org/jira/browse/KAFKA-15507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gantigmaa Selenge reassigned KAFKA-15507: - Assignee: Gantigmaa Selenge > adminClient should not throw retriable exception when closing instance > -- > > Key: KAFKA-15507 > URL: https://issues.apache.org/jira/browse/KAFKA-15507 > Project: Kafka > Issue Type: Bug > Components: admin >Affects Versions: 3.5.1 >Reporter: Luke Chen >Assignee: Gantigmaa Selenge >Priority: Major > > When adminClient is closing the instance, it'll first set > `hardShutdownTimeMs` to a positive timeout value, and then wait until > existing threads to complete within the timeout. However, within this > waiting, when new caller tries to invoke new commend in adminClient, it'll > immediately get an > {code:java} > TimeoutException("The AdminClient thread is not accepting new calls.") > {code} > There are some issues with the design: > 1. Since the `TimeoutException` is a retriable exception, the caller will > enter a tight loop and keep trying it > 2. The error message is confusing. What does "the adminClient is not > accepting new calls" mean? > We should improve it by throwing a non-retriable error (ex: > IllegalStateException), then, the error message should clearly describe the > adminClient is closing. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jlprat commented on pull request #14434: KAFKA-15498: bump snappy-java version to 1.1.10.4
jlprat commented on PR #14434: URL: https://github.com/apache/kafka/pull/14434#issuecomment-1735292502 I spent a little time on it, but I couldn't find the root cause of why this is failing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-15169) Add tests for RemoteIndexCache
[ https://issues.apache.org/jira/browse/KAFKA-15169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769103#comment-17769103 ] Arpit Goyal edited comment on KAFKA-15169 at 9/26/23 10:42 AM: --- [~divijvaidya] Just to confirm my understanding of the code flow of the first test case:
1. We call getIndexEntry.
2. It throws a corrupt storage exception (this exception will be thrown after fetching from remote storage), i.e.
{code:java}
Utils.atomicMoveWithFallback(tmpIndexFile.toPath(), indexFile.toPath(), false);
index = readIndex.apply(indexFile); // throws remote Storage exception
{code}
3. We call getIndexEntry again.
4. This time the file already exists on disk, and it will log the corruption error.
5. It will refetch from remote storage and pass the sanity check.
Is the test case basically to test the flow when a corrupted file already exists on disk? was (Author: JIRAUSER301926): [~divijvaidya] Just to confirm what I understood the flow of the first test case 1. we call getIndexEntry 2. It throws corrupt storage exception( This exception will be thrown after fetching from remote storage ) i.e. {code:java} Utils.atomicMoveWithFallback(tmpIndexFile.toPath(), indexFile.toPath(), false); index = readIndex.apply(indexFile); // throws remote Storage exception {code} 3. We call getIndexEntry again 4. This time file already exist on disk , it will log the corrupted error 5. It will refetch from remote storage and passes the sanity check. The test case is basically to test the flow when corrupted file already exist on disk ? > Add tests for RemoteIndexCache > -- > > Key: KAFKA-15169 > URL: https://issues.apache.org/jira/browse/KAFKA-15169 > Project: Kafka > Issue Type: Test >Reporter: Satish Duggana >Assignee: Arpit Goyal >Priority: Major > Labels: KIP-405 > Fix For: 3.7.0 > > > Follow-up from > https://github.com/apache/kafka/pull/13275#discussion_r1257490978 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15169) Add tests for RemoteIndexCache
[ https://issues.apache.org/jira/browse/KAFKA-15169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769103#comment-17769103 ] Arpit Goyal commented on KAFKA-15169: - [~divijvaidya] Just to confirm my understanding of the flow of the first test case:
1. We call getIndexEntry.
2. It throws a corrupt storage exception (this exception will be thrown after fetching from remote storage), i.e.
{code:java}
Utils.atomicMoveWithFallback(tmpIndexFile.toPath(), indexFile.toPath(), false);
index = readIndex.apply(indexFile); // throws remote Storage exception
{code}
3. We call getIndexEntry again.
4. This time the file already exists on disk, and it will log the corruption error.
5. It will refetch from remote storage and pass the sanity check.
Is the test case basically to test the flow when a corrupted file already exists on disk? > Add tests for RemoteIndexCache > -- > > Key: KAFKA-15169 > URL: https://issues.apache.org/jira/browse/KAFKA-15169 > Project: Kafka > Issue Type: Test >Reporter: Satish Duggana >Assignee: Arpit Goyal >Priority: Major > Labels: KIP-405 > Fix For: 3.7.0 > > > Follow-up from > https://github.com/apache/kafka/pull/13275#discussion_r1257490978 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] tinaselenge opened a new pull request, #14453: MINOR: Close UnifiedLog created in tests to avoid resource leak
tinaselenge opened a new pull request, #14453: URL: https://github.com/apache/kafka/pull/14453
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #14425: KAFKA-15401: Segment with corrupted index should not be uploaded to r…
divijvaidya commented on code in PR #14425: URL: https://github.com/apache/kafka/pull/14425#discussion_r1336965148 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -697,12 +711,22 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException this.cancel(); } catch (InterruptedException | RetriableException ex) { throw ex; +} catch (CorruptIndexException ex) { +logger.error("Error occurred while copying log segments. Index appeared to be corrupted for partition: {} ", topicIdPartition, ex); +segmentCopyFailures++; } catch (Exception ex) { if (!isCancelled()) { brokerTopicStats.topicStats(log.topicPartition().topic()).failedRemoteCopyRequestRate().mark(); brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().mark(); logger.error("Error occurred while copying log segments of partition: {}", topicIdPartition, ex); } +} finally { + metricsGroup.newGauge(FAILED_REMOTE_COPY_PER_SEC_METRIC.getName(), new Gauge() { Review Comment: This metric is already updated in the catch clause above as `brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().mark();`. Can we add `brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().mark();` and `brokerTopicStats.topicStats(log.topicPartition().topic()).failedRemoteCopyRequestRate().mark();` to the corrupted-index catch clause as well (instead of creating a new gauge here)? We probably want to do it after our attempt to recreate the index fails, as mentioned above. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #14425: KAFKA-15401: Segment with corrupted index should not be uploaded to r…
divijvaidya commented on code in PR #14425: URL: https://github.com/apache/kafka/pull/14425#discussion_r1336950457 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -697,12 +711,22 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException this.cancel(); } catch (InterruptedException | RetriableException ex) { throw ex; +} catch (CorruptIndexException ex) { +logger.error("Error occurred while copying log segments. Index appeared to be corrupted for partition: {} ", topicIdPartition, ex); +segmentCopyFailures++; Review Comment: We can re-create the index from the segment in case it is corrupted. This would prevent the need for any operator intervention. It will block uploading to tiered storage for the duration during which the index is being reconstructed, but the trade-off in favour of self-recovery is worth it in my opinion. ## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ## @@ -1706,6 +1758,279 @@ public RemoteLogMetadataManager createRemoteLogMetadataManager() { } } +@Test +void testCorruptedTimeIndex() throws Exception { Review Comment: There is a lot of duplicated code among the three tests added here. Can we use `@ParameterizedTest` and combine them? ## storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java: ## @@ -74,13 +74,13 @@ public void sanityCheck() { TimestampOffset entry = lastEntry(); long lastTimestamp = entry.timestamp; long lastOffset = entry.offset; +if (entries() != 0 && lastOffset < baseOffset()) +throw new CorruptIndexException("Corrupt time index found, time index file (" + file().getAbsolutePath() + ") has " ++ "non-zero size but the last offset is " + lastOffset + " which is less than the first offset " + baseOffset()); Review Comment: This is a nice change. We should fail fast by checking local state even before calling the expensive mmap() function below. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
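A possible shape for that parameterization (the helper methods are hypothetical placeholders for the logic currently duplicated across the three tests):

```java
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

class CorruptedIndexTest {

    // One test body parameterized over the three index types previously
    // covered by testCorruptedTimeIndex and its two siblings.
    @ParameterizedTest
    @EnumSource(value = IndexType.class, names = {"OFFSET", "TIMESTAMP", "TRANSACTION"})
    void segmentWithCorruptedIndexIsNotUploaded(IndexType indexType) throws Exception {
        corruptIndex(indexType);
        verifyCopySkipped(indexType);
    }

    private void corruptIndex(IndexType indexType) { /* shared corruption setup */ }

    private void verifyCopySkipped(IndexType indexType) { /* shared assertions */ }
}
```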
[jira] [Commented] (KAFKA-15507) adminClient should not throw retriable exception when closing instance
[ https://issues.apache.org/jira/browse/KAFKA-15507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769087#comment-17769087 ] Keith Wall commented on KAFKA-15507: Thank you for raising this, [~showuon]. When I was looking at this issue yesterday, I noted that the behaviour of the Admin client is at odds with the behaviour of the Producer and Consumer clients. With those I see: java.lang.IllegalStateException: Cannot perform operation after producer has been closed java.lang.IllegalStateException: This consumer has already been closed. > adminClient should not throw retriable exception when closing instance > -- > > Key: KAFKA-15507 > URL: https://issues.apache.org/jira/browse/KAFKA-15507 > Project: Kafka > Issue Type: Bug > Components: admin >Affects Versions: 3.5.1 >Reporter: Luke Chen >Priority: Major > > When adminClient is closing the instance, it'll first set > `hardShutdownTimeMs` to a positive timeout value, and then wait until > existing threads to complete within the timeout. However, within this > waiting, when new caller tries to invoke new commend in adminClient, it'll > immediately get an > {code:java} > TimeoutException("The AdminClient thread is not accepting new calls.") > {code} > There are some issues with the design: > 1. Since the `TimeoutException` is a retriable exception, the caller will > enter a tight loop and keep trying it > 2. The error message is confusing. What does "the adminClient is not > accepting new calls" mean? > We should improve it by throwing a non-retriable error (ex: > IllegalStateException), then, the error message should clearly describe the > adminClient is closing. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15169) Add tests for RemoteIndexCache
[ https://issues.apache.org/jira/browse/KAFKA-15169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769086#comment-17769086 ] Arpit Goyal commented on KAFKA-15169: - Thanks [~divijvaidya]. I have two questions based on the code walkthrough:
{code:java}
private <T> T loadIndexFile(File file, RemoteLogSegmentMetadata remoteLogSegmentMetadata,
                            Function<RemoteLogSegmentMetadata, InputStream> fetchRemoteIndex,
                            Function<File, T> readIndex) throws IOException {
    File indexFile = new File(cacheDir, file.getName());
    T index = null;
    if (Files.exists(indexFile.toPath())) {
        try {
            index = readIndex.apply(indexFile);
        } catch (CorruptRecordException ex) {
            log.info("Error occurred while loading the stored index file {}", indexFile.getPath(), ex);
        }
    }
    if (index == null) {
        File tmpIndexFile = new File(indexFile.getParentFile(), indexFile.getName() + RemoteIndexCache.TMP_FILE_SUFFIX);
        try (InputStream inputStream = fetchRemoteIndex.apply(remoteLogSegmentMetadata)) {
            Files.copy(inputStream, tmpIndexFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
        }
        Utils.atomicMoveWithFallback(tmpIndexFile.toPath(), indexFile.toPath(), false);
        index = readIndex.apply(indexFile);
    }
    return index;
}
{code}
In the RemoteIndexCache loadIndexFile function:
1. First we check if the file exists on disk and do a sanity check. I believe this part of the code will never be executed, as it is only reached on a cache miss.
2. As per the first test case, the corrupt record exception would be thrown in the later part of the code, where we fetch the index from the remote segment and do a sanity check:
{code:java}
Utils.atomicMoveWithFallback(tmpIndexFile.toPath(), indexFile.toPath(), false);
index = readIndex.apply(indexFile);
{code}
I believed the first test case was about a file that already exists on disk before getIndexEntry is called:
1. Create an empty/corrupt file on disk.
2. Call getIndexEntry.
3. It throws a record-corrupted exception.
4. In the next line it fetches from remote storage and restores the file.
> Add tests for RemoteIndexCache > -- > > Key: KAFKA-15169 > URL: https://issues.apache.org/jira/browse/KAFKA-15169 > Project: Kafka > Issue Type: Test >Reporter: Satish Duggana >Assignee: Arpit Goyal >Priority: Major > Labels: KIP-405 > Fix For: 3.7.0 > > > Follow-up from > https://github.com/apache/kafka/pull/13275#discussion_r1257490978 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15169) Add tests for RemoteIndexCache
[ https://issues.apache.org/jira/browse/KAFKA-15169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769072#comment-17769072 ] Divij Vaidya commented on KAFKA-15169: -- Separately, while you are writing tests for this cache, see if you would be interested in fixing https://issues.apache.org/jira/browse/KAFKA-15481 as well. We would ideally like to write a test for the scenario mentioned in that ticket which fails prior to the fix and succeeds after it. > Add tests for RemoteIndexCache > -- > > Key: KAFKA-15169 > URL: https://issues.apache.org/jira/browse/KAFKA-15169 > Project: Kafka > Issue Type: Test >Reporter: Satish Duggana >Assignee: Arpit Goyal >Priority: Major > Labels: KIP-405 > Fix For: 3.7.0 > > > Follow-up from > https://github.com/apache/kafka/pull/13275#discussion_r1257490978 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15169) Add tests for RemoteIndexCache
[ https://issues.apache.org/jira/browse/KAFKA-15169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769071#comment-17769071 ] Divij Vaidya commented on KAFKA-15169: -- Hey Arpit, asserting the sanity of the index (or of any file on disk) is an expensive operation. Hence, we have to strike a balance between asserting sanity and trusting that the file on disk is not corrupted. For logs, we perform a CRC checksum while storing data on disk, and after that the assumption is that files on disk will not get corrupted; i.e., we consider transfer over the network a possible culprit for corruption, but we don't consider that a file sitting on disk will get corrupted. Extending the same analogy to this cache: when we fetch the index files from the remote store, they may be corrupted, so we perform a sanity check, but once they are stored on disk, we assume that the files will not be corrupted. The case you mention assumes that a file sitting on disk may get corrupted, but that is a risk we choose to accept in Kafka, given the trade-off mentioned above. Hence, the case you mentioned is an acceptable risk by design. > Add tests for RemoteIndexCache > -- > > Key: KAFKA-15169 > URL: https://issues.apache.org/jira/browse/KAFKA-15169 > Project: Kafka > Issue Type: Test >Reporter: Satish Duggana >Assignee: Arpit Goyal >Priority: Major > Labels: KIP-405 > Fix For: 3.7.0 > > > Follow-up from > https://github.com/apache/kafka/pull/13275#discussion_r1257490978 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15487) CVE-2023-40167, CVE-2023-36479 - Upgrade jetty to 9.4.52, 10.0.16, 11.0.16, 12.0.1
[ https://issues.apache.org/jira/browse/KAFKA-15487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya resolved KAFKA-15487. -- Resolution: Fixed > CVE-2023-40167, CVE-2023-36479 - Upgrade jetty to 9.4.52, 10.0.16, 11.0.16, > 12.0.1 > -- > > Key: KAFKA-15487 > URL: https://issues.apache.org/jira/browse/KAFKA-15487 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.7.0, 2.6.1, 3.4.1, 3.6.0, 3.5.1 >Reporter: Rafael Rios Saavedra >Assignee: Divij Vaidya >Priority: Major > Labels: CVE, security > Fix For: 3.6.0, 3.4.2, 3.5.2 > > > CVE-2023-40167 and CVE-2023-36479 vulnerabilities affects Jetty version > {*}9.4.51{*}. For more information see > [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-40167] > [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-364749] > Upgrading to Jetty version *9.4.52, 10.0.16, 11.0.16, 12.0.1* should address > this issue. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15487) CVE-2023-40167, CVE-2023-36479 - Upgrade jetty to 9.4.52, 10.0.16, 11.0.16, 12.0.1
[ https://issues.apache.org/jira/browse/KAFKA-15487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769066#comment-17769066 ] Divij Vaidya commented on KAFKA-15487: -- We have backported this to all community supported versions (3.4, 3.5, 3.6) as per EOL policy at [https://cwiki.apache.org/confluence/display/KAFKA/Time+Based+Release+Plan#TimeBasedReleasePlan-WhatIsOurEOLPolicy?] We have an upcoming 3.6.0 release which will contain this upgrade but we don't have 3.5.2 or 3.4.2 planned as of yet. If you have thoughts on Apache Kafka's EOL policy, please participate in the discussion at [https://lists.apache.org/thread/tzx4zkhfz26joq5ydq70bxcfr3zwy1hk] > CVE-2023-40167, CVE-2023-36479 - Upgrade jetty to 9.4.52, 10.0.16, 11.0.16, > 12.0.1 > -- > > Key: KAFKA-15487 > URL: https://issues.apache.org/jira/browse/KAFKA-15487 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.7.0, 2.6.1, 3.4.1, 3.6.0, 3.5.1 >Reporter: Rafael Rios Saavedra >Assignee: Divij Vaidya >Priority: Major > Labels: CVE, security > Fix For: 3.6.0, 3.4.2, 3.5.2 > > > CVE-2023-40167 and CVE-2023-36479 vulnerabilities affects Jetty version > {*}9.4.51{*}. For more information see > [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-40167] > [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-364749] > Upgrading to Jetty version *9.4.52, 10.0.16, 11.0.16, 12.0.1* should address > this issue. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] lucasbru merged pull request #14403: KAFKA-10199: Add missing catch for lock exception
lucasbru merged PR #14403: URL: https://github.com/apache/kafka/pull/14403 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15487) CVE-2023-40167, CVE-2023-36479 - Upgrade jetty to 9.4.52, 10.0.16, 11.0.16, 12.0.1
[ https://issues.apache.org/jira/browse/KAFKA-15487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-15487: - Fix Version/s: 3.4.2 > CVE-2023-40167, CVE-2023-36479 - Upgrade jetty to 9.4.52, 10.0.16, 11.0.16, > 12.0.1 > -- > > Key: KAFKA-15487 > URL: https://issues.apache.org/jira/browse/KAFKA-15487 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.7.0, 2.6.1, 3.4.1, 3.6.0, 3.5.1 >Reporter: Rafael Rios Saavedra >Assignee: Divij Vaidya >Priority: Major > Labels: CVE, security > Fix For: 3.6.0, 3.4.2, 3.5.2 > > > CVE-2023-40167 and CVE-2023-36479 vulnerabilities affects Jetty version > {*}9.4.51{*}. For more information see > [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-40167] > [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-364749] > Upgrading to Jetty version *9.4.52, 10.0.16, 11.0.16, 12.0.1* should address > this issue. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] lucasbru commented on pull request #14403: KAFKA-10199: Add missing catch for lock exception
lucasbru commented on PR #14403: URL: https://github.com/apache/kafka/pull/14403#issuecomment-1735115556 Build failures are unrelated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on pull request #14438: KAFKA-15487: Upgrade Jetty to 9.4.52.v20230823
divijvaidya commented on PR #14438: URL: https://github.com/apache/kafka/pull/14438#issuecomment-1735115448 Since this PR addresses a CVE, I have backported this fix to all community supported version branches i.e. 3.4 and 3.5 (in addition to 3.6 backported by Satish) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lucasbru merged pull request #14436: MINOR: Revert log level changes in LogCaptureAppender
lucasbru merged PR #14436: URL: https://github.com/apache/kafka/pull/14436 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lucasbru commented on pull request #14436: MINOR: Revert log level changes in LogCaptureAppender
lucasbru commented on PR #14436: URL: https://github.com/apache/kafka/pull/14436#issuecomment-1735100693 Build failures are unrelated -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15487) CVE-2023-40167, CVE-2023-36479 - Upgrade jetty to 9.4.52, 10.0.16, 11.0.16, 12.0.1
[ https://issues.apache.org/jira/browse/KAFKA-15487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-15487: - Fix Version/s: 3.6.0 3.5.2 (was: 3.0.0) (was: 2.8.0) (was: 2.7.1) (was: 2.6.2) > CVE-2023-40167, CVE-2023-36479 - Upgrade jetty to 9.4.52, 10.0.16, 11.0.16, > 12.0.1 > -- > > Key: KAFKA-15487 > URL: https://issues.apache.org/jira/browse/KAFKA-15487 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.7.0, 2.6.1, 3.4.1, 3.6.0, 3.5.1 >Reporter: Rafael Rios Saavedra >Assignee: Divij Vaidya >Priority: Major > Labels: CVE, security > Fix For: 3.6.0, 3.5.2 > > > CVE-2023-40167 and CVE-2023-36479 vulnerabilities affects Jetty version > {*}9.4.51{*}. For more information see > [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-40167] > [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-364749] > Upgrading to Jetty version *9.4.52, 10.0.16, 11.0.16, 12.0.1* should address > this issue. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15504) Upgrade snappy java to version 1.1.10.4
[ https://issues.apache.org/jira/browse/KAFKA-15504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya resolved KAFKA-15504. -- Resolution: Duplicate Closing as duplicate of prior ticket https://issues.apache.org/jira/browse/KAFKA-15498. > Upgrade snappy java to version 1.1.10.4 > --- > > Key: KAFKA-15504 > URL: https://issues.apache.org/jira/browse/KAFKA-15504 > Project: Kafka > Issue Type: Improvement >Reporter: Said BOUDJELDA >Assignee: Said BOUDJELDA >Priority: Major > > The version 1.1.10.4 contains a fix of > [CVE-2023-43642|https://github.com/xerial/snappy-java/security/advisories/GHSA-55g7-9cwv-5qfv] > as mentioned on the release notes of the library > [https://github.com/xerial/snappy-java/releases/tag/v1.1.10.4] Fixed > SnappyInputStream so as not to allocate too large memory when decompressing > data with an extremely large chunk size by > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] divijvaidya closed pull request #14450: KAFKA-15504: Upgrade snappy java to version 1.1.10.4
divijvaidya closed pull request #14450: KAFKA-15504: Upgrade snappy java to version 1.1.10.4 URL: https://github.com/apache/kafka/pull/14450 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on pull request #14450: KAFKA-15504: Upgrade snappy java to version 1.1.10.4
divijvaidya commented on PR #14450: URL: https://github.com/apache/kafka/pull/14450#issuecomment-1735075163 Hey @bmscomp, we already have a prior JIRA (https://issues.apache.org/jira/browse/KAFKA-15498) and PR (https://github.com/apache/kafka/pull/14434) for this change, so I am going to close this PR as a duplicate. Please feel free to provide a review on the other PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya merged pull request #14451: MINOR: Replace Java 20 with Java 21 in `README.md`
divijvaidya merged PR #14451: URL: https://github.com/apache/kafka/pull/14451 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15507) adminClient should not throw retriable exception when closing instance
Luke Chen created KAFKA-15507: - Summary: adminClient should not throw retriable exception when closing instance Key: KAFKA-15507 URL: https://issues.apache.org/jira/browse/KAFKA-15507 Project: Kafka Issue Type: Bug Components: admin Affects Versions: 3.5.1 Reporter: Luke Chen When adminClient is closing the instance, it'll first set `hardShutdownTimeMs` to a positive timeout value, and then wait for existing threads to complete within the timeout. However, during this wait, when a new caller tries to invoke a new command in adminClient, it'll immediately get a
{code:java}
TimeoutException("The AdminClient thread is not accepting new calls.")
{code}
There are some issues with this design:
1. Since `TimeoutException` is a retriable exception, the caller will enter a tight loop and keep retrying.
2. The error message is confusing. What does "the AdminClient thread is not accepting new calls" mean?
We should improve this by throwing a non-retriable error (e.g. IllegalStateException), and the error message should clearly state that the adminClient is closing. -- This message was sent by Atlassian Jira (v8.20.10#820010)
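To illustrate issue 1: a caller that follows the usual retry-on-retriable-exception pattern will spin in a tight loop while the client is closing. A sketch of such a caller (listTopics chosen arbitrarily as the example call):

{code:java}
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.errors.TimeoutException;

// Typical caller-side retry pattern: because the exception thrown while the
// client is closing is retriable, this spins instead of failing fast.
static void listTopicsWithRetry(Admin admin) throws Exception {
    while (true) {
        try {
            admin.listTopics().names().get();
            return;
        } catch (ExecutionException e) {
            if (e.getCause() instanceof TimeoutException) {
                continue; // retriable -> immediate retry, forever, once close() has begun
            }
            throw e;
        }
    }
}
{code}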
[GitHub] [kafka] lucasbru commented on a diff in pull request #14437: KAFKA-10199: Fix restoration behavior for paused tasks
lucasbru commented on code in PR #14437: URL: https://github.com/apache/kafka/pull/14437#discussion_r1336772774 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java: ## @@ -1043,6 +1046,28 @@ public void shouldResumeActiveStatefulTask() throws Exception { verify(changelogReader, times(2)).enforceRestoreActive(); } +@Test +public void shouldAwaitWhenAllTasksPaused() throws Exception { +final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); +stateUpdater.start(); +stateUpdater.add(task); + +when(topologyMetadata.isPaused(null)).thenReturn(true); + +verifyPausedTasks(task); + +reset(changelogReader); +Thread.sleep(100); +verify(changelogReader, never()).restore(any()); Review Comment: I like the idea! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lucasbru commented on a diff in pull request #14437: KAFKA-10199: Fix restoration behavior for paused tasks
lucasbru commented on code in PR #14437: URL: https://github.com/apache/kafka/pull/14437#discussion_r1336770650 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java: ## @@ -1043,6 +1046,28 @@ public void shouldResumeActiveStatefulTask() throws Exception { verify(changelogReader, times(2)).enforceRestoreActive(); } +@Test +public void shouldAwaitWhenAllTasksPaused() throws Exception { +final StreamTask task = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); +stateUpdater.start(); +stateUpdater.add(task); + +when(topologyMetadata.isPaused(null)).thenReturn(true); + +verifyPausedTasks(task); + +reset(changelogReader); Review Comment: I don't think this is quite true. Paused tasks will not be added to `updatingTasks`, but we may still go around the loop calling `restore` on a potentially empty set. This is testing that after a while, we do not go around the loop anymore. But it's actually not a good test, for the reason described above. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15506) follower receive KafkaStorageException before leader raise disk error
wangliucheng created KAFKA-15506: Summary: follower receive KafkaStorageException before leader raise disk error Key: KAFKA-15506 URL: https://issues.apache.org/jira/browse/KAFKA-15506 Project: Kafka Issue Type: Bug Components: core Affects Versions: 3.3.2 Environment: Kafka Version: 3.3.2 Jdk Version: jdk1.8.0_301 Deployment mode: kraft Reporter: wangliucheng In my Kafka environment the topic has 2 replicas, and the leader and follower become unavailable when the leader hits a disk error. The follower detects the disk error before the leader does. Here are the logs:
*follower receives KafkaStorageException:*
{code:java}
[2023-08-17 08:40:15,516] ERROR [ReplicaFetcher replicaId=4, leaderId=1, fetcherId=10] Error for partition __consumer_offsets-37 at offset 305860652 (kafka.server.ReplicaFetcherThread) org.apache.kafka.common.errors.KafkaStorageException: Disk error when trying to access log file on the disk.
{code}
*ISR shrinks from 4,1 to 1:*
{code:java}
[2023-08-17 08:41:49,953] INFO [Partition __consumer_offsets-37 broker=1] Shrinking ISR from 4,1 to 1. Leader: (highWatermark: 305860652, endOffset: 305860653). Out of sync replicas: (brokerId: 4, endOffset: 305860652). (kafka.cluster.Partition)
{code}
*broker marks the dir offline:*
{code:java}
[2023-08-17 08:41:50,188] ERROR Error while appending records to eb_raw_legendsec_flow_2-33 in dir /data09/kafka/log (kafka.server.LogDirFailureChannel)
java.io.IOException: Read-only file system
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:65)
    at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:211)
    at org.apache.kafka.common.record.MemoryRecords.writeFullyTo(MemoryRecords.java:92)
    at org.apache.kafka.common.record.FileRecords.append(FileRecords.java:188)
    at kafka.log.LogSegment.append(LogSegment.scala:158)
    at kafka.log.LocalLog.append(LocalLog.scala:436)
    at kafka.log.UnifiedLog.append(UnifiedLog.scala:949)
    at kafka.log.UnifiedLog.appendAsFollower(UnifiedLog.scala:778)
    at kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica(Partition.scala:1121)
    at kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica(Partition.scala:1128)
    at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:121)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:336)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:325)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:324)
    at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
    at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:359)
    at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:355)
    at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:309)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:324)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:124)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:123)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:123)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:106)
    at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:97)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
{code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)