Re: [PR] KAFKA-18618: Improve leader change handling of acknowledgements [kafka]

2025-01-29 Thread via GitHub


AndrewJSchofield commented on code in PR #18672:
URL: https://github.com/apache/kafka/pull/18672#discussion_r1934159003


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##
@@ -572,23 +575,29 @@ public CompletableFuture acknowledgeOnClose(final 
Map 
acknowledgementsMapForNode = new HashMap<>();
-for (TopicIdPartition tip : 
sessionHandler.sessionPartitions()) {
-Acknowledgements acknowledgements = 
acknowledgementsMap.getOrDefault(tip, Acknowledgements.empty());
 
-Acknowledgements acksFromShareFetch = 
fetchAcknowledgementsToSend.remove(tip);
+acknowledgementsMap.forEach((tip, nodeAcks) -> {
+Acknowledgements acknowledgements = 
Acknowledgements.empty();
+Map 
nodeAcksFromFetchMap = fetchAcknowledgementsToSend.get(nodeId);
+if (nodeAcksFromFetchMap != null) {
+Acknowledgements acksFromFetchMap = 
nodeAcksFromFetchMap.get(tip);

Review Comment:
   I'm not sure it matters because this is on the close path, but I think 
you're right that it's an improvement.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18618: Improve leader change handling of acknowledgements [kafka]

2025-01-29 Thread via GitHub


AndrewJSchofield commented on code in PR #18672:
URL: https://github.com/apache/kafka/pull/18672#discussion_r1934149229


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##
@@ -172,73 +172,58 @@ public PollResult poll(long currentTimeMs) {
 k -> sessionHandlers.computeIfAbsent(node.id(), n -> 
new ShareSessionHandler(logContext, n, memberId)));
 
 TopicIdPartition tip = new TopicIdPartition(topicId, 
partition);
-Acknowledgements acknowledgementsToSend = 
fetchAcknowledgementsToSend.remove(tip);
-if (acknowledgementsToSend != null) {
-
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
-fetchAcknowledgementsInFlight.put(tip, 
acknowledgementsToSend);
+Acknowledgements acknowledgementsToSend = null;
+Map 
nodeAcknowledgementsMap = fetchAcknowledgementsToSend.get(node.id());
+if (nodeAcknowledgementsMap != null) {
+acknowledgementsToSend = 
nodeAcknowledgementsMap.remove(tip);
+if (acknowledgementsToSend != null) {
+
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
+
fetchAcknowledgementsInFlight.computeIfAbsent(node.id(), k -> new 
HashMap<>()).put(tip, acknowledgementsToSend);
+}
 }
+
 handler.addPartitionToFetch(tip, acknowledgementsToSend);
-fetchedPartitions.add(tip);
 topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), 
tip.partition()), tip.topic());
 
 log.debug("Added fetch request for partition {} to node {}", 
tip, node.id());
 }
 }
 
-// Map storing the list of partitions to forget in the upcoming 
request.
-Map> partitionsToForgetMap = new 
HashMap<>();
+
+// Iterate over the session handlers to see if there are 
acknowledgements to be sent for partitions
+// which are no longer part of the current subscription, or whose 
records were fetched from a
+// previous leader.
 Cluster cluster = metadata.fetch();
-// Iterating over the session handlers to see if there are 
acknowledgements to be sent for partitions
-// which are no longer part of the current subscription.
 sessionHandlers.forEach((nodeId, sessionHandler) -> {
 Node node = cluster.nodeById(nodeId);
 if (node != null) {
 if (nodesWithPendingRequests.contains(node.id())) {
-log.trace("Skipping fetch because previous fetch request 
to {} has not been processed", node.id());
+log.trace("Skipping fetch because previous fetch request 
to {} has not been processed", nodeId);
 } else {
-for (TopicIdPartition tip : 
sessionHandler.sessionPartitions()) {
-if (!fetchedPartitions.contains(tip)) {
-Acknowledgements acknowledgementsToSend = 
fetchAcknowledgementsToSend.remove(tip);
-
-if (acknowledgementsToSend != null) {
-
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
-fetchAcknowledgementsInFlight.put(tip, 
acknowledgementsToSend);
-
-sessionHandler.addPartitionToFetch(tip, 
acknowledgementsToSend);
-handlerMap.put(node, sessionHandler);
-
-partitionsToForgetMap.putIfAbsent(node, new 
ArrayList<>());
-partitionsToForgetMap.get(node).add(tip);
-
-topicNamesMap.putIfAbsent(new 
IdAndPartition(tip.topicId(), tip.partition()), tip.topic());
-fetchedPartitions.add(tip);
-log.debug("Added fetch request for previously 
subscribed partition {} to node {}", tip, node.id());
-}
-}
+Map 
nodeAcksFromFetchMap = fetchAcknowledgementsToSend.get(nodeId);
+if (nodeAcksFromFetchMap != null) {
+nodeAcksFromFetchMap.forEach((tip, acks) -> {
+
metricsManager.recordAcknowledgementSent(acks.size());

Review Comment:
   Yes, good point.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18618: Improve leader change handling of acknowledgements [kafka]

2025-01-29 Thread via GitHub


AndrewJSchofield commented on code in PR #18672:
URL: https://github.com/apache/kafka/pull/18672#discussion_r1934133395


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##
@@ -471,17 +471,18 @@ public CompletableFuture> commitSync(
 // Add the incoming commitSync() request to the queue.
 Map 
acknowledgementsMapForNode = new HashMap<>();
 for (TopicIdPartition tip : 
sessionHandler.sessionPartitions()) {
-Acknowledgements acknowledgements = 
acknowledgementsMap.get(tip);
-if (acknowledgements != null) {
-acknowledgementsMapForNode.put(tip, acknowledgements);
+NodeAcknowledgements acknowledgements = 
acknowledgementsMap.get(tip);
+if ((acknowledgements != null) && 
(acknowledgements.nodeId() == node.id())) {

Review Comment:
   I will rename the variable `acknowledgements` to `nodeAcknowledgements` to 
make it a bit clearer to read. I don't believe that 
`nodeAcknowledgements.acknowledgements()` can be null because the empty case is 
`Acknowledgements.empty()` and the fetching code in the application thread only 
sends the acks to be processed if they are non-empty. I can put a null check in 
the constructor for `NodeAcknowledgements`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18618: Improve leader change handling of acknowledgements [kafka]

2025-01-29 Thread via GitHub


ShivsundarR commented on code in PR #18672:
URL: https://github.com/apache/kafka/pull/18672#discussion_r1934011920


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##
@@ -471,17 +471,18 @@ public CompletableFuture> commitSync(
 // Add the incoming commitSync() request to the queue.
 Map 
acknowledgementsMapForNode = new HashMap<>();
 for (TopicIdPartition tip : 
sessionHandler.sessionPartitions()) {
-Acknowledgements acknowledgements = 
acknowledgementsMap.get(tip);
-if (acknowledgements != null) {
-acknowledgementsMapForNode.put(tip, acknowledgements);
+NodeAcknowledgements acknowledgements = 
acknowledgementsMap.get(tip);
+if ((acknowledgements != null) && 
(acknowledgements.nodeId() == node.id())) {

Review Comment:
   Here, should we add a null check for the actual `Acknowledgements` class 
inside NodeAcknowledgements.
   Although ideally as we already have a null check for NodeAcknowledgements, 
the case shouldn't ever arise. Just for it be consistent with other parts of 
the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18618: Improve leader change handling of acknowledgements [kafka]

2025-01-29 Thread via GitHub


ShivsundarR commented on code in PR #18672:
URL: https://github.com/apache/kafka/pull/18672#discussion_r1934088333


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##
@@ -572,23 +575,29 @@ public CompletableFuture acknowledgeOnClose(final 
Map 
acknowledgementsMapForNode = new HashMap<>();
-for (TopicIdPartition tip : 
sessionHandler.sessionPartitions()) {
-Acknowledgements acknowledgements = 
acknowledgementsMap.getOrDefault(tip, Acknowledgements.empty());
 
-Acknowledgements acksFromShareFetch = 
fetchAcknowledgementsToSend.remove(tip);
+acknowledgementsMap.forEach((tip, nodeAcks) -> {
+Acknowledgements acknowledgements = 
Acknowledgements.empty();
+Map 
nodeAcksFromFetchMap = fetchAcknowledgementsToSend.get(nodeId);
+if (nodeAcksFromFetchMap != null) {
+Acknowledgements acksFromFetchMap = 
nodeAcksFromFetchMap.get(tip);

Review Comment:
   Should we do a `nodeAcksFromFetchMap.remove(tip)` here? Although here as we 
are closing, we would not be using `fetchAcknowledgementsToSend` again ideally. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18618: Improve leader change handling of acknowledgements [kafka]

2025-01-29 Thread via GitHub


ShivsundarR commented on code in PR #18672:
URL: https://github.com/apache/kafka/pull/18672#discussion_r1934073652


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##
@@ -172,73 +172,58 @@ public PollResult poll(long currentTimeMs) {
 k -> sessionHandlers.computeIfAbsent(node.id(), n -> 
new ShareSessionHandler(logContext, n, memberId)));
 
 TopicIdPartition tip = new TopicIdPartition(topicId, 
partition);
-Acknowledgements acknowledgementsToSend = 
fetchAcknowledgementsToSend.remove(tip);
-if (acknowledgementsToSend != null) {
-
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
-fetchAcknowledgementsInFlight.put(tip, 
acknowledgementsToSend);
+Acknowledgements acknowledgementsToSend = null;
+Map 
nodeAcknowledgementsMap = fetchAcknowledgementsToSend.get(node.id());
+if (nodeAcknowledgementsMap != null) {
+acknowledgementsToSend = 
nodeAcknowledgementsMap.remove(tip);
+if (acknowledgementsToSend != null) {
+
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
+
fetchAcknowledgementsInFlight.computeIfAbsent(node.id(), k -> new 
HashMap<>()).put(tip, acknowledgementsToSend);
+}
 }
+
 handler.addPartitionToFetch(tip, acknowledgementsToSend);
-fetchedPartitions.add(tip);
 topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), 
tip.partition()), tip.topic());
 
 log.debug("Added fetch request for partition {} to node {}", 
tip, node.id());
 }
 }
 
-// Map storing the list of partitions to forget in the upcoming 
request.
-Map> partitionsToForgetMap = new 
HashMap<>();
+
+// Iterate over the session handlers to see if there are 
acknowledgements to be sent for partitions
+// which are no longer part of the current subscription, or whose 
records were fetched from a
+// previous leader.
 Cluster cluster = metadata.fetch();
-// Iterating over the session handlers to see if there are 
acknowledgements to be sent for partitions
-// which are no longer part of the current subscription.
 sessionHandlers.forEach((nodeId, sessionHandler) -> {
 Node node = cluster.nodeById(nodeId);
 if (node != null) {
 if (nodesWithPendingRequests.contains(node.id())) {
-log.trace("Skipping fetch because previous fetch request 
to {} has not been processed", node.id());
+log.trace("Skipping fetch because previous fetch request 
to {} has not been processed", nodeId);
 } else {
-for (TopicIdPartition tip : 
sessionHandler.sessionPartitions()) {
-if (!fetchedPartitions.contains(tip)) {
-Acknowledgements acknowledgementsToSend = 
fetchAcknowledgementsToSend.remove(tip);
-
-if (acknowledgementsToSend != null) {
-
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
-fetchAcknowledgementsInFlight.put(tip, 
acknowledgementsToSend);
-
-sessionHandler.addPartitionToFetch(tip, 
acknowledgementsToSend);
-handlerMap.put(node, sessionHandler);
-
-partitionsToForgetMap.putIfAbsent(node, new 
ArrayList<>());
-partitionsToForgetMap.get(node).add(tip);
-
-topicNamesMap.putIfAbsent(new 
IdAndPartition(tip.topicId(), tip.partition()), tip.topic());
-fetchedPartitions.add(tip);
-log.debug("Added fetch request for previously 
subscribed partition {} to node {}", tip, node.id());
-}
-}
+Map 
nodeAcksFromFetchMap = fetchAcknowledgementsToSend.get(nodeId);
+if (nodeAcksFromFetchMap != null) {
+nodeAcksFromFetchMap.forEach((tip, acks) -> {
+
metricsManager.recordAcknowledgementSent(acks.size());

Review Comment:
   Should we remove the mapping (tip, acks) from 
`nodeAcksFromFetchMap`(effectively removing them from 
`fetchAcknowledgementsToSend`) so that we do not process these acknowledgements 
again in the next iteration.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##
@@ -172,73 +172,58 @@ public PollResult poll(long currentTimeMs) {
 k -> sessionHandlers.computeIfAbsent(no

Re: [PR] KAFKA-18618: Improve leader change handling of acknowledgements [kafka]

2025-01-29 Thread via GitHub


apoorvmittal10 commented on code in PR #18672:
URL: https://github.com/apache/kafka/pull/18672#discussion_r1933892015


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##
@@ -172,73 +172,58 @@ public PollResult poll(long currentTimeMs) {
 k -> sessionHandlers.computeIfAbsent(node.id(), n -> 
new ShareSessionHandler(logContext, n, memberId)));
 
 TopicIdPartition tip = new TopicIdPartition(topicId, 
partition);
-Acknowledgements acknowledgementsToSend = 
fetchAcknowledgementsToSend.remove(tip);
-if (acknowledgementsToSend != null) {
-
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
-fetchAcknowledgementsInFlight.put(tip, 
acknowledgementsToSend);
+Acknowledgements acknowledgementsToSend = null;
+Map 
nodeAcknowledgementsMap = fetchAcknowledgementsToSend.get(node.id());

Review Comment:
   Thank you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18618: Improve leader change handling of acknowledgements [kafka]

2025-01-29 Thread via GitHub


AndrewJSchofield commented on code in PR #18672:
URL: https://github.com/apache/kafka/pull/18672#discussion_r1933889523


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##
@@ -172,73 +172,58 @@ public PollResult poll(long currentTimeMs) {
 k -> sessionHandlers.computeIfAbsent(node.id(), n -> 
new ShareSessionHandler(logContext, n, memberId)));
 
 TopicIdPartition tip = new TopicIdPartition(topicId, 
partition);
-Acknowledgements acknowledgementsToSend = 
fetchAcknowledgementsToSend.remove(tip);
-if (acknowledgementsToSend != null) {
-
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
-fetchAcknowledgementsInFlight.put(tip, 
acknowledgementsToSend);
+Acknowledgements acknowledgementsToSend = null;
+Map 
nodeAcknowledgementsMap = fetchAcknowledgementsToSend.get(node.id());

Review Comment:
   I think there will need to be a follow-on PR for this. In a way, you're 
correct, but I think to maintain the ordering of things, we need to fail some 
acknowledgements more aggressively from the poll loop. I had been avoiding 
that, but I think it's inevitable actually.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18618: Improve leader change handling of acknowledgements [kafka]

2025-01-29 Thread via GitHub


AndrewJSchofield commented on code in PR #18672:
URL: https://github.com/apache/kafka/pull/18672#discussion_r1933861531


##
clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgementCommitCallback.java:
##
@@ -42,12 +44,16 @@ public interface AcknowledgementCommitCallback {
  *
  * @param exception The exception thrown during processing of the request, 
or null if the acknowledgement completed successfully.
  * 
+ *  {@link AuthorizationException} if not authorized to the topic or 
group
  *  {@link InvalidRecordStateException} if the record state is invalid
- *  {@link AuthorizationException} if not authorized to the topic of 
group
+ *  {@link NotLeaderOrFollowerException} if the leader had changed by 
the time the acknowledgements were sent
+ *  {@link DisconnectException} if the broker disconnected before the 
request could be completed
  *  {@link WakeupException} if {@link KafkaShareConsumer#wakeup()} is 
called before or while this function is called
  *  {@link InterruptException} if the calling thread is interrupted 
before or while this function is called
  *  {@link KafkaException} for any other unrecoverable errors
  * 
+ * Note that if the exception is a retriable exception, the 
acknowledgement could not be completed and the

Review Comment:
   I will reword.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18618: Improve leader change handling of acknowledgements [kafka]

2025-01-29 Thread via GitHub


apoorvmittal10 commented on code in PR #18672:
URL: https://github.com/apache/kafka/pull/18672#discussion_r1933841736


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##
@@ -172,73 +172,58 @@ public PollResult poll(long currentTimeMs) {
 k -> sessionHandlers.computeIfAbsent(node.id(), n -> 
new ShareSessionHandler(logContext, n, memberId)));
 
 TopicIdPartition tip = new TopicIdPartition(topicId, 
partition);
-Acknowledgements acknowledgementsToSend = 
fetchAcknowledgementsToSend.remove(tip);
-if (acknowledgementsToSend != null) {
-
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
-fetchAcknowledgementsInFlight.put(tip, 
acknowledgementsToSend);
+Acknowledgements acknowledgementsToSend = null;
+Map 
nodeAcknowledgementsMap = fetchAcknowledgementsToSend.get(node.id());

Review Comment:
   Query: Can there be ever some acknowledgments which can starve in the 
`fetchAcknowledgementsToSend` when some nodeId is never received from broker?



##
clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgementCommitCallback.java:
##
@@ -42,12 +44,16 @@ public interface AcknowledgementCommitCallback {
  *
  * @param exception The exception thrown during processing of the request, 
or null if the acknowledgement completed successfully.
  * 
+ *  {@link AuthorizationException} if not authorized to the topic or 
group
  *  {@link InvalidRecordStateException} if the record state is invalid
- *  {@link AuthorizationException} if not authorized to the topic of 
group
+ *  {@link NotLeaderOrFollowerException} if the leader had changed by 
the time the acknowledgements were sent
+ *  {@link DisconnectException} if the broker disconnected before the 
request could be completed
  *  {@link WakeupException} if {@link KafkaShareConsumer#wakeup()} is 
called before or while this function is called
  *  {@link InterruptException} if the calling thread is interrupted 
before or while this function is called
  *  {@link KafkaException} for any other unrecoverable errors
  * 
+ * Note that if the exception is a retriable exception, the 
acknowledgement could not be completed and the

Review Comment:
   nit: For non-retribale exceptions it will fetch anyways again so I expect we 
mean that `even` for retriable exception, the fetch will happen again?
   ```suggestion
* Note that if the exception is even a retriable exception, the 
acknowledgement could not be completed and the
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18618: Improve leader change handling of acknowledgements [kafka]

2025-01-29 Thread via GitHub


AndrewJSchofield commented on code in PR #18672:
URL: https://github.com/apache/kafka/pull/18672#discussion_r1933574562


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java:
##
@@ -115,25 +120,26 @@ public int size() {
  * @return Whether the acknowledgements were sent to the broker and a 
response received
  */
 public boolean isCompleted() {
-return acknowledgeErrorCode != null;
+return completed;
 }
 
 /**
- * Set the acknowledgement error code when the response has been received 
from the broker.
+ * Set the acknowledgement exception when the response has been received 
from the broker.
  *
- * @param acknowledgeErrorCode the error code
+ * @param acknowledgeException the exception (will be null if successful)
  */
-public void setAcknowledgeErrorCode(Errors acknowledgeErrorCode) {
-this.acknowledgeErrorCode = acknowledgeErrorCode;
+public void setAcknowledgeException(KafkaException acknowledgeException) {
+completed = true;
+this.acknowledgeException = acknowledgeException;
 }
 
 /**
- * Get the acknowledgement error code when the response has been received 
from the broker.
+ * Get the acknowledgement exception when the response has been received 
from the broker.
  *
  * @return the error code
  */
-public Errors getAcknowledgeErrorCode() {
-return acknowledgeErrorCode;
+public KafkaException getAcknowledgeException() {

Review Comment:
   In public classes, yes, I agree. However, here there are many accessor 
methods already present and I don't think renaming them all improves things.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18618: Improve leader change handling of acknowledgements [kafka]

2025-01-29 Thread via GitHub


apoorvmittal10 commented on code in PR #18672:
URL: https://github.com/apache/kafka/pull/18672#discussion_r1933558890


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NodeAcknowledgements.java:
##
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * This class combines Acknowledgements with the id of the node to use for 
acknowledging.
+ */
+public class NodeAcknowledgements {

Review Comment:
   Yeah you are right, client is not yet there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18618: Improve leader change handling of acknowledgements [kafka]

2025-01-29 Thread via GitHub


AndrewJSchofield commented on code in PR #18672:
URL: https://github.com/apache/kafka/pull/18672#discussion_r1933556619


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NodeAcknowledgements.java:
##
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * This class combines Acknowledgements with the id of the node to use for 
acknowledging.
+ */
+public class NodeAcknowledgements {

Review Comment:
   Actually, I don't think that's allowed in the client. Records were 
introduced in Java 14 and can be used in the broker which builds with Java 17.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18618: Improve leader change handling of acknowledgements [kafka]

2025-01-29 Thread via GitHub


apoorvmittal10 commented on code in PR #18672:
URL: https://github.com/apache/kafka/pull/18672#discussion_r1933528377


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java:
##
@@ -35,15 +36,19 @@ public class Acknowledgements {
 // The acknowledgements keyed by offset. If the record is a gap, the 
AcknowledgeType will be null.
 private final Map acknowledgements;
 
-// When the broker responds to the acknowledgements, this is the error 
code returned.
-private Errors acknowledgeErrorCode;
+// When the broker responds to the acknowledgements, this is the exception 
thrown.
+private KafkaException acknowledgeException;
+
+private boolean completed;

Review Comment:
   Should we have comments on variable as like other in the class?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NodeAcknowledgements.java:
##
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * This class combines Acknowledgements with the id of the node to use for 
acknowledging.
+ */
+public class NodeAcknowledgements {
+private final int nodeId;
+private final Acknowledgements acknowledgements;
+
+public NodeAcknowledgements(int nodeId, Acknowledgements acknowledgements) 
{
+this.nodeId = nodeId;
+this.acknowledgements = acknowledgements;
+}
+
+public int getNodeId() {

Review Comment:
   ```suggestion
   public int nodeId() {
   ```



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NodeAcknowledgements.java:
##
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * This class combines Acknowledgements with the id of the node to use for 
acknowledging.
+ */
+public class NodeAcknowledgements {

Review Comment:
   nit: Seems this can be a `record` class itself.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java:
##
@@ -301,10 +307,12 @@ private boolean 
canOptimiseForSingleAcknowledgeType(AcknowledgementBatch acknowl
 public String toString() {
 StringBuilder sb = new StringBuilder("Acknowledgements(");
 sb.append(acknowledgements);
-if (acknowledgeErrorCode != null) {
-sb.append(", errorCode=");
-sb.append(acknowledgeErrorCode.code());
+if (acknowledgeException != null) {
+sb.append(", acknowledgeException=");
+sb.append(Errors.forException(acknowledgeException));
 }

Review Comment:
   Should it be without if check, so explicitly errorCode can be logged?
   
   ```
   sb.append(", errorCode=")
   sb.append(acknowledgeException != null : 
Errors.forException(acknowledgeException) ? "null")



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java:
##
@@ -115,25 +120,26 @@ public int size() {
  * @return Whether the acknowledgements were sent to the broker and a 
response received
  */
 public boolean isCompleted() {
-return acknowledgeErrorCode != null;
+return completed;
 }
 
 /**
- * Set the acknowledgement error code when the response has been received 
from the broker.
+ * Set the acknowledgement exception when the response has been received 

Re: [PR] KAFKA-18618: Improve leader change handling of acknowledgements [kafka]

2025-01-23 Thread via GitHub


clolov commented on PR #18672:
URL: https://github.com/apache/kafka/pull/18672#issuecomment-2609968503

   Awesome! Looking forward to seeing those 😊 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18618: Improve leader change handling of acknowledgements [kafka]

2025-01-23 Thread via GitHub


AndrewJSchofield commented on PR #18672:
URL: https://github.com/apache/kafka/pull/18672#issuecomment-2609755534

   > I know this is still marked as a Draft, but how difficult will it be to 
add a test which tests the edge case which uncovered this? I believe I can 
follow the reasoning otherwise i.e. you only want to send the acknowledgements 
to the nodes from which the in-flight records originated.
   
   Not that difficult. Might be a follow-on PR to complete the testing, but we 
have two pieces of work in progress. First, we are developing tests using 
`MockClient` which can pretend to be a cluster of multiple brokers in which we 
can very deliberately introduce specific changes of leadership to exercise all 
of the code paths. Then, we are starting to run system tests which roll the 
cluster while performing a long-running workload. It was a rolling restart 
system test that showed the problem initially.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18618: Improve leader change handling of acknowledgements [kafka]

2025-01-23 Thread via GitHub


clolov commented on PR #18672:
URL: https://github.com/apache/kafka/pull/18672#issuecomment-2609616541

   I know this is still marked as a Draft, but how difficult will it be to add 
a test which tests the edge case which uncovered this? Otherwise, I believe I 
can follow the reasoning i.e. you only want to send the acknowledgements to the 
nodes from which the in-flight records originated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org