[jira] [Comment Edited] (KAFKA-15169) Add tests for RemoteIndexCache

2023-09-26 Thread Arpit Goyal (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769425#comment-17769425
 ] 

Arpit Goyal edited comment on KAFKA-15169 at 9/27/23 5:37 AM:
--

[~divijvaidya] As per the code, RemoteIndexCache never retries if a file gets 
corrupted after the remote storage fetch. I will create a separate ticket to 
track this enhancement. For the first test case, I am thinking of writing a 
test that expects a corruption exception to be thrown if files get corrupted 
during the remote fetch.
I will cover the other test cases we discussed as part of this JIRA. WDYT?


was (Author: JIRAUSER301926):
[~divijvaidya] As per the code, RemoteIndexCache never retries if a file gets 
corrupted after the remote storage fetch. I will create a separate ticket to 
track this enhancement, which will indirectly cover the first test case we 
discussed.
I will cover the other test cases we discussed as part of this JIRA. WDYT?

> Add tests for RemoteIndexCache
> ------------------------------
>
>                 Key: KAFKA-15169
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15169
>             Project: Kafka
>          Issue Type: Test
>            Reporter: Satish Duggana
>            Assignee: Arpit Goyal
>            Priority: Major
>              Labels: KIP-405
>             Fix For: 3.7.0
>
>
> Follow-up from 
> https://github.com/apache/kafka/pull/13275#discussion_r1257490978



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15169) Add tests for RemoteIndexCache

2023-09-26 Thread Arpit Goyal (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769425#comment-17769425
 ] 

Arpit Goyal commented on KAFKA-15169:
-

[~divijvaidya] As per the code, RemoteIndexCache never retries if a file gets 
corrupted after the remote storage fetch. I will create a separate ticket to 
track this enhancement, which will indirectly cover the first test case we 
discussed.
I will cover the other test cases we discussed as part of this JIRA. WDYT?



[jira] [Created] (KAFKA-15510) Follower's lastFetchedEpoch wrongly set when fetch response has no record

2023-09-26 Thread Chern Yih Cheah (Jira)
Chern Yih Cheah created KAFKA-15510:
---

             Summary: Follower's lastFetchedEpoch wrongly set when fetch response has no record
                 Key: KAFKA-15510
                 URL: https://issues.apache.org/jira/browse/KAFKA-15510
             Project: Kafka
          Issue Type: Bug
            Reporter: Chern Yih Cheah
            Assignee: Chern Yih Cheah


A regression was introduced by this change:
https://github.com/apache/kafka/pull/13843/files#diff-508e9dc4d52744119dda36d69ce63a1901abfd3080ca72fc4554250b7e9f5242

When the fetch response has no records for a partition, validBytes is 0. In 
this case, we shouldn't set the last fetched epoch to 
logAppendInfo.lastLeaderEpoch.asScala, since there are no records and it is 
Optional.empty. We should use currentFetchState.lastFetchedEpoch instead.

One effect of this is that truncation during fetching might not work correctly.
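
A minimal sketch of the selection logic proposed above, written in Java for illustration only. The class and the helper `nextLastFetchedEpoch` are hypothetical; its parameters stand in for validBytes, logAppendInfo.lastLeaderEpoch, and currentFetchState.lastFetchedEpoch. It is not the actual patch.

```java
import java.util.Optional;

public class LastFetchedEpochSketch {

    // Hypothetical helper: pick the epoch to remember after handling a fetch response.
    // When nothing was appended (validBytes == 0) the appended epoch is empty,
    // so the previously known epoch is kept instead of being overwritten.
    public static Optional<Integer> nextLastFetchedEpoch(int validBytes,
                                                         Optional<Integer> appendedLastLeaderEpoch,
                                                         Optional<Integer> currentLastFetchedEpoch) {
        return validBytes > 0 ? appendedLastLeaderEpoch : currentLastFetchedEpoch;
    }

    public static void main(String[] args) {
        // Empty fetch response: the current epoch (7) is preserved.
        System.out.println(nextLastFetchedEpoch(0, Optional.empty(), Optional.of(7)));
        // Non-empty fetch response: the epoch of the appended records (8) is used.
        System.out.println(nextLastFetchedEpoch(128, Optional.of(8), Optional.of(7)));
    }
}
```

With the regression described above, the first call would instead yield an empty value, which is the situation the ticket wants to avoid.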

 





[jira] [Commented] (KAFKA-15493) Ensure system tests work with Java 21

2023-09-26 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769418#comment-17769418
 ] 

Ismael Juma commented on KAFKA-15493:
-

And you used Java 21 for the `ducker-ak up` step?

> Ensure system tests work with Java 21
> -------------------------------------
>
>                 Key: KAFKA-15493
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15493
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Ismael Juma
>            Assignee: Said BOUDJELDA
>            Priority: Major
>             Fix For: 3.7.0
>
>
> Run the system tests as described below with Java 21:
> [https://github.com/apache/kafka/tree/trunk/tests]
> One relevant portion:
> Run tests with a different JVM (it may be as easy as replacing 11 with 21)
> {code:java}
> bash tests/docker/ducker-ak up -j 'openjdk:11'; 
> tests/docker/run_tests.sh{code}





[GitHub] [kafka] kirktrue commented on a diff in pull request #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager

2023-09-26 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1337882369


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##
@@ -259,17 +144,41 @@ long handlePollResult(NetworkClientDelegate.PollResult 
res) {
 }
 
 public boolean isRunning() {
-return this.running;
+return running;
 }
 
 public void wakeup() {
-networkClientDelegate.wakeup();
+if (networkClientDelegate != null)

Review Comment:
   Added a brief comment to explain this case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on a diff in pull request #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager

2023-09-26 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1337881557


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java:
##
@@ -217,29 +98,33 @@ public void run() {
 }
 } catch (final Throwable t) {
 log.error("The background thread failed due to unexpected error", 
t);
-throw new RuntimeException(t);
+throw new KafkaException(t);
 } finally {
 close();
-log.debug("{} closed", getClass());
+log.debug("Background thread closed");
 }
 }
 
+void initializeResources() {
+applicationEventProcessor = applicationEventProcessorSupplier.get();
+networkClientDelegate = networkClientDelegateSupplier.get();
+requestManagers = requestManagersSupplier.get();
+}
+
 /**
  * Poll and process an {@link ApplicationEvent}. It performs the following 
tasks:
  * 1. Drains and try to process all the requests in the queue.
  * 2. Iterate through the registry, poll, and get the next poll time for 
the network poll
  * 3. Poll the networkClient to send and retrieve the response.
  */
 void runOnce() {
-if (!applicationEventQueue.isEmpty()) {
-LinkedList<ApplicationEvent> res = new LinkedList<>();
-this.applicationEventQueue.drainTo(res);
+LinkedList<ApplicationEvent> events = new LinkedList<>();
+applicationEventQueue.drainTo(events);
 
-for (ApplicationEvent event : res) {
-log.debug("Consuming application event: {}", event);
-Objects.requireNonNull(event);
-applicationEventProcessor.process(event);
-}
+for (ApplicationEvent event : events) {
+log.trace("Dequeued event: {}", event);
+Objects.requireNonNull(event);

Review Comment:
   This code has been refactored, but it still has the check, just to be 
paranoid.






[GitHub] [kafka] kirktrue commented on a diff in pull request #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager

2023-09-26 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1337881130


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventProcessor.java:
##
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.LinkedList;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+
+public class BackgroundEventProcessor {
+
+private final Logger log;
+private final BlockingQueue backgroundEventQueue;
+
+public BackgroundEventProcessor(final LogContext logContext,
+final BlockingQueue 
backgroundEventQueue) {
+this.log = logContext.logger(BackgroundEventProcessor.class);
+this.backgroundEventQueue = backgroundEventQueue;
+}
+
+/**
+ * Drains all available {@link BackgroundEvent}s, and then processes them in order. If any
+ * errors are thrown as a result of a {@link ErrorBackgroundEvent} or an error occurs while processing
+ * another type of {@link BackgroundEvent}, only the first exception will be thrown, all
+ * subsequent errors will simply be logged at WARN level.
+ *
+ * @throws RuntimeException or subclass
+ */
+public void process() {
+LinkedList<BackgroundEvent> events = new LinkedList<>();
+backgroundEventQueue.drainTo(events);
+
+RuntimeException first = null;
+int errorCount = 0;
+
+for (BackgroundEvent event : events) {
+log.debug("Consuming background event: {}", event);
+
+try {
+process(event);
+} catch (RuntimeException e) {
+errorCount++;
+
+if (first == null) {
+first = e;
+log.warn("Error #{} from background thread (will be logged 
and thrown): {}", errorCount, e.getMessage(), e);

Review Comment:
   In the refactored version, the logging is not as verbose.
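
The behaviour described in the Javadoc of the quoted hunk (process every drained event, rethrow only the first failure, log the rest) can be sketched roughly as follows. This is a generic illustration and not the PR's code: the class `FirstErrorWinsSketch`, the `Consumer` handler, and the use of `System.err` in place of a logger are all assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

public class FirstErrorWinsSketch {

    /** Drains the queue, processes every event, and rethrows only the first failure. */
    public static <T> int drainAndProcess(BlockingQueue<T> queue, Consumer<T> handler) {
        List<T> events = new ArrayList<>();
        queue.drainTo(events);

        RuntimeException first = null;
        int processed = 0;

        for (T event : events) {
            try {
                handler.accept(event);
                processed++;
            } catch (RuntimeException e) {
                if (first == null) {
                    first = e; // remembered and thrown once the loop has finished
                } else {
                    // Later failures are only reported, never thrown.
                    System.err.println("Suppressed later error: " + e.getMessage());
                }
            }
        }

        if (first != null)
            throw first;
        return processed;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(List.of("ok", "bad-1", "bad-2"));
        try {
            drainAndProcess(queue, event -> {
                if (event.startsWith("bad"))
                    throw new IllegalStateException(event);
            });
        } catch (IllegalStateException e) {
            System.out.println("Thrown: " + e.getMessage()); // prints "Thrown: bad-1"
        }
    }
}
```

Deferring the throw until after the loop means one failing event does not prevent the remaining drained events from being handled.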






[GitHub] [kafka] kirktrue commented on a diff in pull request #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager

2023-09-26 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1337849711


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java:
##
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.function.BiConsumer;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.FetchSessionHandler;
+import 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
+import 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+/**
+ * {@code FetchRequestManager} is responsible for generating {@link 
FetchRequest} that represent the
+ * {@link SubscriptionState#fetchablePartitions(Predicate)} based on the 
user's topic subscription/partition
+ * assignment.
+ */
+public class FetchRequestManager extends AbstractFetch implements 
RequestManager {
+
+private final Logger log;
+private final ErrorEventHandler errorEventHandler;
+private final NetworkClientDelegate networkClientDelegate;
+
+FetchRequestManager(final LogContext logContext,
+final Time time,
+final ErrorEventHandler errorEventHandler,
+final ConsumerMetadata metadata,
+final SubscriptionState subscriptions,
+final FetchConfig fetchConfig,
+final FetchMetricsManager metricsManager,
+final NetworkClientDelegate networkClientDelegate) {
+super(logContext, metadata, subscriptions, fetchConfig, 
metricsManager, time);
+this.log = logContext.logger(FetchRequestManager.class);
+this.errorEventHandler = errorEventHandler;
+this.networkClientDelegate = networkClientDelegate;
+}
+
+@Override
+protected boolean isUnavailable(Node node) {
+return networkClientDelegate.isUnavailable(node);
+}
+
+@Override
+protected void maybeThrowAuthFailure(Node node) {
+networkClientDelegate.maybeThrowAuthFailure(node);
+}
+
+@Override
+public PollResult poll(long currentTimeMs) {
+List requests;
+
+if (!idempotentCloser.isClosed()) {
+// If the fetcher is open (i.e. not closed), we will issue the 
normal fetch requests
+requests = prepareFetchRequests().entrySet().stream().map(entry -> 
{
+final Node fetchTarget = entry.getKey();
+final FetchSessionHandler.FetchRequestData data = 
entry.getValue();
+final FetchRequest.Builder request = 
createFetchRequest(fetchTarget, data);
+final BiConsumer responseHandler = 
(clientResponse, t) -> {
+if (t != null) {
+handleFetchResponse(fetchTarget, t);
+log.warn("Attempt to fetch data from node {} failed 
due to fatal exception", fetchTarget, t);

Review Comment:
   It looks like the other call sites that add error events to the handler also 
log them 🤷‍♂️ 




[GitHub] [kafka] showuon commented on pull request #14434: KAFKA-15498: bump snappy-java version to 1.1.10.4

2023-09-26 Thread via GitHub


showuon commented on PR #14434:
URL: https://github.com/apache/kafka/pull/14434#issuecomment-1736393174

   Thanks all. (Sorry, I pressed the wrong button on my phone.) I will verify 
today that the decompressed results can be read correctly. 





[GitHub] [kafka] showuon closed pull request #14434: KAFKA-15498: bump snappy-java version to 1.1.10.4

2023-09-26 Thread via GitHub


showuon closed pull request #14434: KAFKA-15498: bump snappy-java version to 
1.1.10.4
URL: https://github.com/apache/kafka/pull/14434





[GitHub] [kafka] showuon commented on pull request #14434: KAFKA-15498: bump snappy-java version to 1.1.10.4

2023-09-26 Thread via GitHub


showuon commented on PR #14434:
URL: https://github.com/apache/kafka/pull/14434#issuecomment-1736386116

   One question I'd like to get your thoughts on. This test does the following:
   1. Generates a 2-byte key and a 128-byte value.
   2. Creates a record compressed with the snappy codec.
   3. Gets the size of the record, which is 197.
   4. Adds a hardcoded 5 for decompression, so the max message size is set to 202.
   5. After decompression, the record size is 203, so an exception is thrown, 
whereas before bumping the snappy version it was always 202.
   
   Here's my question: 
   If the decompressed records can be read correctly on the consumer side (I 
haven't tested it yet), should we still worry about the additional 1 byte 
after decompression?
   
   @divijvaidya @ijuma @jlprat , thoughts?
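
The arithmetic in the numbered steps can be sketched as below. The `fits` helper, and the assumption that the limit check is a plain "size <= max" comparison, are hypothetical and only illustrate the numbers quoted in this comment; the real test and log-cleaner logic may differ.

```java
public class MaxMessageSizeSketch {

    // Hypothetical check: a record is accepted when it does not exceed the limit.
    public static boolean fits(int recordSizeBytes, int maxMessageBytes) {
        return recordSizeBytes <= maxMessageBytes;
    }

    public static void main(String[] args) {
        int compressedRecordSize = 197; // size reported in step 3
        int headroom = 5;               // hardcoded allowance from step 4
        int maxMessageSize = compressedRecordSize + headroom; // 202

        System.out.println(fits(202, maxMessageSize)); // size seen before the snappy bump
        System.out.println(fits(203, maxMessageSize)); // size seen after the snappy bump
    }
}
```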





[jira] [Commented] (KAFKA-15493) Ensure system tests work with Java 21

2023-09-26 Thread Said BOUDJELDA (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769355#comment-17769355
 ] 

Said BOUDJELDA commented on KAFKA-15493:


bash tests/docker/run_tests.sh completes successfully.



[GitHub] [kafka] cmccabe merged pull request #14399: KAFKA-15466: Add KIP-919 support for some admin APIs

2023-09-26 Thread via GitHub


cmccabe merged PR #14399:
URL: https://github.com/apache/kafka/pull/14399





[jira] [Commented] (KAFKA-15493) Ensure system tests work with Java 21

2023-09-26 Thread Said BOUDJELDA (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769348#comment-17769348
 ] 

Said BOUDJELDA commented on KAFKA-15493:


./gradlew clean systemTestLibs completes successfully.



[GitHub] [kafka] divijvaidya commented on pull request #14434: KAFKA-15498: bump snappy-java version to 1.1.10.4

2023-09-26 Thread via GitHub


divijvaidya commented on PR #14434:
URL: https://github.com/apache/kafka/pull/14434#issuecomment-1736256796

   I have run the test ~200 times locally using the command below and all runs 
succeeded. Also, the CI build on Jenkins has succeeded twice, [1] and [2]. 
IMO, we can safely merge this one.
   
   ```
   I=0; while ./gradlew :core:test \
       --tests kafka.log.LogCleanerParameterizedIntegrationTest.testCleanerWithMessageFormatV0 \
       --rerun --fail-fast; do (( I=$I+1 )); echo "Completed run: $I"; sleep 1; done
   ```
   
   [1] 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14434/3/pipeline/
   [2] https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14434/4/





[GitHub] [kafka] bmscomp commented on pull request #14449: MINOR: Upgrade version of zstd-jni to the latest stable version 1.5.5-5

2023-09-26 Thread via GitHub


bmscomp commented on PR #14449:
URL: https://github.com/apache/kafka/pull/14449#issuecomment-1736244500

   @divijvaidya  Done 
   





[GitHub] [kafka] lucasbru commented on pull request #14281: KAFKA-15326: [9/N] Start and stop executors and cornercases

2023-09-26 Thread via GitHub


lucasbru commented on PR #14281:
URL: https://github.com/apache/kafka/pull/14281#issuecomment-1736221407

   Test failures are unrelated





[GitHub] [kafka] nizhikov opened a new pull request, #14456: KAFKA-14595 ReassignPartitionsIntegrationTest rewritten in java

2023-09-26 Thread via GitHub


nizhikov opened a new pull request, #14456:
URL: https://github.com/apache/kafka/pull/14456

   This PR is part of #13247.
   It contains `ReassignPartitionsIntegrationTest` rewritten in Java.
   The goal of this PR is to reduce the size of the changes in the main PR.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] kirktrue commented on a diff in pull request #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager

2023-09-26 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1337661714


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -636,42 +857,148 @@ public void assign(Collection 
partitions) {
 }
 
 @Override
-public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) 
{
-throw new KafkaException("method not implemented");
+public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) 
{
+maybeThrowInvalidGroupIdException();
+if (pattern == null || pattern.toString().equals(""))
+throw new IllegalArgumentException("Topic pattern to subscribe to 
cannot be " + (pattern == null ?
+"null" : "empty"));
+
+throwIfNoAssignorsConfigured();
+log.info("Subscribed to pattern: '{}'", pattern);
+this.subscriptions.subscribe(pattern, listener);
+this.updatePatternSubscription(metadata.fetch());
+this.metadata.requestUpdateForNewTopics();
+}
+
+/**
+ * TODO: remove this when we implement the KIP-848 protocol.
+ *
+ * 
+ * The contents of this method are shamelessly stolen from
+ * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are 
used here because we won't have access
+ * to a {@link ConsumerCoordinator} in this code. Perhaps it could be 
moved to a ConsumerUtils class?
+ *
+ * @param cluster Cluster from which we get the topics
+ */
+private void updatePatternSubscription(Cluster cluster) {
+final Set topicsToSubscribe = cluster.topics().stream()
+.filter(subscriptions::matchesSubscribedPattern)
+.collect(Collectors.toSet());
+if (subscriptions.subscribeFromPattern(topicsToSubscribe))
+metadata.requestUpdateForNewTopics();
 }
 
 @Override
 public void subscribe(Pattern pattern) {
-throw new KafkaException("method not implemented");
+subscribe(pattern, new NoOpConsumerRebalanceListener());
 }
 
 @Override
 public void unsubscribe() {
-throw new KafkaException("method not implemented");
+fetchBuffer.retainAll(Collections.emptySet());
+this.subscriptions.unsubscribe();
 }
 
 @Override
 @Deprecated
-public ConsumerRecords poll(long timeout) {
-throw new KafkaException("method not implemented");
+public ConsumerRecords poll(final long timeoutMs) {
+return poll(Duration.ofMillis(timeoutMs));
 }
 
 // Visible for testing
 WakeupTrigger wakeupTrigger() {
 return wakeupTrigger;
 }
 
-private static  ClusterResourceListeners 
configureClusterResourceListeners(
-final Deserializer keyDeserializer,
-final Deserializer valueDeserializer,
-final List... candidateLists) {
-ClusterResourceListeners clusterResourceListeners = new 
ClusterResourceListeners();
-for (List candidateList: candidateLists)
-clusterResourceListeners.maybeAddAll(candidateList);
+private void sendFetches() {
+FetchEvent event = new FetchEvent();
+eventHandler.add(event);
+
+event.future().whenComplete((completedFetches, error) -> {
+if (completedFetches != null && !completedFetches.isEmpty()) {
+fetchBuffer.addAll(completedFetches);

Review Comment:
   This has been reworked as part of a recent change. Not only are there still 
two `FetchBuffer`s, but I've _added_ a blocking queue between them: the 
`FetchBuffer` should not be updated in the `Future` callback, because that 
callback runs on another thread and `FetchBuffer` is not thread safe.
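
A rough sketch of the handoff idea described in this comment, under stated assumptions: the class `FetchHandoffSketch`, the `String` payloads, and the `ArrayDeque` standing in for a non-thread-safe `FetchBuffer` are all hypothetical and are not taken from the PR. The callback thread only touches the thread-safe queue, and the application thread alone moves items into the buffer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

public class FetchHandoffSketch {

    // Stand-in for a buffer that is NOT thread safe; only the application thread touches it.
    private final Queue<String> buffer = new ArrayDeque<>();

    // Thread-safe handoff between the callback thread and the application thread.
    private final BlockingQueue<String> handoff = new LinkedBlockingQueue<>();

    /** Registers a callback that may run on another thread; it only enqueues. */
    public void onFetchCompleted(CompletableFuture<List<String>> future) {
        future.whenComplete((fetches, error) -> {
            if (fetches != null && !fetches.isEmpty())
                handoff.addAll(fetches);
        });
    }

    /** Called on the application thread: moves queued results into the buffer. */
    public int drainIntoBuffer() {
        List<String> drained = new ArrayList<>();
        handoff.drainTo(drained);
        buffer.addAll(drained);
        return buffer.size();
    }

    public static void main(String[] args) throws InterruptedException {
        FetchHandoffSketch sketch = new FetchHandoffSketch();
        CompletableFuture<List<String>> future = new CompletableFuture<>();
        sketch.onFetchCompleted(future);

        // Complete the future on a background thread, as a network thread might.
        Thread background = new Thread(() -> future.complete(List.of("fetch-1", "fetch-2")));
        background.start();
        background.join();

        System.out.println(sketch.drainIntoBuffer()); // prints 2
    }
}
```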






[GitHub] [kafka] kirktrue commented on a diff in pull request #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager

2023-09-26 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1337660657


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -636,42 +857,148 @@ public void assign(Collection 
partitions) {
 }
 
 @Override
-public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) 
{
-throw new KafkaException("method not implemented");
+public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) 
{
+maybeThrowInvalidGroupIdException();
+if (pattern == null || pattern.toString().equals(""))
+throw new IllegalArgumentException("Topic pattern to subscribe to 
cannot be " + (pattern == null ?
+"null" : "empty"));
+
+throwIfNoAssignorsConfigured();
+log.info("Subscribed to pattern: '{}'", pattern);
+this.subscriptions.subscribe(pattern, listener);
+this.updatePatternSubscription(metadata.fetch());
+this.metadata.requestUpdateForNewTopics();
+}
+
+/**
+ * TODO: remove this when we implement the KIP-848 protocol.
+ *
+ * 
+ * The contents of this method are shamelessly stolen from
+ * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are 
used here because we won't have access
+ * to a {@link ConsumerCoordinator} in this code. Perhaps it could be 
moved to a ConsumerUtils class?
+ *
+ * @param cluster Cluster from which we get the topics
+ */
+private void updatePatternSubscription(Cluster cluster) {
+final Set topicsToSubscribe = cluster.topics().stream()
+.filter(subscriptions::matchesSubscribedPattern)
+.collect(Collectors.toSet());
+if (subscriptions.subscribeFromPattern(topicsToSubscribe))
+metadata.requestUpdateForNewTopics();
 }
 
 @Override
 public void subscribe(Pattern pattern) {
-throw new KafkaException("method not implemented");
+subscribe(pattern, new NoOpConsumerRebalanceListener());
 }
 
 @Override
 public void unsubscribe() {
-throw new KafkaException("method not implemented");
+fetchBuffer.retainAll(Collections.emptySet());
+this.subscriptions.unsubscribe();
 }
 
 @Override
 @Deprecated
-public ConsumerRecords poll(long timeout) {
-throw new KafkaException("method not implemented");
+public ConsumerRecords poll(final long timeoutMs) {
+return poll(Duration.ofMillis(timeoutMs));
 }
 
 // Visible for testing
 WakeupTrigger wakeupTrigger() {
 return wakeupTrigger;
 }
 
-private static  ClusterResourceListeners 
configureClusterResourceListeners(
-final Deserializer keyDeserializer,
-final Deserializer valueDeserializer,
-final List... candidateLists) {
-ClusterResourceListeners clusterResourceListeners = new 
ClusterResourceListeners();
-for (List candidateList: candidateLists)
-clusterResourceListeners.maybeAddAll(candidateList);
+private void sendFetches() {
+FetchEvent event = new FetchEvent();
+eventHandler.add(event);
+
+event.future().whenComplete((completedFetches, error) -> {
+if (completedFetches != null && !completedFetches.isEmpty()) {
+fetchBuffer.addAll(completedFetches);
+}
+});
+}
+
+/**
+ * @throws KafkaException if the rebalance callback throws exception
+ */
+private Fetch pollForFetches(Timer timer) {
+long pollTimeout = timer.remainingMs();
+
+// if data is available already, return it immediately
+final Fetch fetch = fetchCollector.collectFetch(fetchBuffer);
+if (!fetch.isEmpty()) {
+return fetch;
+}
+
+// send any new fetches (won't resend pending fetches)
+sendFetches();
+
+// We do not want to be stuck blocking in poll if we are missing some 
positions
+// since the offset lookup may be backing off after a failure
+
+// NOTE: the use of cachedSubscriptionHasAllFetchPositions means we 
MUST call
+// updateAssignmentMetadataIfNeeded before this method.
+if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > 
retryBackoffMs) {
+pollTimeout = retryBackoffMs;
+}
+
+log.trace("Polling for fetches with timeout {}", pollTimeout);
+
+Timer pollTimer = time.timer(pollTimeout);
+
+// Attempt to fetch any data. It's OK if we time out here; it's a best case effort. The
+// data may not be immediately available, but the calling method (poll) will correctly
+// handle the overall timeout.
+try {
+Queue completedFetches = eventHandler.addAndGet(new FetchEvent(), pollTimer);

Review Comment:
   I've changed the mechanism to handle the responses from fetch 

[GitHub] [kafka] ahuang98 commented on a diff in pull request #14428: KAFKA-15489: resign leadership when no fetch from majority voters

2023-09-26 Thread via GitHub


ahuang98 commented on code in PR #14428:
URL: https://github.com/apache/kafka/pull/14428#discussion_r1337653712


##
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##
@@ -76,9 +85,37 @@ protected LeaderState(
 boolean hasAcknowledgedLeader = voterId == localId;
 this.voterStates.put(voterId, new ReplicaState(voterId, hasAcknowledgedLeader));
 }
+this.majority = voters.size() / 2;
 this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters));
 this.log = logContext.logger(LeaderState.class);
 this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null");
+this.fetchTimeoutMs = fetchTimeoutMs;
+this.fetchTimer = time.timer(fetchTimeoutMs);
+}
+
+public boolean hasMajorityFollowerFetchTimeoutExpired(long currentTimeMs) {

Review Comment:
   nit: these are pretty lengthy and arguably unintuitive method names, could 
we add a short comment on what each method does?
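
   To illustrate the kind of per-method comment being requested, here is a minimal, self-contained sketch. It is not the code from PR #14428: the class `MajorityFetchTracker`, its fields, and its method names are hypothetical, and the reset-on-majority behaviour is an assumption based only on the PR title and the `voters.size() / 2` line in the diff above.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch: tracks whether enough voters have fetched from the leader recently. */
final class MajorityFetchTracker {
    private final int localId;
    private final Set<Integer> voters;
    private final int majority;        // followers needed in addition to the leader itself
    private final long fetchTimeoutMs;
    private final Set<Integer> fetchedVoters = new HashSet<>();
    private long deadlineMs;

    MajorityFetchTracker(int localId, Set<Integer> voters, long fetchTimeoutMs, long nowMs) {
        this.localId = localId;
        this.voters = new HashSet<>(voters);
        this.majority = voters.size() / 2;
        this.fetchTimeoutMs = fetchTimeoutMs;
        this.deadlineMs = nowMs + fetchTimeoutMs;
    }

    /**
     * Records a fetch from a follower. Once enough distinct followers have fetched
     * within the current window, the window is restarted from {@code nowMs}.
     */
    void recordFetch(int voterId, long nowMs) {
        if (voterId == localId || !voters.contains(voterId)) {
            return; // ignore the leader itself and non-voters
        }
        fetchedVoters.add(voterId);
        if (fetchedVoters.size() >= majority) {
            fetchedVoters.clear();
            deadlineMs = nowMs + fetchTimeoutMs;
        }
    }

    /**
     * Returns true if the current window elapsed without fetches from a majority,
     * which is the point at which a leader could choose to resign.
     */
    boolean hasMajorityFetchTimeoutExpired(long nowMs) {
        if (majority == 0) {
            return false; // assumption of this sketch: a single-voter quorum has no followers to wait for
        }
        return nowMs >= deadlineMs;
    }
}
```

   With three voters and a 100 ms timeout, one follower fetch at t=60 moves the deadline to t=160 in this sketch; the short Javadoc on each method is what makes that contract visible without reading the body.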



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337639317


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, left the group, or 
encountering fatal exceptions, the heartbeat will
+ * not be sent. If the coordinator not is not found, we will skip sending the 
heartbeat and tries to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, TimeoutException. 
 The subsequent attempt will be backoff
+ * exponentially.
+ *
+ * If the member completes the partition revocation process, a heartbeat 
request will be sent in the next event loop.
+ *
+ * {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final HeartbeatRequestState heartbeatRequestState,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.logger = logContext.logger(this.getClass());
+   

[GitHub] [kafka] zhengyd2014 commented on a diff in pull request #14444: KIP-951: Server side and protocol changes for KIP-951

2023-09-26 Thread via GitHub


zhengyd2014 commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1337630516


##
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java:
##
@@ -276,11 +289,16 @@ private static FetchResponseData toMessage(Errors error,
 .setPartitions(partitionResponses));
 }
 }
-
-return new FetchResponseData()
-.setThrottleTimeMs(throttleTimeMs)
-.setErrorCode(error.code())
-.setSessionId(sessionId)
-.setResponses(topicResponseList);
+data.setThrottleTimeMs(throttleTimeMs)
+.setErrorCode(error.code())
+.setSessionId(sessionId)
+.setResponses(topicResponseList);
+nodeEndpoints.forEach(endpoint -> data.nodeEndpoints().add(

Review Comment:
   it makes sense, thanks






[GitHub] [kafka] nizhikov commented on pull request #14064: KAFKA-15030: Add connect-plugin-path command-line tool.

2023-09-26 Thread via GitHub


nizhikov commented on PR #14064:
URL: https://github.com/apache/kafka/pull/14064#issuecomment-1736054928

   In case someone is interested: adding the `:connect:api` dependency to `tools` 
somehow makes the `:storage:api` classes invisible.
   I fixed it by adding an explicit dependency on the `:storage:api` project.





[jira] [Created] (KAFKA-15509) KRaft Controller doesn't consider brokers that are shutting down during topic creating

2023-09-26 Thread Jira
José Armando García Sancio created KAFKA-15509:
--

 Summary: KRaft Controller doesn't consider brokers that are 
shutting down during topic creating
 Key: KAFKA-15509
 URL: https://issues.apache.org/jira/browse/KAFKA-15509
 Project: Kafka
  Issue Type: Bug
Reporter: José Armando García Sancio
Assignee: José Armando García Sancio


The topic creation implementation in KRaft is able to use brokers that are 
fenced as replicas during topic creation. The restriction is that the 
controller won't assign leadership to those brokers.

This feature makes it possible for users to roll clusters with 3 brokers 
without sacrificing topic creation availability.

Looking at the code it looks like the KRaft controller doesn't consider brokers 
that are shutting down as eligible for replicas during topic creation.
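
The behaviour described above can be modelled with a small sketch. This is not the KRaft controller code: `ReplicaEligibilitySketch`, the `BrokerState` enum, and the `allowShuttingDown` flag are hypothetical names used only to show the difference between the reported behaviour and the behaviour the ticket seems to ask for.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical model of replica eligibility during topic creation. */
final class ReplicaEligibilitySketch {
    enum BrokerState { ACTIVE, FENCED, SHUTTING_DOWN }

    /**
     * Returns the broker IDs usable as replicas. Active and fenced brokers always
     * qualify; shutting-down brokers qualify only when allowShuttingDown is true
     * (false models the behaviour reported in the ticket).
     */
    static List<Integer> eligibleReplicas(Map<Integer, BrokerState> brokers, boolean allowShuttingDown) {
        List<Integer> result = new ArrayList<>();
        for (Map.Entry<Integer, BrokerState> entry : brokers.entrySet()) {
            BrokerState state = entry.getValue();
            if (state == BrokerState.ACTIVE
                    || state == BrokerState.FENCED
                    || (allowShuttingDown && state == BrokerState.SHUTTING_DOWN)) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    /** Only active brokers may be chosen as leaders in this model. */
    static boolean eligibleAsLeader(BrokerState state) {
        return state == BrokerState.ACTIVE;
    }
}
```

In a three-broker cluster with one broker shutting down, the reported behaviour leaves only two eligible replicas in this model, so creating a topic with replication factor 3 would fail during a roll.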





[GitHub] [kafka] lucasbru merged pull request #14226: KAFKA-15326: [8/N] Move consumer interaction out of processing methods

2023-09-26 Thread via GitHub


lucasbru merged PR #14226:
URL: https://github.com/apache/kafka/pull/14226





[GitHub] [kafka] lucasbru commented on pull request #14226: KAFKA-15326: [8/N] Move consumer interaction out of processing methods

2023-09-26 Thread via GitHub


lucasbru commented on PR #14226:
URL: https://github.com/apache/kafka/pull/14226#issuecomment-1735852839

   Failures are unrelated





[GitHub] [kafka] kirktrue commented on a diff in pull request #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager

2023-09-26 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1337444877


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -526,13 +677,57 @@ public void close() {
 close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
 }
 
+private Timer createTimerForRequest(final Duration timeout) {
+// this.time could be null if an exception occurs in constructor prior 
to setting the this.time field

Review Comment:
   `createTimerForRequest()` is no longer used, so marking this as resolved.






[GitHub] [kafka] kirktrue commented on a diff in pull request #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager

2023-09-26 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r133756


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -526,13 +677,57 @@ public void close() {
 close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
 }
 
+private Timer createTimerForRequest(final Duration timeout) {
+// this.time could be null if an exception occurs in constructor prior 
to setting the this.time field
+final Time localTime = (time == null) ? Time.SYSTEM : time;
+return localTime.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
+}
+
 @Override
 public void close(Duration timeout) {
+if (timeout.toMillis() < 0)
+throw new IllegalArgumentException("The timeout cannot be 
negative.");
+
+try {
+if (!closed) {
+// need to close before setting the flag since the close 
function
+// itself may trigger rebalance callback that needs the 
consumer to be open still
+close(timeout, false);
+}
+} finally {
+closed = true;
+}
+}
+
+private void close(Duration timeout, boolean swallowException) {
+log.trace("Closing the Kafka consumer");
 AtomicReference firstException = new AtomicReference<>();
+
+final Timer closeTimer = createTimerForRequest(timeout);
+if (fetchBuffer != null) {
+// the timeout for the session close is at-most the 
requestTimeoutMs
+long remainingDurationInTimeout = Math.max(0, timeout.toMillis() - 
closeTimer.elapsedMs());
+if (remainingDurationInTimeout > 0) {
+remainingDurationInTimeout = Math.min(requestTimeoutMs, 
remainingDurationInTimeout);

Review Comment:
   Removed the unnecessary timer.






[GitHub] [kafka] kirktrue commented on a diff in pull request #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager

2023-09-26 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1337444033


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -526,13 +677,57 @@ public void close() {
 close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
 }
 
+private Timer createTimerForRequest(final Duration timeout) {
+// this.time could be null if an exception occurs in constructor prior 
to setting the this.time field
+final Time localTime = (time == null) ? Time.SYSTEM : time;
+return localTime.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
+}
+
 @Override
 public void close(Duration timeout) {
+if (timeout.toMillis() < 0)
+throw new IllegalArgumentException("The timeout cannot be 
negative.");
+
+try {
+if (!closed) {
+// need to close before setting the flag since the close 
function
+// itself may trigger rebalance callback that needs the 
consumer to be open still
+close(timeout, false);
+}
+} finally {
+closed = true;
+}
+}
+
+private void close(Duration timeout, boolean swallowException) {
+log.trace("Closing the Kafka consumer");
 AtomicReference firstException = new AtomicReference<>();
+
+final Timer closeTimer = createTimerForRequest(timeout);
+if (fetchBuffer != null) {
+// the timeout for the session close is at-most the 
requestTimeoutMs
+long remainingDurationInTimeout = Math.max(0, timeout.toMillis() - 
closeTimer.elapsedMs());
+if (remainingDurationInTimeout > 0) {
+remainingDurationInTimeout = Math.min(requestTimeoutMs, 
remainingDurationInTimeout);
+}
+
+closeTimer.reset(remainingDurationInTimeout);
+
+// This is a blocking call bound by the time remaining in 
closeTimer

Review Comment:
   I've updated this so that the `close()` method is a lot cleaner.






[GitHub] [kafka] kirktrue commented on a diff in pull request #14406: KAFKA-14274 [6, 7]: Introduction of fetch request manager

2023-09-26 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1337443244


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -684,7 +1011,7 @@ private boolean isCommittedOffsetsManagementEnabled() {
 }
 
 /**
- * Refresh the committed offsets for partitions that require 
initialization.
+ * Refresh the committed offsets for provided partitions.

Review Comment:
   I reverted the change to the comment. I don't remember changing it 🤷‍♂️ 









[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337428627


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *

Review Comment:
   OK, that's fine but not enough. I think we also need the paragraph tags 
here. If you look at the rendered Javadoc, it shows up as one giant block, 
which I expect is not what we want.
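
   For context: Javadoc output is HTML, so a blank ` *` line inside a doc comment does not start a new paragraph by itself; an explicit `<p>` tag does. The sketch below shows the markup on a trimmed-down stand-in. The class `HeartbeatDocExample` and its method are hypothetical, and the wording is abbreviated from the diff above rather than taken from the final PR.

```java
/**
 * Hypothetical stand-in used only to show Javadoc paragraph markup.
 *
 * <p>The manager only emits a heartbeat when the member is in a group, is trying to
 * join a group, or is trying to rejoin the group.
 *
 * <p>If the member has no group ID configured, has left the group, or has hit a
 * fatal error, no heartbeat is sent.
 *
 * <p>See {@code HeartbeatRequestState} for more details.
 */
final class HeartbeatDocExample {

    /**
     * Decides whether a heartbeat should be sent.
     *
     * @param hasGroupId whether a group ID is configured
     * @param leftGroup  whether the member has left the group
     * @param fatalError whether a fatal error was encountered
     * @return {@code true} if none of the blocking conditions hold
     */
    static boolean shouldSendHeartbeat(boolean hasGroupId, boolean leftGroup, boolean fatalError) {
        return hasGroupId && !leftGroup && !fatalError;
    }
}
```

   Each `<p>` renders as its own paragraph in the generated HTML, while the blank ` *` lines only keep the source readable.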






[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337423659


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.internals.MemberState.FAILED;
+import static org.apache.kafka.clients.consumer.internals.MemberState.STABLE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class HeartbeatRequestManagerTest {
+
+private final int heartbeatIntervalMs = 1000;
+private final long retryBackoffMaxMs = 3000;
+private final long retryBackoffMs = 100;
+private final String groupId = "group-id";
+
+private Time mockTime;
+private LogContext mockLogContext;
+private CoordinatorRequestManager mockCoordinatorRequestManager;
+private SubscriptionState mockSubscriptionState;
+private HeartbeatRequestManager heartbeatRequestManager;
+private MembershipManager mockMembershipManager;
+private HeartbeatRequestManager.HeartbeatRequestState 
heartbeatRequestState;
+private ConsumerConfig config;
+
+private String memberId = "member-id";
+private int memberEpoch = 1;

Review Comment:
   We don't make these variables final in tests because we might change them later.






[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337420489


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The 
module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to 
the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link 
MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, got kicked out of the 
group, or encountering fatal exceptions, the
+ * heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat and 
tries to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, TimeoutException. 
 The subsequent attempt will be backoff
+ * exponentially.
+ *
+ * If the member completes the assignment changes, i.e. revocation and 
assignment, a heartbeat request will be sent in
+ * the next event loop.
+ * {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+private final Set fatalErrors = new HashSet<>(Arrays.asList(
+Errors.GROUP_AUTHORIZATION_FAILED,
+Errors.INVALID_REQUEST,
+Errors.GROUP_MAX_SIZE_REACHED,
+Errors.UNSUPPORTED_ASSIGNOR,
+Errors.UNRELEASED_INSTANCE_ID));
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final LogContext logContext,
+final ConsumerConfig 

[GitHub] [kafka] philipnee commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337418980


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors.
+ *

Review Comment:
   This is just for comment formatting.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lianetm commented on pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


lianetm commented on PR #14364:
URL: https://github.com/apache/kafka/pull/14364#issuecomment-1735694640

   Thanks for the changes @philipnee, left a few other minor comments and 
questions. LGTM. 
   
   As I see it, the main areas requiring follow-up in other PRs would be:
   
   - fully integrate with the state defined in the membershipManager (getting 
rid of all the parallel `groupState` defined here)
   - integrate with the assignment processing component, driving the logic to 
delegate callback execution and send HB on completion as required.
   - extend HB manager test to cover successful path and timeout scenarios.
   
   @dajac it would be helpful if you can take another look at it now, as it has 
evolved quite a bit. Thanks!





[GitHub] [kafka] divijvaidya commented on pull request #14434: KAFKA-15498: bump snappy-java version to 1.1.10.4

2023-09-26 Thread via GitHub


divijvaidya commented on PR #14434:
URL: https://github.com/apache/kafka/pull/14434#issuecomment-1735667819

   Since @showuon is asleep, I took the liberty of adding an extra byte to the 
assumed size of compressed message produced by snappy. Let's see if the test 
passes now. Will run it multiple times.





[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337309504


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -58,7 +59,7 @@
 
 public class HeartbeatRequestManagerTest {

Review Comment:
   High-level comment: this test covers the timing logic for sending and the
   response handling on error, but nothing for successful HB response handling
   (important to ensure that it updates the target assignment so that other
   components can process it). It would also be helpful to have some tests
   around HB timeouts, mainly to validate the retry logic there.
   (Just suggestions for better coverage of core actions; OK for me if we
   prefer to target that in a separate PR.)
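
To make the retry behaviour that such a timeout test would validate concrete, here is a minimal sketch of capped exponential backoff. It is an illustration only: `BackoffSketch` and `backoffMs` are hypothetical names, not part of the PR, and the sketch omits the jitter that a production implementation would normally add. The values 100 and 3000 mirror `retryBackoffMs` and `retryBackoffMaxMs` in `HeartbeatRequestManagerTest`.

```java
// Hypothetical sketch of capped exponential backoff; not Kafka's implementation.
final class BackoffSketch {
    private BackoffSketch() { }

    /**
     * Returns the delay before retry number {@code attempt} (0-based).
     * The delay starts at initialMs, doubles per attempt, and never exceeds maxMs.
     * Assumes initialMs and maxMs are positive.
     */
    static long backoffMs(int attempt, long initialMs, long maxMs) {
        long delay = initialMs;
        for (int i = 0; i < attempt; i++) {
            if (delay > maxMs / 2) {
                // Doubling again would pass the cap, so stop here.
                return maxMs;
            }
            delay *= 2;
        }
        return Math.min(delay, maxMs);
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 7; attempt++) {
            System.out.println("attempt " + attempt + " -> "
                + backoffMs(attempt, 100L, 3000L) + " ms");
        }
    }
}
```

With these inputs the delays are 100, 200, 400, 800 and 1600 ms, and every later attempt waits the 3000 ms cap. A timeout test could assert on that sequence by advancing a mock clock between attempts.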






[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337295428


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -173,24 +194,22 @@ public void testHeartbeatResponseOnErrorHandling(final Errors error, final boole
 heartbeatRequestState = new HeartbeatRequestManager.HeartbeatRequestState(
 mockLogContext,
 mockTime,
-heartbeatInterval,
+heartbeatIntervalMs,
 retryBackoffMs,
 retryBackoffMaxMs,
 0);
-when(mockMembershipManager.state()).thenReturn(STABLE);
 heartbeatRequestManager = createManager();
 
-// Sending first heartbeat to set the state to STABLE
+// Sending first heartbeat w/o assignment to set the state to STABLE
 ConsumerGroupHeartbeatResponse rs1 = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
-.setHeartbeatIntervalMs(heartbeatInterval)
+.setHeartbeatIntervalMs(heartbeatIntervalMs)
 .setMemberId(memberId)
-.setMemberEpoch(memberEpoch)
-.setAssignment(memberAssignment));

Review Comment:
   seems we're not using `memberAssignment` in the test anymore? let's remove 
if unused






[GitHub] [kafka] divijvaidya commented on pull request #14434: KAFKA-15498: bump snappy-java version to 1.1.10.4

2023-09-26 Thread via GitHub


divijvaidya commented on PR #14434:
URL: https://github.com/apache/kafka/pull/14434#issuecomment-1735632838

   At
https://github.com/apache/kafka/blob/65efb981347d6f81fb2713cd27cdfdfa9d8781b9/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala#L149
   we assume that snappy adds 5 extra bytes after compression. That assumption
   depends on Snappy's internal implementation. For the purposes of this test,
   we can set a more generous bound such as
   `largeMessageSet.sizeInBytes + 50` (to be future-proof), and it should pass.
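
As a rough illustration of why a test should allow headroom rather than hard-code a compressor's exact overhead, the sketch below measures how many bytes a compressor adds to an incompressible payload and compares that against a slack bound. It deliberately uses the JDK's `java.util.zip.Deflater` instead of snappy so that it runs without extra dependencies. The class name, the payload size and the seed are assumptions made for the example. Only the slack of 50 comes from the comment above.

```java
import java.util.Random;
import java.util.zip.Deflater;

// Hypothetical sketch: uses Deflater as a stand-in for snappy.
final class CompressionSlackSketch {
    private CompressionSlackSketch() { }

    /** Returns the number of bytes produced by compressing the input. */
    static int compressedSize(byte[] input) {
        Deflater deflater = new Deflater();
        try {
            deflater.setInput(input);
            deflater.finish();
            byte[] buffer = new byte[1024];
            int total = 0;
            // Drain the compressor until it reports that all output was produced.
            while (!deflater.finished()) {
                total += deflater.deflate(buffer);
            }
            return total;
        } finally {
            deflater.end();
        }
    }

    /** Builds a pseudo-random payload, which compressors cannot shrink. */
    static byte[] incompressiblePayload(int size, long seed) {
        byte[] payload = new byte[size];
        new Random(seed).nextBytes(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] payload = incompressiblePayload(1000, 42L);
        int overhead = compressedSize(payload) - payload.length;
        int slack = 50; // headroom instead of an exact, library-specific constant
        System.out.println("overhead bytes: " + overhead);
        System.out.println("fits within slack: " + (overhead <= slack));
    }
}
```

The exact overhead is a property of the compressor's framing, so a test that asserts on a bound with slack keeps passing when the library changes that framing slightly.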





[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337282860


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.internals.MemberState.FAILED;
+import static org.apache.kafka.clients.consumer.internals.MemberState.STABLE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class HeartbeatRequestManagerTest {
+
+private final int heartbeatIntervalMs = 1000;
+private final long retryBackoffMaxMs = 3000;
+private final long retryBackoffMs = 100;
+private final String groupId = "group-id";
+
+private Time mockTime;
+private LogContext mockLogContext;
+private CoordinatorRequestManager mockCoordinatorRequestManager;
+private SubscriptionState mockSubscriptionState;
+private HeartbeatRequestManager heartbeatRequestManager;
+private MembershipManager mockMembershipManager;
+private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState;
+private ConsumerConfig config;
+
+private String memberId = "member-id";
+private int memberEpoch = 1;
+private ConsumerGroupHeartbeatResponseData.Assignment memberAssignment = mockAssignment();
+private ErrorEventHandler errorEventHandler;
+
+private ConsumerGroupHeartbeatResponseData.Assignment mockAssignment() {
+return new ConsumerGroupHeartbeatResponseData.Assignment()
+.setAssignedTopicPartitions(Arrays.asList(
+new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+.setTopicId(Uuid.randomUuid())
+.setPartitions(Arrays.asList(0, 1, 2)),
+new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+.setTopicId(Uuid.randomUuid())
+.setPartitions(Arrays.asList(3, 4, 5))
+));
+}
+
+@BeforeEach
+public void setUp() {
+mockTime = new MockTime();
+mockLogContext = new LogContext();
+Properties properties = new Properties();
+properties.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:");
+properties.put(KEY_DESERIALIZER_CLASS_CONFIG, 

[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337282271


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.RETRY_BACKOFF_MS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.internals.MemberState.FAILED;
+import static org.apache.kafka.clients.consumer.internals.MemberState.STABLE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class HeartbeatRequestManagerTest {
+
+private final int heartbeatIntervalMs = 1000;
+private final long retryBackoffMaxMs = 3000;
+private final long retryBackoffMs = 100;
+private final String groupId = "group-id";
+
+private Time mockTime;
+private LogContext mockLogContext;
+private CoordinatorRequestManager mockCoordinatorRequestManager;
+private SubscriptionState mockSubscriptionState;
+private HeartbeatRequestManager heartbeatRequestManager;
+private MembershipManager mockMembershipManager;
+private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState;
+private ConsumerConfig config;
+
+private String memberId = "member-id";
+private int memberEpoch = 1;

Review Comment:
   final






[GitHub] [kafka] tinaselenge opened a new pull request, #14455: KAFKA-15507: Make AdminClient throw non-retriable exception for a new call while closing

2023-09-26 Thread via GitHub


tinaselenge opened a new pull request, #14455:
URL: https://github.com/apache/kafka/pull/14455

   AdminClient will throw IllegalStateException instead of TimeoutException if
   it receives new calls while closing down. This is more consistent with how
   the Consumer and Producer clients handle new calls after they have been closed.
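
The behaviour described above can be sketched with a stand-in class. This is an illustration only: `ClosingClientSketch` and its `describe` method are hypothetical and do not reflect the real AdminClient internals. The sketch only shows the idea of rejecting new calls with a non-retriable `IllegalStateException` once closing has begun.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a client; not the real AdminClient API.
final class ClosingClientSketch implements AutoCloseable {
    private final AtomicBoolean closing = new AtomicBoolean(false);

    /** Accepts a call unless close() has already been invoked. */
    String describe(String resource) {
        if (closing.get()) {
            // Fail fast with a non-retriable error instead of letting the call time out.
            throw new IllegalStateException(
                "Cannot accept new call for '" + resource + "': client is closing");
        }
        return "described " + resource;
    }

    @Override
    public void close() {
        closing.set(true);
    }

    public static void main(String[] args) {
        ClosingClientSketch client = new ClosingClientSketch();
        System.out.println(client.describe("topic-a"));
        client.close();
        try {
            client.describe("topic-b");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing immediately tells the caller that retrying is pointless, whereas a timeout suggests a transient condition.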
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337278130


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java:
##
@@ -59,4 +59,20 @@ public interface MembershipManager {
  * current assignment.
  */
 void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assignment);
+
+/**
+ * Transition the member to the FENCED state.  This is only invoked when the heartbeat returns a

Review Comment:
   Extra space after state.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java:
##
@@ -59,4 +59,20 @@ public interface MembershipManager {
  * current assignment.
  */
 void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assignment);
+
+/**
+ * Transition the member to the FENCED state.  This is only invoked when the heartbeat returns a
+ * FENCED_MEMBER_EPOCH or UNKNOWN_MEMBER_ID error code.
+ */
+void fenceMember();
+
+/**
+ * Transition the member to the FAILED state.  This is invoked when the heartbeat returns a non-retriable error.

Review Comment:
   ditto






[GitHub] [kafka] ijuma commented on pull request #14434: KAFKA-15498: bump snappy-java version to 1.1.10.4

2023-09-26 Thread via GitHub


ijuma commented on PR #14434:
URL: https://github.com/apache/kafka/pull/14434#issuecomment-1735623002

   We don't know, but that is the nature of compression libraries: there can be
   some variation as the algorithms are tweaked for performance.





[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337275381


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, got kicked out of the group, or encountering fatal exceptions, the
+ * heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, TimeoutException.  The subsequent attempt will be backoff
+ * exponentially.
+ *
+ * If the member completes the assignment changes, i.e. revocation and assignment, a heartbeat request will be sent in
+ * the next event loop.
+ * {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+private final Set fatalErrors = new HashSet<>(Arrays.asList(
+Errors.GROUP_AUTHORIZATION_FAILED,
+Errors.INVALID_REQUEST,
+Errors.GROUP_MAX_SIZE_REACHED,
+Errors.UNSUPPORTED_ASSIGNOR,
+Errors.UNRELEASED_INSTANCE_ID));
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final LogContext logContext,
+final ConsumerConfig 

[GitHub] [kafka] jlprat commented on pull request #14434: KAFKA-15498: bump snappy-java version to 1.1.10.4

2023-09-26 Thread via GitHub


jlprat commented on PR #14434:
URL: https://github.com/apache/kafka/pull/14434#issuecomment-1735612270

   Is it just 1 extra byte? Or are there cases where some more extra bytes are 
added?





[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337269007


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors.
+ *

Review Comment:
   These empty lines won't show as such in the Javadoc, so let's add `<p>` tags 
to ensure we have the separation we want.
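
   One way to get that separation is the standard Javadoc `<p>` tag at the start of each new paragraph. The sketch below is only an illustration: the class `ParagraphDocExample` and the method `shouldSendHeartbeat` are hypothetical names, not code from this PR.

```java
/**
 * Decides whether a heartbeat should be sent (hypothetical example class, not from the PR).
 *
 * <p>A blank comment line alone is collapsed in the rendered HTML, so each new
 * paragraph starts with an explicit {@code <p>} tag.
 *
 * <p>The heartbeat is skipped when the member is not in a group or has hit a fatal error.
 */
public class ParagraphDocExample {

    /**
     * Returns true only when the member is in a group and no fatal error was seen.
     *
     * <p>Both parameters are illustrative stand-ins for real membership state.
     */
    static boolean shouldSendHeartbeat(boolean inGroup, boolean fatalError) {
        return inGroup && !fatalError;
    }

    public static void main(String[] args) {
        System.out.println(shouldSendHeartbeat(true, false));
        System.out.println(shouldSendHeartbeat(true, true));
    }
}
```

   Running the `javadoc` tool on a file like this should render three separate paragraphs for the class comment instead of one run-on block.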



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #14434: KAFKA-15498: bump snappy-java version to 1.1.10.4

2023-09-26 Thread via GitHub


ijuma commented on PR #14434:
URL: https://github.com/apache/kafka/pull/14434#issuecomment-1735607326

   The extra byte may be ok since the underlying snappy library was also 
upgraded in this new version 
https://github.com/xerial/snappy-java/commit/f2e97f27be0dc6c691369040ba8a673bface484c
   
   Our test looks to rely on implementation details of the compression library.
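
   As a rough sketch of a check that does not depend on the compressor's exact output size, a test could assert only that decompression restores the original bytes. Note the assumptions: `java.util.zip` stands in for snappy-java purely to keep the example self-contained, the class and method names are made up, the 197-byte payload is arbitrary, and this is not the Kafka test under discussion.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

public class RoundTripCheck {

    // Compress with java.util.zip (a stand-in for snappy-java, chosen only for self-containment).
    static byte[] compress(byte[] input, int level) {
        Deflater deflater = new Deflater(level);
        try {
            deflater.setInput(input);
            deflater.finish();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[256];
            while (!deflater.finished()) {
                int n = deflater.deflate(buffer);
                out.write(buffer, 0, n);
            }
            return out.toByteArray();
        } finally {
            deflater.end();
        }
    }

    // Decompress, turning malformed input into an unchecked exception.
    static byte[] decompress(byte[] compressed) {
        Inflater inflater = new Inflater();
        try {
            inflater.setInput(compressed);
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[256];
            while (!inflater.finished()) {
                int n = inflater.inflate(buffer);
                if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new IllegalStateException("Truncated or unsupported compressed data");
                }
                out.write(buffer, 0, n);
            }
            return out.toByteArray();
        } catch (DataFormatException e) {
            throw new IllegalStateException("Corrupt compressed data", e);
        } finally {
            inflater.end();
        }
    }

    // The property worth asserting: decompression restores the original bytes,
    // whatever size the compressed form happens to have.
    static boolean roundTrips(byte[] original, int level) {
        return Arrays.equals(original, decompress(compress(original, level)));
    }

    public static void main(String[] args) {
        byte[] original = new byte[197];
        Arrays.fill(original, (byte) 'k');
        for (int level : new int[] {Deflater.BEST_SPEED, Deflater.BEST_COMPRESSION}) {
            System.out.println("level " + level
                    + ": compressed=" + compress(original, level).length
                    + " bytes, roundTrips=" + roundTrips(original, level));
        }
    }
}
```

   The two compression levels play the role of two library versions here: the compressed length is printed but never asserted, so a change in it would not fail the check.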





[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337265790


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, got kicked out of the group, or encountering fatal exceptions, the
+ * heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, TimeoutException.  The subsequent attempt will be backoff
+ * exponentially.
+ *
+ * If the member completes the assignment changes, i.e. revocation and assignment, a heartbeat request will be sent in
+ * the next event loop.
+ * {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+private final Set<Errors> fatalErrors = new HashSet<>(Arrays.asList(
+Errors.GROUP_AUTHORIZATION_FAILED,
+Errors.INVALID_REQUEST,
+Errors.GROUP_MAX_SIZE_REACHED,
+Errors.UNSUPPORTED_ASSIGNOR,
+Errors.UNRELEASED_INSTANCE_ID));
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final LogContext logContext,
+final ConsumerConfig 

[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337263803


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, got kicked out of the group, or encountering fatal exceptions, the
+ * heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, TimeoutException.  The subsequent attempt will be backoff
+ * exponentially.
+ *
+ * If the member completes the assignment changes, i.e. revocation and assignment, a heartbeat request will be sent in
+ * the next event loop.
+ * {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+private final Set<Errors> fatalErrors = new HashSet<>(Arrays.asList(
+Errors.GROUP_AUTHORIZATION_FAILED,
+Errors.INVALID_REQUEST,
+Errors.GROUP_MAX_SIZE_REACHED,
+Errors.UNSUPPORTED_ASSIGNOR,
+Errors.UNRELEASED_INSTANCE_ID));
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final LogContext logContext,
+final ConsumerConfig 

[GitHub] [kafka] jlprat commented on pull request #14434: KAFKA-15498: bump snappy-java version to 1.1.10.4

2023-09-26 Thread via GitHub


jlprat commented on PR #14434:
URL: https://github.com/apache/kafka/pull/14434#issuecomment-1735603472

   I think the problem is in the details, why this extra byte? Also, 1 extra 
byte might not be relevant, but at some point the overhead of extra bytes might 
cause performance regressions (overhead on network traffic + increased memory 
footprint).
   
   I did a bit of debugging on [`UnifiedLog.scala` line 826](https://github.com/apache/kafka/blob/3.6/core/src/main/scala/kafka/log/UnifiedLog.scala#L826)
   and found out that all the `dups` are in theory written with the same Snappy 
codec but they seem to have the same size with the previous snappy version and 
the newer one.





[GitHub] [kafka] divijvaidya commented on pull request #14434: KAFKA-15498: bump snappy-java version to 1.1.10.4

2023-09-26 Thread via GitHub


divijvaidya commented on PR #14434:
URL: https://github.com/apache/kafka/pull/14434#issuecomment-1735602069

   Hey @showuon 
   
   > Add hardcoded 5 for decompression, so, set the max message size to 202.
   > After decompressed, the record size is 203, throw exception, while before 
   > bumping snappy version, it'll always be 202.
   
   If the size of the original decompressed message is 197, how can the size of 
the message obtained after decompression be 203? Doesn't this mean that the 
message decompressed later is not equal to the original message?





[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337258265


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, got kicked out of the group, or encountering fatal exceptions, the
+ * heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, TimeoutException.  The subsequent attempt will be backoff
+ * exponentially.
+ *
+ * If the member completes the assignment changes, i.e. revocation and assignment, a heartbeat request will be sent in
+ * the next event loop.
+ * {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+private final Set<Errors> fatalErrors = new HashSet<>(Arrays.asList(
+Errors.GROUP_AUTHORIZATION_FAILED,
+Errors.INVALID_REQUEST,
+Errors.GROUP_MAX_SIZE_REACHED,
+Errors.UNSUPPORTED_ASSIGNOR,
+Errors.UNRELEASED_INSTANCE_ID));
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final LogContext logContext,
+final ConsumerConfig 

[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337239851


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, got kicked out of the group, or encountering fatal exceptions, the
+ * heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first.

Review Comment:
   and "try"






[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337238424


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group.

Review Comment:
   tries "to" rejoin






[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337235694


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, got kicked out of the group, or encountering fatal exceptions, the
+ * heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, TimeoutException.  The subsequent attempt will be backoff
+ * exponentially.
+ *
+ * If the member completes the assignment changes, i.e. revocation and assignment, a heartbeat request will be sent in
+ * the next event loop.
+ * {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+private final Set<Errors> fatalErrors = new HashSet<>(Arrays.asList(
+Errors.GROUP_AUTHORIZATION_FAILED,
+Errors.INVALID_REQUEST,
+Errors.GROUP_MAX_SIZE_REACHED,
+Errors.UNSUPPORTED_ASSIGNOR,
+Errors.UNRELEASED_INSTANCE_ID));
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final LogContext logContext,
+final ConsumerConfig 

[GitHub] [kafka] lianetm commented on a diff in pull request #14364: KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests

2023-09-26 Thread via GitHub


lianetm commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1337234985


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. The module creates a {@link ConsumerGroupHeartbeatRequest}
+ * using the state stored in the {@link MembershipManager} and enqueue it to the network queue to be sent out. Once
+ * the response is received, the module will update the state in the {@link MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to join a group, or tries rejoin the group.
+ * If the member does not have groupId configured, got kicked out of the group, or encountering fatal exceptions, the
+ * heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat and tries to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, TimeoutException.  The subsequent attempt will be backoff
+ * exponentially.
+ *
+ * If the member completes the assignment changes, i.e. revocation and assignment, a heartbeat request will be sent in
+ * the next event loop.
+ * {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+private final Set<Errors> fatalErrors = new HashSet<>(Arrays.asList(
+Errors.GROUP_AUTHORIZATION_FAILED,
+Errors.INVALID_REQUEST,
+Errors.GROUP_MAX_SIZE_REACHED,
+Errors.UNSUPPORTED_ASSIGNOR,
+Errors.UNRELEASED_INSTANCE_ID));
+
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final LogContext logContext,
+final ConsumerConfig 

[GitHub] [kafka] cadonna commented on pull request #14265: KAFKA-10199: Do not process when in PARTITIONS_REVOKED

2023-09-26 Thread via GitHub


cadonna commented on PR #14265:
URL: https://github.com/apache/kafka/pull/14265#issuecomment-173554

   Build failures are unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna merged pull request #14265: KAFKA-10199: Do not process when in PARTITIONS_REVOKED

2023-09-26 Thread via GitHub


cadonna merged PR #14265:
URL: https://github.com/apache/kafka/pull/14265





[GitHub] [kafka] kamalcph commented on pull request #14439: KAFKA-15499: Fix the flaky DeleteSegmentsDueToLogStartOffsetBreach test.

2023-09-26 Thread via GitHub


kamalcph commented on PR #14439:
URL: https://github.com/apache/kafka/pull/14439#issuecomment-1735500717

   The test failures are unrelated. @showuon @divijvaidya Call for review





[GitHub] [kafka] showuon commented on pull request #14434: KAFKA-15498: bump snappy-java version to 1.1.10.4

2023-09-26 Thread via GitHub


showuon commented on PR #14434:
URL: https://github.com/apache/kafka/pull/14434#issuecomment-1735496347

   One question I'd like to get your thoughts on. This test does the following:
   1. Generate a 2-byte key and a 128-byte value.
   2. Create a record compressed with the snappy codec.
   3. Get the size of the record, which is 197.
   4. Add a hardcoded 5 for decompression, i.e. set the max message size to 202.
   5. After decompression, the record size is 203, so an exception is thrown, whereas before 
bumping the snappy version it was always 202.
   
   Here's my question: 
   If the decompressed records can be read correctly on the consumer side (I 
haven't tested it yet), should we still worry about the additional 1 byte 
after decompression?
   
   @divijvaidya @ijuma @jlprat , thoughts?





[GitHub] [kafka] lucasbru commented on a diff in pull request #14454: KAFKA-15344: Commit leader epoch where possible

2023-09-26 Thread via GitHub


lucasbru commented on code in PR #14454:
URL: https://github.com/apache/kafka/pull/14454#discussion_r1337139205


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##
@@ -431,11 +433,20 @@ public Map 
prepareCommit() {
 }
 }
 
-private Long findOffset(final TopicPartition partition) {
+private OffsetAndMetadata findOffsetAndMetadata(final TopicPartition 
partition) {
 Long offset = partitionGroup.headRecordOffset(partition);
+Optional leaderEpoch = 
partitionGroup.headRecordLeaderEpoch(partition);
+final long partitionTime = 
partitionGroup.partitionTimestamp(partition);
 if (offset == null) {
 try {
 offset = mainConsumer.position(partition);
+// If we happen to commit the next offset after the last 
consumed record, use its
+// leader epoch. Otherwise, we do not know the leader epoch.
+if (consumedOffsets.containsKey(partition) && offset == 
consumedOffsets.get(partition) + 1) {
+leaderEpoch = consumedLeaderEpochs.get(partition);
+} else {
+leaderEpoch = Optional.empty();

Review Comment:
   Here, we'd insert the code for fetching the leader epoch for the current 
consumer position, if we make it possible in the consumer.






[GitHub] [kafka] showuon commented on pull request #14434: KAFKA-15498: bump snappy-java version to 1.1.10.4

2023-09-26 Thread via GitHub


showuon commented on PR #14434:
URL: https://github.com/apache/kafka/pull/14434#issuecomment-1735456298

   I'll try to investigate more tomorrow, and try to write a simple test case 
in snappy repo to simulate our test case. But again, if somebody on other 
timezones wants to take a stab at it, go ahead!





[GitHub] [kafka] lucasbru opened a new pull request, #14454: KAFKA-15344: Commit leader epoch where possible

2023-09-26 Thread via GitHub


lucasbru opened a new pull request, #14454:
URL: https://github.com/apache/kafka/pull/14454

   Kafka Streams needs to include the leader epoch when committing offsets.
   
   The leader epoch is required to detect situations where a consumer with outdated 
metadata is trying to fetch the committed offset of a partition after being 
assigned that partition during a rebalance. The committed offset may be for a 
newer epoch than the consumer has in its metadata, leading to an 
OFFSET_OUT_OF_RANGE error and possible data loss.
   
   Without an extension of the consumer interface, it is not possible to set 
the correct leader epoch in all circumstances. In particular, when we attempt 
to commit an offset that ends with a batch of control records, the leader 
epoch of the control records is not exposed to Kafka Streams. This can happen 
primarily in EOS mode, whenever Kafka Streams' internal record buffers are 
depleted.
   
   This is a partial fix that avoids the situation described above in most cases, 
with the final fix for EOS still open.
   
   When committing an offset while our internal record queue is non-empty, we 
commit the leader epoch of the next record in the queue. We extend 
`StampedRecord`, `RecordQueue`, `RecordDeserializer` and `PartitionGroup` to 
expose and keep track of leader epochs.
   
   If our internal record queue is empty, we commit the position of the 
consumer. If the position of the consumer happens to be the last consumed 
offset + 1, we use the last consumed leader epoch in the commit; otherwise, we 
omit the leader epoch.
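
For illustration, the commit rule described above can be sketched in plain Java. This is a minimal sketch under stated assumptions, not the code in this PR: `CommitChoice`, `choose`, and all parameter names are hypothetical, and a `null` head offset stands in for an empty internal record queue.

```java
import java.util.Optional;

// Hypothetical, simplified model of the commit decision described above.
// None of these names are taken from the Kafka Streams code base.
final class CommitChoice {
    final long offset;
    final Optional<Integer> leaderEpoch;

    CommitChoice(long offset, Optional<Integer> leaderEpoch) {
        this.offset = offset;
        this.leaderEpoch = leaderEpoch;
    }

    /**
     * @param headOffset         offset of the next buffered record, or null if the queue is empty
     * @param headLeaderEpoch    leader epoch of that buffered record, if known
     * @param consumerPosition   current position of the consumer for the partition
     * @param lastConsumedOffset offset of the last consumed record, or null if nothing was consumed
     * @param lastConsumedEpoch  leader epoch of the last consumed record, if known
     */
    static CommitChoice choose(Long headOffset,
                               Optional<Integer> headLeaderEpoch,
                               long consumerPosition,
                               Long lastConsumedOffset,
                               Optional<Integer> lastConsumedEpoch) {
        // Non-empty queue: commit the next buffered record together with its epoch.
        if (headOffset != null) {
            return new CommitChoice(headOffset, headLeaderEpoch);
        }
        // Empty queue and position == last consumed offset + 1: reuse the last consumed epoch.
        if (lastConsumedOffset != null && consumerPosition == lastConsumedOffset + 1) {
            return new CommitChoice(consumerPosition, lastConsumedEpoch);
        }
        // Otherwise the epoch at the consumer position is unknown, so it is omitted.
        return new CommitChoice(consumerPosition, Optional.empty());
    }
}
```

The third branch corresponds to the case called out as still open, for example when the position has advanced past a batch of control records.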
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-13882) Dockerfile for previewing website

2023-09-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769160#comment-17769160
 ] 

ASF GitHub Bot commented on KAFKA-13882:


divijvaidya commented on PR #410:
URL: https://github.com/apache/kafka-site/pull/410#issuecomment-1735454453

   Dropped by to request an update to 
https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server
  and 
https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes
 (the latter is linked from https://kafka.apache.org/contributing.html ) after 
this PR is merged.




> Dockerfile for previewing website
> -
>
> Key: KAFKA-13882
> URL: https://issues.apache.org/jira/browse/KAFKA-13882
> Project: Kafka
>  Issue Type: Task
>  Components: docs, website
>Reporter: Tom Bentley
>Assignee: Lim Qing Wei
>Priority: Trivial
>  Labels: newbie
>
> Previewing changes to the website/documentation is rather difficult because 
> you either have to [hack with the 
> HTML|https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes#ContributingWebsiteDocumentationChanges-KafkaWebsiteRepository]
>  or [install 
> httpd|https://cwiki.apache.org/confluence/display/KAFKA/Setup+Kafka+Website+on+Local+Apache+Server].
>  This is a barrier to contribution.
> Having a Dockerfile for previewing the Kafka website (i.e. with httpd 
> properly set up) would make it easier for people to contribute website/docs 
> changes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cadonna commented on pull request #14437: KAFKA-10199: Fix restoration behavior for paused tasks

2023-09-26 Thread via GitHub


cadonna commented on PR #14437:
URL: https://github.com/apache/kafka/pull/14437#issuecomment-1735413305

   Build failures are unrelated.





[GitHub] [kafka] cadonna merged pull request #14437: KAFKA-10199: Fix restoration behavior for paused tasks

2023-09-26 Thread via GitHub


cadonna merged PR #14437:
URL: https://github.com/apache/kafka/pull/14437





[jira] [Created] (KAFKA-15508) Method always return the same value ApplicationEventProcessor.java

2023-09-26 Thread Svyatoslav (Jira)
Svyatoslav created KAFKA-15508:
--

 Summary: Method always return the same value 
ApplicationEventProcessor.java
 Key: KAFKA-15508
 URL: https://issues.apache.org/jira/browse/KAFKA-15508
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer
Affects Versions: 3.5.1
Reporter: Svyatoslav


I'm not sure, but I think this is a bug, because the method 'process' in 
ApplicationEventProcessor.java always returns true:

    private {color:#FF}boolean process{color}(final PollApplicationEvent 
event) {
        Optional commitRequestManger = 
registry.get(RequestManager.Type.COMMIT);
        if (!commitRequestManger.isPresent()) {
           {color:#FF} return true{color};
        }

        CommitRequestManager manager = (CommitRequestManager) 
commitRequestManger.get();
        manager.updateAutoCommitTimer(event.pollTimeMs);
        {color:#FF}return true{color};
    }





[GitHub] [kafka] divijvaidya commented on a diff in pull request #14453: MINOR: Close UnifiedLog created in tests to avoid resource leak

2023-09-26 Thread via GitHub


divijvaidya commented on code in PR #14453:
URL: https://github.com/apache/kafka/pull/14453#discussion_r1337054159


##
core/src/test/scala/unit/kafka/log/LogLoaderTest.scala:
##
@@ -1800,5 +1822,6 @@ class LogLoaderTest {
   isRemoteLogEnabled = isRemoteLogEnabled
 ).load()
 assertEquals(expectedLogStartOffset, offsets.logStartOffset)
+log.close()

Review Comment:
   What happens if the assertion on the line above fails? We will have a leak in 
that case.



##
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##
@@ -71,6 +72,13 @@ class UnifiedLogTest {
   @AfterEach
   def tearDown(): Unit = {
 brokerTopicStats.close()
+try {
+  log.close()

Review Comment:
   Perhaps worth checking that (log != null); otherwise we may get a null pointer 
exception here if the test exits before creating a log object.
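
A minimal sketch of the null-guarded teardown suggested above, written in plain Java rather than the Scala of the test. `Resource`, `LogFixture`, and their methods are hypothetical stand-ins for `UnifiedLog` and the test class, not Kafka APIs.

```java
// Hypothetical stand-in for a closable test resource such as a log.
interface Resource {
    void close();
}

// Hypothetical test fixture: the log may never have been created if the
// test failed before reaching the line that assigns it.
final class LogFixture {
    private Resource log;              // stays null until createLog is called
    private boolean statsClosed = false;

    void createLog(Resource newLog) {
        this.log = newLog;
    }

    void tearDown() {
        try {
            // Guard against a test that exited before creating the log.
            if (log != null) {
                log.close();
            }
        } finally {
            // Stands in for closing other fixtures, which must happen
            // even if closing the log throws.
            statsClosed = true;
        }
    }

    boolean statsClosed() {
        return statsClosed;
    }
}
```

Closing the log in the teardown rather than at the end of the test body also covers the case where an assertion fails before the close call is reached.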






[jira] [Commented] (KAFKA-15169) Add tests for RemoteIndexCache

2023-09-26 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769119#comment-17769119
 ] 

Divij Vaidya commented on KAFKA-15169:
--

First, a correction to what I said above. 

> The case you mention assumes that file sitting on disk may get corrupted but 
> that is a risk we choose to accept in Kafka,

The files sitting on disk do actually get corrupted. We know of such cases when 
the disk gets full, which sometimes leaves the indexes in an inconsistent state. 
We perform a restart in the disk-full case and hence we can assume that during the 
lifecycle of a broker, files sitting on disk will not get corrupted. But on 
restart, we should definitely perform a check.

Next, test case 1 validates recovery if the index fetched from remote 
was corrupted during network transfer, i.e.

1. We call getIndexEntry.
2. It throws a corrupt index exception (this exception will be thrown after 
fetching from remote storage) at 
"index.sanityCheck();" (line 361).
3. I haven't looked at how we are handling it, but ideally the system should 
retry the fetch from remote and this time it should succeed (no corruption during 
transfer). The test should validate that a retry occurs and that it is successful.


Next, for test case 2, the test you mentioned sounds like a nice addition. It 
validates the situation where we have a file on disk but it's not in the cache. In 
such a case, we should add a cache entry from the file if it is correct, else try to 
fetch from remote. You are right in assuming that this code path should never occur 
(because ideally if a file exists on disk, it should already have a corresponding entry 
in the cache), but this code is a fail-safe in case we are 
accidentally left with an inconsistency between the file on disk and the in-memory 
cache.
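
The ideal behaviour described for test case 1 (retry the remote fetch when the fetched index fails its sanity check) can be sketched as below. This is only an illustration under stated assumptions: `RetryingIndexLoader`, `fetchWithRetry`, and the nested exception class are hypothetical names, the index is modelled as a byte array, and nothing here claims that RemoteIndexCache behaves this way today.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical sketch of "retry the remote fetch on corruption".
final class RetryingIndexLoader {

    // Local stand-in exception, not the Kafka class of the same name.
    static final class CorruptIndexException extends RuntimeException {
        CorruptIndexException(String message) {
            super(message);
        }
    }

    /**
     * Fetches an index, re-fetching while the sanity check fails,
     * for at most maxAttempts attempts in total.
     */
    static byte[] fetchWithRetry(Supplier<byte[]> fetchFromRemote,
                                 Predicate<byte[]> sanityCheck,
                                 int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            byte[] index = fetchFromRemote.get();
            if (sanityCheck.test(index)) {
                return index; // fetched copy is intact
            }
            // Corrupted during transfer: fall through and fetch again.
        }
        throw new CorruptIndexException("Index still corrupt after " + maxAttempts + " attempts");
    }
}
```

A test along these lines would make the first fetch return a corrupt payload and the second a valid one, then assert that exactly two fetches happened and that the valid index was returned.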

> Add tests for RemoteIndexCache
> --
>
> Key: KAFKA-15169
> URL: https://issues.apache.org/jira/browse/KAFKA-15169
> Project: Kafka
>  Issue Type: Test
>Reporter: Satish Duggana
>Assignee: Arpit Goyal
>Priority: Major
>  Labels: KIP-405
> Fix For: 3.7.0
>
>
> Follow-up from 
> https://github.com/apache/kafka/pull/13275#discussion_r1257490978





[jira] [Assigned] (KAFKA-15507) adminClient should not throw retriable exception when closing instance

2023-09-26 Thread Gantigmaa Selenge (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gantigmaa Selenge reassigned KAFKA-15507:
-

Assignee: Gantigmaa Selenge

> adminClient should not throw retriable exception when closing instance
> --
>
> Key: KAFKA-15507
> URL: https://issues.apache.org/jira/browse/KAFKA-15507
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 3.5.1
>Reporter: Luke Chen
>Assignee: Gantigmaa Selenge
>Priority: Major
>
> When adminClient is closing the instance, it'll first set 
> `hardShutdownTimeMs` to a positive timeout value, and then wait for 
> existing threads to complete within the timeout. However, during this 
> wait, when a new caller tries to invoke a new command in adminClient, it'll 
> immediately get a 
> {code:java}
> TimeoutException("The AdminClient thread is not accepting new calls.")
> {code}
> There are some issues with the design:
> 1. Since the `TimeoutException` is a retriable exception, the caller will 
> enter a tight loop and keep retrying.
> 2. The error message is confusing. What does "the adminClient is not 
> accepting new calls" mean?
> We should improve it by throwing a non-retriable error (e.g. 
> IllegalStateException), and the error message should clearly state that the 
> adminClient is closing.





[GitHub] [kafka] jlprat commented on pull request #14434: KAFKA-15498: bump snappy-java version to 1.1.10.4

2023-09-26 Thread via GitHub


jlprat commented on PR #14434:
URL: https://github.com/apache/kafka/pull/14434#issuecomment-1735292502

   I spent a little time on this, but I couldn't find the root cause of why this is 
failing.





[jira] [Comment Edited] (KAFKA-15169) Add tests for RemoteIndexCache

2023-09-26 Thread Arpit Goyal (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769103#comment-17769103
 ] 

Arpit Goyal edited comment on KAFKA-15169 at 9/26/23 10:42 AM:
---

[~divijvaidya] Just to confirm my understanding of the code flow of the first 
test case: 

1. We call getIndexEntry.
2. It throws a corrupt storage exception (this exception will be thrown after 
fetching from remote storage), 
i.e.

{code:java}
 Utils.atomicMoveWithFallback(tmpIndexFile.toPath(), indexFile.toPath(), false);
   index = readIndex.apply(indexFile);  // throws remote Storage exception
{code}
3. We call getIndexEntry again. 
4. This time the file already exists on disk, so it will log the corruption error. 
5. It will refetch from remote storage and pass the sanity check. 
Is the test case basically meant to test the flow when a corrupted file already exists 
on disk?


was (Author: JIRAUSER301926):
[~divijvaidya] Just to confirm what I understood the flow  of the first test 
case 

1. we call getIndexEntry
2. It throws corrupt storage exception( This exception will be thrown after 
fetching from remote storage )
i.e.

{code:java}
 Utils.atomicMoveWithFallback(tmpIndexFile.toPath(), indexFile.toPath(), false);
   index = readIndex.apply(indexFile);  // throws remote Storage exception
{code}
3. We call getIndexEntry again 
4. This time file already exist on disk , it will log the corrupted error 
 5. It  will refetch from remote storage and passes the sanity check. 
The test case is basically to test the flow when corrupted file already exist 
on disk ?

> Add tests for RemoteIndexCache
> --
>
> Key: KAFKA-15169
> URL: https://issues.apache.org/jira/browse/KAFKA-15169
> Project: Kafka
>  Issue Type: Test
>Reporter: Satish Duggana
>Assignee: Arpit Goyal
>Priority: Major
>  Labels: KIP-405
> Fix For: 3.7.0
>
>
> Follow-up from 
> https://github.com/apache/kafka/pull/13275#discussion_r1257490978





[jira] [Commented] (KAFKA-15169) Add tests for RemoteIndexCache

2023-09-26 Thread Arpit Goyal (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769103#comment-17769103
 ] 

Arpit Goyal commented on KAFKA-15169:
-

[~divijvaidya] Just to confirm my understanding of the flow of the first test 
case: 

1. We call getIndexEntry.
2. It throws a corrupt storage exception (this exception will be thrown after 
fetching from remote storage), 
i.e.

{code:java}
 Utils.atomicMoveWithFallback(tmpIndexFile.toPath(), indexFile.toPath(), false);
   index = readIndex.apply(indexFile);  // throws remote Storage exception
{code}
3. We call getIndexEntry again. 
4. This time the file already exists on disk, so it will log the corruption error. 
5. It will refetch from remote storage and pass the sanity check. 
Is the test case basically meant to test the flow when a corrupted file already exists 
on disk?

> Add tests for RemoteIndexCache
> --
>
> Key: KAFKA-15169
> URL: https://issues.apache.org/jira/browse/KAFKA-15169
> Project: Kafka
>  Issue Type: Test
>Reporter: Satish Duggana
>Assignee: Arpit Goyal
>Priority: Major
>  Labels: KIP-405
> Fix For: 3.7.0
>
>
> Follow-up from 
> https://github.com/apache/kafka/pull/13275#discussion_r1257490978





[GitHub] [kafka] tinaselenge opened a new pull request, #14453: MINOR: Close UnifiedLog created in tests to avoid resource leak

2023-09-26 Thread via GitHub


tinaselenge opened a new pull request, #14453:
URL: https://github.com/apache/kafka/pull/14453

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] divijvaidya commented on a diff in pull request #14425: KAFKA-15401: Segment with corrupted index should not be uploaded to r…

2023-09-26 Thread via GitHub


divijvaidya commented on code in PR #14425:
URL: https://github.com/apache/kafka/pull/14425#discussion_r1336965148


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -697,12 +711,22 @@ public void copyLogSegmentsToRemote(UnifiedLog log) 
throws InterruptedException
 this.cancel();
 } catch (InterruptedException | RetriableException ex) {
 throw ex;
+} catch (CorruptIndexException ex) {
+logger.error("Error occurred while copying log segments. Index 
appeared to be corrupted for partition: {} ", topicIdPartition, ex);
+segmentCopyFailures++;
 } catch (Exception ex) {
 if (!isCancelled()) {
 
brokerTopicStats.topicStats(log.topicPartition().topic()).failedRemoteCopyRequestRate().mark();
 
brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().mark();
 logger.error("Error occurred while copying log segments of 
partition: {}", topicIdPartition, ex);
 }
+} finally {
+
metricsGroup.newGauge(FAILED_REMOTE_COPY_PER_SEC_METRIC.getName(), new 
Gauge() {

Review Comment:
   This metric is already updated in the above catch clause via 
`brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().mark();`. 
   
   Can we add 
`brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().mark();` and 
`brokerTopicStats.topicStats(log.topicPartition().topic()).failedRemoteCopyRequestRate().mark();`
 to our corrupted index exception handling as well (instead of creating a new gauge 
here)? We probably want to do it after our attempt to recreate the index fails, as 
mentioned above.






[GitHub] [kafka] divijvaidya commented on a diff in pull request #14425: KAFKA-15401: Segment with corrupted index should not be uploaded to r…

2023-09-26 Thread via GitHub


divijvaidya commented on code in PR #14425:
URL: https://github.com/apache/kafka/pull/14425#discussion_r1336950457


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -697,12 +711,22 @@ public void copyLogSegmentsToRemote(UnifiedLog log) 
throws InterruptedException
 this.cancel();
 } catch (InterruptedException | RetriableException ex) {
 throw ex;
+} catch (CorruptIndexException ex) {
+logger.error("Error occurred while copying log segments. Index 
appeared to be corrupted for partition: {} ", topicIdPartition, ex);
+segmentCopyFailures++;

Review Comment:
   We can re-create the index from the segment in case it is corrupted. This would 
remove the need for any operator intervention. It will block uploading to TS for the 
time during which the index is being reconstructed, but the trade-off in 
favour of self-recovery is worth it in my opinion.



##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -1706,6 +1758,279 @@ public RemoteLogMetadataManager 
createRemoteLogMetadataManager() {
 }
 }
 
+@Test
+void testCorruptedTimeIndex() throws Exception {

Review Comment:
   There is a lot of duplicated code among the three tests added here. Can we 
use `@ParameterizedTest` and combine them?



##
storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java:
##
@@ -74,13 +74,13 @@ public void sanityCheck() {
 TimestampOffset entry = lastEntry();
 long lastTimestamp = entry.timestamp;
 long lastOffset = entry.offset;
+if (entries() != 0 && lastOffset < baseOffset())
+throw new CorruptIndexException("Corrupt time index found, time 
index file (" + file().getAbsolutePath() + ") has "
++ "non-zero size but the last offset is " + lastOffset + " 
which is less than the first offset " + baseOffset());

Review Comment:
   This is a nice change. We should fail fast by checking local state even 
before calling the expensive mmap() function below.






[jira] [Commented] (KAFKA-15507) adminClient should not throw retriable exception when closing instance

2023-09-26 Thread Keith Wall (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769087#comment-17769087
 ] 

Keith Wall commented on KAFKA-15507:


Thank you for raising [~showuon] 

When I was looking at this issue yesterday, I noted that the behaviour of the 
Admin client is at odds with the behaviour of the Producer and Consumer client. 
 With those I see:

java.lang.IllegalStateException: Cannot perform operation after producer has 
been closed

java.lang.IllegalStateException: This consumer has already been closed.
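
As a rough illustration of the Producer/Consumer-style behaviour quoted above, a client can reject new calls with a non-retriable IllegalStateException once it starts closing. The sketch below is not the AdminClient implementation; `ClosableClient`, `call`, and the message text are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical client that refuses new calls once close() has started.
final class ClosableClient {
    private final AtomicBoolean closing = new AtomicBoolean(false);

    // Marks the client as closing; calls already in flight are not modelled here.
    void close() {
        closing.set(true);
    }

    // Runs a command, or fails fast with a non-retriable exception while closing.
    String call(String command) {
        if (closing.get()) {
            throw new IllegalStateException(
                "Cannot perform '" + command + "': the client is closing or already closed");
        }
        return "ok:" + command;
    }
}
```

Because IllegalStateException is not a retriable exception, a caller that retries only on retriable errors would stop immediately instead of looping.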

> adminClient should not throw retriable exception when closing instance
> --
>
> Key: KAFKA-15507
> URL: https://issues.apache.org/jira/browse/KAFKA-15507
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 3.5.1
>Reporter: Luke Chen
>Priority: Major
>
> When adminClient is closing the instance, it'll first set 
> `hardShutdownTimeMs` to a positive timeout value, and then wait for 
> existing threads to complete within the timeout. However, during this 
> wait, when a new caller tries to invoke a new command in adminClient, it'll 
> immediately get a 
> {code:java}
> TimeoutException("The AdminClient thread is not accepting new calls.")
> {code}
> There are some issues with the design:
> 1. Since the `TimeoutException` is a retriable exception, the caller will 
> enter a tight loop and keep retrying.
> 2. The error message is confusing. What does "the adminClient is not 
> accepting new calls" mean?
> We should improve it by throwing a non-retriable error (e.g. 
> IllegalStateException), and the error message should clearly state that the 
> adminClient is closing.





[jira] [Commented] (KAFKA-15169) Add tests for RemoteIndexCache

2023-09-26 Thread Arpit Goyal (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769086#comment-17769086
 ] 

Arpit Goyal commented on KAFKA-15169:
-

Thanks [~divijvaidya] 
I have  two questions based on the code walkthrough 

{code:java}
private <T> T loadIndexFile(File file, RemoteLogSegmentMetadata remoteLogSegmentMetadata,
                            Function<RemoteLogSegmentMetadata, InputStream> fetchRemoteIndex,
                            Function<File, T> readIndex) throws IOException {
    File indexFile = new File(cacheDir, file.getName());
    T index = null;
    if (Files.exists(indexFile.toPath())) {
        try {
            index = readIndex.apply(indexFile);
        } catch (CorruptRecordException ex) {
            log.info("Error occurred while loading the stored index file {}", indexFile.getPath(), ex);
        }
    }
    if (index == null) {
        File tmpIndexFile = new File(indexFile.getParentFile(), indexFile.getName() + RemoteIndexCache.TMP_FILE_SUFFIX);
        try (InputStream inputStream = fetchRemoteIndex.apply(remoteLogSegmentMetadata)) {
            Files.copy(inputStream, tmpIndexFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
        }
        Utils.atomicMoveWithFallback(tmpIndexFile.toPath(), indexFile.toPath(), false);
        index = readIndex.apply(indexFile);
    }
    return index;
}
{code}


In the RemoteIndexCache loadIndexFile function:
1. First we check whether the file exists on disk and run a sanity check. I 
believe this part of the code will never be executed, as it runs only when 
there is a cache miss. 
2. As per the first test case, it would throw a CorruptRecordException in the 
later part of the code, where we fetch the index from the remote segment and 
run a sanity check:

{code:java}
 Utils.atomicMoveWithFallback(tmpIndexFile.toPath(), indexFile.toPath(), false);
index = readIndex.apply(indexFile);
{code}
I had assumed the first test case was about a file that already exists on 
disk when getIndexEntry is called:
1. Create an empty/corrupt file on disk.
2. Call getIndexEntry.
3. It throws a corrupt record exception.
4. On the next line it fetches from remote storage and restores the file.
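
A minimal sketch of steps 1-4, assuming a simplified loader with the same shape as loadIndexFile but no Kafka types. CorruptIndexException, readIndex, the ".tmp" suffix and the in-memory "remote" stream are all hypothetical stand-ins, not the RemoteIndexCache API:

```java
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.function.Function;
import java.util.function.Supplier;

class IndexLoadSketch {
    // Hypothetical stand-in for Kafka's CorruptRecordException.
    static class CorruptIndexException extends RuntimeException {
        CorruptIndexException(String message) { super(message); }
    }

    // Hypothetical reader: treats an empty file as corrupt, otherwise returns its bytes.
    static byte[] readIndex(File file) {
        try {
            byte[] data = Files.readAllBytes(file.toPath());
            if (data.length == 0) {
                throw new CorruptIndexException("Empty index file: " + file);
            }
            return data;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Same control flow as the quoted loadIndexFile, reduced to plain JDK types.
    static <T> T loadIndexFile(File indexFile, Supplier<InputStream> fetchRemoteIndex,
                               Function<File, T> readIndex) throws IOException {
        T index = null;
        if (Files.exists(indexFile.toPath())) {
            try {
                index = readIndex.apply(indexFile);
            } catch (CorruptIndexException ex) {
                // Step 3: the on-disk copy is corrupt, so fall through to the remote fetch.
            }
        }
        if (index == null) {
            Path tmp = indexFile.toPath().resolveSibling(indexFile.getName() + ".tmp");
            try (InputStream in = fetchRemoteIndex.get()) {
                Files.copy(in, tmp, StandardCopyOption.REPLACE_EXISTING);
            }
            Files.move(tmp, indexFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
            // No retry here: a corrupt remote copy would propagate the exception.
            index = readIndex.apply(indexFile);
        }
        return index;
    }

    // Steps 1, 2 and 4: create an empty (corrupt) file, load it, get the remote bytes back.
    static byte[] demo() {
        try {
            Path dir = Files.createTempDirectory("index-cache-sketch");
            File indexFile = dir.resolve("segment.index").toFile();
            Files.createFile(indexFile.toPath());
            return loadIndexFile(indexFile,
                    () -> new ByteArrayInputStream(new byte[] {1, 2, 3}),
                    IndexLoadSketch::readIndex);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("restored bytes: " + demo().length);
    }
}
```

In this sketch the corrupt local file is silently replaced, while corruption of the freshly fetched copy is not retried, which mirrors how I read the quoted method.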

> Add tests for RemoteIndexCache
> --
>
> Key: KAFKA-15169
> URL: https://issues.apache.org/jira/browse/KAFKA-15169
> Project: Kafka
>  Issue Type: Test
>Reporter: Satish Duggana
>Assignee: Arpit Goyal
>Priority: Major
>  Labels: KIP-405
> Fix For: 3.7.0
>
>
> Follow-up from 
> https://github.com/apache/kafka/pull/13275#discussion_r1257490978





[jira] [Commented] (KAFKA-15169) Add tests for RemoteIndexCache

2023-09-26 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769072#comment-17769072
 ] 

Divij Vaidya commented on KAFKA-15169:
--

Separately, while you are writing tests for this cache, see if you would be 
interested in fixing https://issues.apache.org/jira/browse/KAFKA-15481 as well. We 
would ideally like to write a test for the scenario mentioned in that ticket 
that fails prior to the fix and succeeds after it.



[jira] [Commented] (KAFKA-15169) Add tests for RemoteIndexCache

2023-09-26 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769071#comment-17769071
 ] 

Divij Vaidya commented on KAFKA-15169:
--

Hey Arpit

Asserting the sanity of the index (or any file on disk) is an expensive 
operation. Hence, we have to strike a balance between when we assert sanity 
and when we trust that the file on disk is not corrupted.

For logs, we perform CRC checksum while storing data on disk and after that the 
assumption is that files on disk will not get corrupted, i.e. we consider 
transfer over the network a possible culprit for corruption but don't consider 
that a file sitting on disk will get corrupted. Extending the same analogy to 
this cache, when we fetch the index files from remote store, they may be 
corrupted, so we perform a sanity check, but once stored on disk, we assume 
that files will not be corrupted.

The case you mention assumes that file sitting on disk may get corrupted but 
that is a risk we choose to accept in Kafka, given the tradeoff mentioned 
above. Hence, the case you mentioned is an acceptable risk by design.
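
As a rough illustration of that tradeoff (a sketch only, not Kafka's actual code path): the checksum is verified once, when the bytes arrive over the network, and the stored copy is trusted afterwards. TransferCheckSketch and acceptFromNetwork are hypothetical names; CRC32 is the standard java.util.zip class.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

class TransferCheckSketch {
    // Computes a CRC32 checksum over the whole payload.
    static long crc(byte[] data) {
        CRC32 crc = new CRC32();
        crc.update(data, 0, data.length);
        return crc.getValue();
    }

    // Hypothetical ingest step: validate once on receipt, then trust the stored copy.
    static byte[] acceptFromNetwork(byte[] payload, long expectedCrc) {
        if (crc(payload) != expectedCrc) {
            throw new IllegalStateException("Checksum mismatch on received data");
        }
        // From here on the bytes would be written to disk and read back without re-checking.
        return payload;
    }

    public static void main(String[] args) {
        byte[] payload = "index-bytes".getBytes(StandardCharsets.UTF_8);
        long expected = crc(payload);
        System.out.println("accepted " + acceptFromNetwork(payload, expected).length + " bytes");
    }
}
```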



[jira] [Resolved] (KAFKA-15487) CVE-2023-40167, CVE-2023-36479 - Upgrade jetty to 9.4.52, 10.0.16, 11.0.16, 12.0.1

2023-09-26 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya resolved KAFKA-15487.
--
Resolution: Fixed

> CVE-2023-40167, CVE-2023-36479 - Upgrade jetty to 9.4.52, 10.0.16, 11.0.16, 
> 12.0.1
> --
>
> Key: KAFKA-15487
> URL: https://issues.apache.org/jira/browse/KAFKA-15487
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.7.0, 2.6.1, 3.4.1, 3.6.0, 3.5.1
>Reporter: Rafael Rios Saavedra
>Assignee: Divij Vaidya
>Priority: Major
>  Labels: CVE, security
> Fix For: 3.6.0, 3.4.2, 3.5.2
>
>
> The CVE-2023-40167 and CVE-2023-36479 vulnerabilities affect Jetty version 
> {*}9.4.51{*}. For more information see 
> [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-40167] 
> [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-36479] 
> Upgrading to Jetty version *9.4.52, 10.0.16, 11.0.16, 12.0.1* should address 
> this issue.





[jira] [Commented] (KAFKA-15487) CVE-2023-40167, CVE-2023-36479 - Upgrade jetty to 9.4.52, 10.0.16, 11.0.16, 12.0.1

2023-09-26 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769066#comment-17769066
 ] 

Divij Vaidya commented on KAFKA-15487:
--

We have backported this to all community supported versions (3.4, 3.5, 3.6) as 
per EOL policy at 
[https://cwiki.apache.org/confluence/display/KAFKA/Time+Based+Release+Plan#TimeBasedReleasePlan-WhatIsOurEOLPolicy?]
We have an upcoming 3.6.0 release which will contain this upgrade but we don't 
have 3.5.2 or 3.4.2 planned as of yet.

If you have thoughts on Apache Kafka's EOL policy, please participate in the 
discussion at 
[https://lists.apache.org/thread/tzx4zkhfz26joq5ydq70bxcfr3zwy1hk] 



[GitHub] [kafka] lucasbru merged pull request #14403: KAFKA-10199: Add missing catch for lock exception

2023-09-26 Thread via GitHub


lucasbru merged PR #14403:
URL: https://github.com/apache/kafka/pull/14403


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15487) CVE-2023-40167, CVE-2023-36479 - Upgrade jetty to 9.4.52, 10.0.16, 11.0.16, 12.0.1

2023-09-26 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-15487:
-
Fix Version/s: 3.4.2



[GitHub] [kafka] lucasbru commented on pull request #14403: KAFKA-10199: Add missing catch for lock exception

2023-09-26 Thread via GitHub


lucasbru commented on PR #14403:
URL: https://github.com/apache/kafka/pull/14403#issuecomment-1735115556

   build failures are unrelated





[GitHub] [kafka] divijvaidya commented on pull request #14438: KAFKA-15487: Upgrade Jetty to 9.4.52.v20230823

2023-09-26 Thread via GitHub


divijvaidya commented on PR #14438:
URL: https://github.com/apache/kafka/pull/14438#issuecomment-1735115448

   Since this PR addresses a CVE, I have backported this fix to all community 
supported version branches i.e. 3.4 and 3.5 (in addition to 3.6 backported by 
Satish)





[GitHub] [kafka] lucasbru merged pull request #14436: MINOR: Revert log level changes in LogCaptureAppender

2023-09-26 Thread via GitHub


lucasbru merged PR #14436:
URL: https://github.com/apache/kafka/pull/14436





[GitHub] [kafka] lucasbru commented on pull request #14436: MINOR: Revert log level changes in LogCaptureAppender

2023-09-26 Thread via GitHub


lucasbru commented on PR #14436:
URL: https://github.com/apache/kafka/pull/14436#issuecomment-1735100693

   Build failures are unrelated





[jira] [Updated] (KAFKA-15487) CVE-2023-40167, CVE-2023-36479 - Upgrade jetty to 9.4.52, 10.0.16, 11.0.16, 12.0.1

2023-09-26 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-15487:
-
Fix Version/s: 3.6.0
   3.5.2
   (was: 3.0.0)
   (was: 2.8.0)
   (was: 2.7.1)
   (was: 2.6.2)

> CVE-2023-40167, CVE-2023-36479 - Upgrade jetty to 9.4.52, 10.0.16, 11.0.16, 
> 12.0.1
> --
>
> Key: KAFKA-15487
> URL: https://issues.apache.org/jira/browse/KAFKA-15487
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.7.0, 2.6.1, 3.4.1, 3.6.0, 3.5.1
>Reporter: Rafael Rios Saavedra
>Assignee: Divij Vaidya
>Priority: Major
>  Labels: CVE, security
> Fix For: 3.6.0, 3.5.2
>
>
> The CVE-2023-40167 and CVE-2023-36479 vulnerabilities affect Jetty version 
> {*}9.4.51{*}. For more information see 
> [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-40167] 
> [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-36479] 
> Upgrading to Jetty version *9.4.52, 10.0.16, 11.0.16, 12.0.1* should address 
> this issue.





[jira] [Resolved] (KAFKA-15504) Upgrade snappy java to version 1.1.10.4

2023-09-26 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya resolved KAFKA-15504.
--
Resolution: Duplicate

Closing as duplicate of prior ticket 
https://issues.apache.org/jira/browse/KAFKA-15498. 

> Upgrade snappy java to version 1.1.10.4
> ---
>
> Key: KAFKA-15504
> URL: https://issues.apache.org/jira/browse/KAFKA-15504
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Said BOUDJELDA
>Assignee: Said BOUDJELDA
>Priority: Major
>
> The version 1.1.10.4 contains a fix of 
> [CVE-2023-43642|https://github.com/xerial/snappy-java/security/advisories/GHSA-55g7-9cwv-5qfv]
>  as mentioned on the release notes of the library 
> [https://github.com/xerial/snappy-java/releases/tag/v1.1.10.4]  Fixed 
> SnappyInputStream so as not to allocate too large memory when decompressing 
> data with an extremely large chunk size by
>  





[GitHub] [kafka] divijvaidya closed pull request #14450: KAFKA-15504: Upgrade snappy java to version 1.1.10.4

2023-09-26 Thread via GitHub


divijvaidya closed pull request #14450: KAFKA-15504: Upgrade snappy java to 
version 1.1.10.4
URL: https://github.com/apache/kafka/pull/14450





[GitHub] [kafka] divijvaidya commented on pull request #14450: KAFKA-15504: Upgrade snappy java to version 1.1.10.4

2023-09-26 Thread via GitHub


divijvaidya commented on PR #14450:
URL: https://github.com/apache/kafka/pull/14450#issuecomment-1735075163

   Hey @bmscomp 
   
   We already have a prior PR and JIRA 
(https://issues.apache.org/jira/browse/KAFKA-15498) for this change 
https://github.com/apache/kafka/pull/14434
   
   I am going to close this PR as duplicate. Please feel free to provide review 
on the other PR.





[GitHub] [kafka] divijvaidya merged pull request #14451: MINOR: Replace Java 20 with Java 21 in `README.md`

2023-09-26 Thread via GitHub


divijvaidya merged PR #14451:
URL: https://github.com/apache/kafka/pull/14451





[jira] [Created] (KAFKA-15507) adminClient should not throw retriable exception when closing instance

2023-09-26 Thread Luke Chen (Jira)
Luke Chen created KAFKA-15507:
-

 Summary: adminClient should not throw retriable exception when 
closing instance
 Key: KAFKA-15507
 URL: https://issues.apache.org/jira/browse/KAFKA-15507
 Project: Kafka
  Issue Type: Bug
  Components: admin
Affects Versions: 3.5.1
Reporter: Luke Chen


When adminClient is closing the instance, it first sets `hardShutdownTimeMs` 
to a positive timeout value, and then waits for existing threads to complete 
within the timeout. However, during this wait, when a new caller tries to 
invoke a new command in adminClient, it immediately gets the following exception:


{code:java}
TimeoutException("The AdminClient thread is not accepting new calls.")
{code}

There are some issues with the design:
1. Since the `TimeoutException` is a retriable exception, the caller will enter 
a tight loop and keep trying it
2. The error message is confusing. What does "the adminClient is not accepting 
new calls" mean?

We should improve it by throwing a non-retriable error (e.g. 
IllegalStateException), and the error message should clearly state that the 
adminClient is closing.
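
A minimal sketch of the difference, assuming a caller that retries immediately on retriable errors. Client, RetriableError and attemptsUntilGiveUp are hypothetical names for illustration, not the AdminClient API:

```java
import java.util.concurrent.atomic.AtomicBoolean;

class ClosingClientSketch {
    // Hypothetical marker for errors a caller may retry (stands in for a retriable exception).
    static class RetriableError extends RuntimeException {
        RetriableError(String message) { super(message); }
    }

    // Hypothetical client that rejects new calls once close() has started.
    static class Client {
        private final AtomicBoolean closing = new AtomicBoolean(false);
        private final boolean throwRetriable;

        Client(boolean throwRetriable) { this.throwRetriable = throwRetriable; }

        void close() { closing.set(true); }

        String call() {
            if (closing.get()) {
                if (throwRetriable) {
                    // Current behavior as described: a retriable error with a vague message.
                    throw new RetriableError("The AdminClient thread is not accepting new calls.");
                }
                // Proposed behavior: non-retriable, and the message says why.
                throw new IllegalStateException("The client is closing and cannot accept new calls.");
            }
            return "ok";
        }
    }

    // Retries retriable errors immediately, up to maxAttempts; returns the number of attempts made.
    static int attemptsUntilGiveUp(Client client, int maxAttempts) {
        int attempts = 0;
        while (attempts < maxAttempts) {
            attempts++;
            try {
                client.call();
                return attempts;
            } catch (RetriableError e) {
                // Tight loop: nothing changes between attempts while the client is closing.
            } catch (IllegalStateException e) {
                return attempts; // Non-retriable: stop after the first failure.
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        Client retriable = new Client(true);
        retriable.close();
        Client nonRetriable = new Client(false);
        nonRetriable.close();
        System.out.println("retriable attempts: " + attemptsUntilGiveUp(retriable, 5));
        System.out.println("non-retriable attempts: " + attemptsUntilGiveUp(nonRetriable, 5));
    }
}
```

With the retriable error the caller burns through every allowed attempt; with the non-retriable one it stops after the first.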






[GitHub] [kafka] lucasbru commented on a diff in pull request #14437: KAFKA-10199: Fix restoration behavior for paused tasks

2023-09-26 Thread via GitHub


lucasbru commented on code in PR #14437:
URL: https://github.com/apache/kafka/pull/14437#discussion_r1336772774


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##
@@ -1043,6 +1046,28 @@ public void shouldResumeActiveStatefulTask() throws 
Exception {
 verify(changelogReader, times(2)).enforceRestoreActive();
 }
 
+@Test
+public void shouldAwaitWhenAllTasksPaused() throws Exception {
+final StreamTask task = statefulTask(TASK_0_0, 
mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+stateUpdater.start();
+stateUpdater.add(task);
+
+when(topologyMetadata.isPaused(null)).thenReturn(true);
+
+verifyPausedTasks(task);
+
+reset(changelogReader);
+Thread.sleep(100);
+verify(changelogReader, never()).restore(any());

Review Comment:
   I like the idea!






[GitHub] [kafka] lucasbru commented on a diff in pull request #14437: KAFKA-10199: Fix restoration behavior for paused tasks

2023-09-26 Thread via GitHub


lucasbru commented on code in PR #14437:
URL: https://github.com/apache/kafka/pull/14437#discussion_r1336770650


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##
@@ -1043,6 +1046,28 @@ public void shouldResumeActiveStatefulTask() throws 
Exception {
 verify(changelogReader, times(2)).enforceRestoreActive();
 }
 
+@Test
+public void shouldAwaitWhenAllTasksPaused() throws Exception {
+final StreamTask task = statefulTask(TASK_0_0, 
mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
+stateUpdater.start();
+stateUpdater.add(task);
+
+when(topologyMetadata.isPaused(null)).thenReturn(true);
+
+verifyPausedTasks(task);
+
+reset(changelogReader);

Review Comment:
   I don't think this is quite true. Paused tasks will not be added to 
`updatingTasks`, but we may still go around the loop calling `restore` on a 
potentially empty set. This is testing that after a while, we do not go around 
the loop anymore. But it's actually not a good test, for the reason described 
above.






[jira] [Created] (KAFKA-15506) follower receive KafkaStorageException before leader raise disk error

2023-09-26 Thread wangliucheng (Jira)
wangliucheng created KAFKA-15506:


 Summary: follower receive KafkaStorageException before leader 
raise disk error 
 Key: KAFKA-15506
 URL: https://issues.apache.org/jira/browse/KAFKA-15506
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 3.3.2
 Environment: Kafka Version: 3.3.2
Jdk Version: jdk1.8.0_301
Deployment mode: kraft 
Reporter: wangliucheng


In my Kafka environment, a topic has 2 replicas. Both the leader and the 
follower become unavailable when the leader hits a disk error. 
The follower detects the disk error before the leader does.
Here are the logs:

*follower receives KafkaStorageException:*
{code:java}
[2023-08-17 08:40:15,516] ERROR [ReplicaFetcher replicaId=4, leaderId=1, fetcherId=10] Error for partition __consumer_offsets-37 at offset 305860652 (kafka.server.ReplicaFetcherThread)
org.apache.kafka.common.errors.KafkaStorageException: Disk error when trying to access log file on the disk.
 {code}
*ISR shrinks from 4,1 to 1:*
{code:java}
[2023-08-17 08:41:49,953] INFO [Partition __consumer_offsets-37 broker=1] Shrinking ISR from 4,1 to 1. Leader: (highWatermark: 305860652, endOffset: 305860653). Out of sync replicas: (brokerId: 4, endOffset: 305860652). (kafka.cluster.Partition)
 {code}
*broker marks dir as offline:*
{code:java}
[2023-08-17 08:41:50,188] ERROR Error while appending records to eb_raw_legendsec_flow_2-33 in dir /data09/kafka/log (kafka.server.LogDirFailureChannel)
java.io.IOException: Read-only file system
        at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
        at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
        at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
        at sun.nio.ch.IOUtil.write(IOUtil.java:65)
        at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:211)
        at org.apache.kafka.common.record.MemoryRecords.writeFullyTo(MemoryRecords.java:92)
        at org.apache.kafka.common.record.FileRecords.append(FileRecords.java:188)
        at kafka.log.LogSegment.append(LogSegment.scala:158)
        at kafka.log.LocalLog.append(LocalLog.scala:436)
        at kafka.log.UnifiedLog.append(UnifiedLog.scala:949)
        at kafka.log.UnifiedLog.appendAsFollower(UnifiedLog.scala:778)
        at kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica(Partition.scala:1121)
        at kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica(Partition.scala:1128)
        at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:121)
        at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:336)
        at scala.Option.foreach(Option.scala:437)
        at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:325)
        at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:324)
        at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
        at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:359)
        at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:355)
        at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:309)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:324)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:124)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:123)
        at scala.Option.foreach(Option.scala:437)
        at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:123)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:106)
        at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:97)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
{code}




