Re: [PR] KAFKA-18618: Improve leader change handling of acknowledgements [kafka]
AndrewJSchofield commented on code in PR #18672:
URL: https://github.com/apache/kafka/pull/18672#discussion_r1934159003

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java: ##
@@ -572,23 +575,29 @@ public CompletableFuture acknowledgeOnClose(final Map acknowledgementsMapForNode = new HashMap<>();
-for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
-Acknowledgements acknowledgements = acknowledgementsMap.getOrDefault(tip, Acknowledgements.empty());
-Acknowledgements acksFromShareFetch = fetchAcknowledgementsToSend.remove(tip);
+acknowledgementsMap.forEach((tip, nodeAcks) -> {
+Acknowledgements acknowledgements = Acknowledgements.empty();
+Map nodeAcksFromFetchMap = fetchAcknowledgementsToSend.get(nodeId);
+if (nodeAcksFromFetchMap != null) {
+Acknowledgements acksFromFetchMap = nodeAcksFromFetchMap.get(tip);

Review Comment:
I'm not sure it matters because this is on the close path, but I think you're right that it's an improvement.

-- 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18618: Improve leader change handling of acknowledgements [kafka]
AndrewJSchofield commented on code in PR #18672:
URL: https://github.com/apache/kafka/pull/18672#discussion_r1934149229

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java: ##
@@ -172,73 +172,58 @@ public PollResult poll(long currentTimeMs) {
 k -> sessionHandlers.computeIfAbsent(node.id(), n -> new ShareSessionHandler(logContext, n, memberId)));
 
 TopicIdPartition tip = new TopicIdPartition(topicId, partition);
-Acknowledgements acknowledgementsToSend = fetchAcknowledgementsToSend.remove(tip);
-if (acknowledgementsToSend != null) {
-metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
-fetchAcknowledgementsInFlight.put(tip, acknowledgementsToSend);
+Acknowledgements acknowledgementsToSend = null;
+Map nodeAcknowledgementsMap = fetchAcknowledgementsToSend.get(node.id());
+if (nodeAcknowledgementsMap != null) {
+acknowledgementsToSend = nodeAcknowledgementsMap.remove(tip);
+if (acknowledgementsToSend != null) {
+metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
+fetchAcknowledgementsInFlight.computeIfAbsent(node.id(), k -> new HashMap<>()).put(tip, acknowledgementsToSend);
+}
 }
+
 handler.addPartitionToFetch(tip, acknowledgementsToSend);
-fetchedPartitions.add(tip);
 topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic());
 
 log.debug("Added fetch request for partition {} to node {}", tip, node.id());
 }
 }
 
-// Map storing the list of partitions to forget in the upcoming request.
-Map> partitionsToForgetMap = new HashMap<>();
+
+// Iterate over the session handlers to see if there are acknowledgements to be sent for partitions
+// which are no longer part of the current subscription, or whose records were fetched from a
+// previous leader.
 Cluster cluster = metadata.fetch();
-// Iterating over the session handlers to see if there are acknowledgements to be sent for partitions
-// which are no longer part of the current subscription.
 sessionHandlers.forEach((nodeId, sessionHandler) -> {
 Node node = cluster.nodeById(nodeId);
 if (node != null) {
 if (nodesWithPendingRequests.contains(node.id())) {
-log.trace("Skipping fetch because previous fetch request to {} has not been processed", node.id());
+log.trace("Skipping fetch because previous fetch request to {} has not been processed", nodeId);
 } else {
-for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
-if (!fetchedPartitions.contains(tip)) {
-Acknowledgements acknowledgementsToSend = fetchAcknowledgementsToSend.remove(tip);
-
-if (acknowledgementsToSend != null) {
-metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
-fetchAcknowledgementsInFlight.put(tip, acknowledgementsToSend);
-
-sessionHandler.addPartitionToFetch(tip, acknowledgementsToSend);
-handlerMap.put(node, sessionHandler);
-
-partitionsToForgetMap.putIfAbsent(node, new ArrayList<>());
-partitionsToForgetMap.get(node).add(tip);
-
-topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic());
-fetchedPartitions.add(tip);
-log.debug("Added fetch request for previously subscribed partition {} to node {}", tip, node.id());
-}
-}
+Map nodeAcksFromFetchMap = fetchAcknowledgementsToSend.get(nodeId);
+if (nodeAcksFromFetchMap != null) {
+nodeAcksFromFetchMap.forEach((tip, acks) -> {
+metricsManager.recordAcknowledgementSent(acks.size());

Review Comment:
Yes, good point.
Re: [PR] KAFKA-18618: Improve leader change handling of acknowledgements [kafka]
AndrewJSchofield commented on code in PR #18672:
URL: https://github.com/apache/kafka/pull/18672#discussion_r1934133395

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java: ##
@@ -471,17 +471,18 @@ public CompletableFuture> commitSync(
 // Add the incoming commitSync() request to the queue.
 Map acknowledgementsMapForNode = new HashMap<>();
 for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
-Acknowledgements acknowledgements = acknowledgementsMap.get(tip);
-if (acknowledgements != null) {
-acknowledgementsMapForNode.put(tip, acknowledgements);
+NodeAcknowledgements acknowledgements = acknowledgementsMap.get(tip);
+if ((acknowledgements != null) && (acknowledgements.nodeId() == node.id())) {

Review Comment:
I will rename the variable `acknowledgements` to `nodeAcknowledgements` to make it a bit clearer to read. I don't believe that `nodeAcknowledgements.acknowledgements()` can be null because the empty case is `Acknowledgements.empty()` and the fetching code in the application thread only sends the acks to be processed if they are non-empty. I can put a null check in the constructor for `NodeAcknowledgements`.
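For readers following the thread, the constructor null check mentioned above could look roughly like the sketch below. This is only an illustration, not the code from the PR: `AcknowledgementsStub` and `NodeAcknowledgementsSketch` are hypothetical stand-ins so that the example compiles on its own, and the use of `Objects.requireNonNull` is an assumption about how such a check might be written.

```java
import java.util.Objects;

// Hypothetical stand-in for the real Acknowledgements class, present only so the sketch is self-contained.
class AcknowledgementsStub {
    static AcknowledgementsStub empty() {
        return new AcknowledgementsStub();
    }
}

// Sketch of a node id / acknowledgements pair that rejects null acknowledgements at construction time.
class NodeAcknowledgementsSketch {
    private final int nodeId;
    private final AcknowledgementsStub acknowledgements;

    NodeAcknowledgementsSketch(int nodeId, AcknowledgementsStub acknowledgements) {
        this.nodeId = nodeId;
        // Fail fast here so that callers never need to null-check acknowledgements() later.
        this.acknowledgements = Objects.requireNonNull(acknowledgements, "acknowledgements must not be null");
    }

    int nodeId() {
        return nodeId;
    }

    AcknowledgementsStub acknowledgements() {
        return acknowledgements;
    }
}
```

With a check like this, passing the empty case as `AcknowledgementsStub.empty()` succeeds, while passing `null` throws a `NullPointerException` at the call site rather than somewhere downstream.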
Re: [PR] KAFKA-18618: Improve leader change handling of acknowledgements [kafka]
ShivsundarR commented on code in PR #18672:
URL: https://github.com/apache/kafka/pull/18672#discussion_r1934011920

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java: ##
@@ -471,17 +471,18 @@ public CompletableFuture> commitSync(
 // Add the incoming commitSync() request to the queue.
 Map acknowledgementsMapForNode = new HashMap<>();
 for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
-Acknowledgements acknowledgements = acknowledgementsMap.get(tip);
-if (acknowledgements != null) {
-acknowledgementsMapForNode.put(tip, acknowledgements);
+NodeAcknowledgements acknowledgements = acknowledgementsMap.get(tip);
+if ((acknowledgements != null) && (acknowledgements.nodeId() == node.id())) {

Review Comment:
Should we also add a null check here for the `Acknowledgements` object inside `NodeAcknowledgements`? Since we already have a null check for `NodeAcknowledgements`, the case should ideally never arise; this is just to be consistent with other parts of the code.
Re: [PR] KAFKA-18618: Improve leader change handling of acknowledgements [kafka]
ShivsundarR commented on code in PR #18672:
URL: https://github.com/apache/kafka/pull/18672#discussion_r1934088333

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java: ##
@@ -572,23 +575,29 @@ public CompletableFuture acknowledgeOnClose(final Map acknowledgementsMapForNode = new HashMap<>();
-for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
-Acknowledgements acknowledgements = acknowledgementsMap.getOrDefault(tip, Acknowledgements.empty());
-Acknowledgements acksFromShareFetch = fetchAcknowledgementsToSend.remove(tip);
+acknowledgementsMap.forEach((tip, nodeAcks) -> {
+Acknowledgements acknowledgements = Acknowledgements.empty();
+Map nodeAcksFromFetchMap = fetchAcknowledgementsToSend.get(nodeId);
+if (nodeAcksFromFetchMap != null) {
+Acknowledgements acksFromFetchMap = nodeAcksFromFetchMap.get(tip);

Review Comment:
Should we call `nodeAcksFromFetchMap.remove(tip)` here? That said, since we are closing, we should not be using `fetchAcknowledgementsToSend` again.
Re: [PR] KAFKA-18618: Improve leader change handling of acknowledgements [kafka]
ShivsundarR commented on code in PR #18672: URL: https://github.com/apache/kafka/pull/18672#discussion_r1934073652 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java: ## @@ -172,73 +172,58 @@ public PollResult poll(long currentTimeMs) { k -> sessionHandlers.computeIfAbsent(node.id(), n -> new ShareSessionHandler(logContext, n, memberId))); TopicIdPartition tip = new TopicIdPartition(topicId, partition); -Acknowledgements acknowledgementsToSend = fetchAcknowledgementsToSend.remove(tip); -if (acknowledgementsToSend != null) { - metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size()); -fetchAcknowledgementsInFlight.put(tip, acknowledgementsToSend); +Acknowledgements acknowledgementsToSend = null; +Map nodeAcknowledgementsMap = fetchAcknowledgementsToSend.get(node.id()); +if (nodeAcknowledgementsMap != null) { +acknowledgementsToSend = nodeAcknowledgementsMap.remove(tip); +if (acknowledgementsToSend != null) { + metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size()); + fetchAcknowledgementsInFlight.computeIfAbsent(node.id(), k -> new HashMap<>()).put(tip, acknowledgementsToSend); +} } + handler.addPartitionToFetch(tip, acknowledgementsToSend); -fetchedPartitions.add(tip); topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic()); log.debug("Added fetch request for partition {} to node {}", tip, node.id()); } } -// Map storing the list of partitions to forget in the upcoming request. -Map> partitionsToForgetMap = new HashMap<>(); + +// Iterate over the session handlers to see if there are acknowledgements to be sent for partitions +// which are no longer part of the current subscription, or whose records were fetched from a +// previous leader. Cluster cluster = metadata.fetch(); -// Iterating over the session handlers to see if there are acknowledgements to be sent for partitions -// which are no longer part of the current subscription. 
sessionHandlers.forEach((nodeId, sessionHandler) -> { Node node = cluster.nodeById(nodeId); if (node != null) { if (nodesWithPendingRequests.contains(node.id())) { -log.trace("Skipping fetch because previous fetch request to {} has not been processed", node.id()); +log.trace("Skipping fetch because previous fetch request to {} has not been processed", nodeId); } else { -for (TopicIdPartition tip : sessionHandler.sessionPartitions()) { -if (!fetchedPartitions.contains(tip)) { -Acknowledgements acknowledgementsToSend = fetchAcknowledgementsToSend.remove(tip); - -if (acknowledgementsToSend != null) { - metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size()); -fetchAcknowledgementsInFlight.put(tip, acknowledgementsToSend); - -sessionHandler.addPartitionToFetch(tip, acknowledgementsToSend); -handlerMap.put(node, sessionHandler); - -partitionsToForgetMap.putIfAbsent(node, new ArrayList<>()); -partitionsToForgetMap.get(node).add(tip); - -topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic()); -fetchedPartitions.add(tip); -log.debug("Added fetch request for previously subscribed partition {} to node {}", tip, node.id()); -} -} +Map nodeAcksFromFetchMap = fetchAcknowledgementsToSend.get(nodeId); +if (nodeAcksFromFetchMap != null) { +nodeAcksFromFetchMap.forEach((tip, acks) -> { + metricsManager.recordAcknowledgementSent(acks.size()); Review Comment: Should we remove the mapping (tip, acks) from `nodeAcksFromFetchMap`(effectively removing them from `fetchAcknowledgementsToSend`) so that we do not process these acknowledgements again in the next iteration. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java: ## @@ -172,73 +172,58 @@ public PollResult poll(long currentTimeMs) { k -> sessionHandlers.computeIfAbsent(no
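The question above is about making sure acknowledgements moved to the in-flight map are not picked up again on the next pass. One way to get that effect is sketched below, under simplifying assumptions: partitions are plain `String`s and acknowledgements are plain counts rather than the real `TopicIdPartition` and `Acknowledgements` types, and `PerNodeAckDrainSketch` is a hypothetical name. The sketch detaches the whole per-node map from the outer map before iterating, which also avoids removing entries from a `HashMap` while iterating over it.

```java
import java.util.HashMap;
import java.util.Map;

class PerNodeAckDrainSketch {
    // Outer key: node id. Inner key: partition name (stand-in for TopicIdPartition).
    // Inner value: number of acknowledgements (stand-in for Acknowledgements).
    final Map<Integer, Map<String, Integer>> toSend = new HashMap<>();
    final Map<Integer, Map<String, Integer>> inFlight = new HashMap<>();

    // Moves every pending entry for nodeId into inFlight and returns how many acknowledgements were moved.
    int drain(int nodeId) {
        // Removing the inner map from the outer map detaches it in one step, so a later
        // call for the same node finds nothing left to send.
        Map<String, Integer> pending = toSend.remove(nodeId);
        if (pending == null) {
            return 0;
        }
        int moved = 0;
        for (Map.Entry<String, Integer> entry : pending.entrySet()) {
            inFlight.computeIfAbsent(nodeId, k -> new HashMap<>()).put(entry.getKey(), entry.getValue());
            moved += entry.getValue();
        }
        return moved;
    }
}
```

Whether the PR removes the per-node map as a whole or removes individual entries is not visible in the quoted hunk, so this is one possible shape rather than a description of the final change.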
Re: [PR] KAFKA-18618: Improve leader change handling of acknowledgements [kafka]
apoorvmittal10 commented on code in PR #18672:
URL: https://github.com/apache/kafka/pull/18672#discussion_r1933892015

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java: ##
@@ -172,73 +172,58 @@ public PollResult poll(long currentTimeMs) {
 k -> sessionHandlers.computeIfAbsent(node.id(), n -> new ShareSessionHandler(logContext, n, memberId)));
 
 TopicIdPartition tip = new TopicIdPartition(topicId, partition);
-Acknowledgements acknowledgementsToSend = fetchAcknowledgementsToSend.remove(tip);
-if (acknowledgementsToSend != null) {
-metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
-fetchAcknowledgementsInFlight.put(tip, acknowledgementsToSend);
+Acknowledgements acknowledgementsToSend = null;
+Map nodeAcknowledgementsMap = fetchAcknowledgementsToSend.get(node.id());

Review Comment:
Thank you.
Re: [PR] KAFKA-18618: Improve leader change handling of acknowledgements [kafka]
AndrewJSchofield commented on code in PR #18672:
URL: https://github.com/apache/kafka/pull/18672#discussion_r1933889523

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java: ##
@@ -172,73 +172,58 @@ public PollResult poll(long currentTimeMs) {
 k -> sessionHandlers.computeIfAbsent(node.id(), n -> new ShareSessionHandler(logContext, n, memberId)));
 
 TopicIdPartition tip = new TopicIdPartition(topicId, partition);
-Acknowledgements acknowledgementsToSend = fetchAcknowledgementsToSend.remove(tip);
-if (acknowledgementsToSend != null) {
-metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
-fetchAcknowledgementsInFlight.put(tip, acknowledgementsToSend);
+Acknowledgements acknowledgementsToSend = null;
+Map nodeAcknowledgementsMap = fetchAcknowledgementsToSend.get(node.id());

Review Comment:
I think there will need to be a follow-on PR for this. In a way, you're correct, but I think to maintain the ordering of things, we need to fail some acknowledgements more aggressively from the poll loop. I had been avoiding that, but I think it's inevitable actually.
Re: [PR] KAFKA-18618: Improve leader change handling of acknowledgements [kafka]
AndrewJSchofield commented on code in PR #18672:
URL: https://github.com/apache/kafka/pull/18672#discussion_r1933861531

## clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgementCommitCallback.java: ##
@@ -42,12 +44,16 @@ public interface AcknowledgementCommitCallback {
  *
  * @param exception The exception thrown during processing of the request, or null if the acknowledgement completed successfully.
  *
+ * {@link AuthorizationException} if not authorized to the topic or group
  * {@link InvalidRecordStateException} if the record state is invalid
- * {@link AuthorizationException} if not authorized to the topic of group
+ * {@link NotLeaderOrFollowerException} if the leader had changed by the time the acknowledgements were sent
+ * {@link DisconnectException} if the broker disconnected before the request could be completed
  * {@link WakeupException} if {@link KafkaShareConsumer#wakeup()} is called before or while this function is called
  * {@link InterruptException} if the calling thread is interrupted before or while this function is called
  * {@link KafkaException} for any other unrecoverable errors
  *
+ * Note that if the exception is a retriable exception, the acknowledgement could not be completed and the

Review Comment:
I will reword.
Re: [PR] KAFKA-18618: Improve leader change handling of acknowledgements [kafka]
apoorvmittal10 commented on code in PR #18672:
URL: https://github.com/apache/kafka/pull/18672#discussion_r1933841736

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java: ##
@@ -172,73 +172,58 @@ public PollResult poll(long currentTimeMs) {
 k -> sessionHandlers.computeIfAbsent(node.id(), n -> new ShareSessionHandler(logContext, n, memberId)));
 
 TopicIdPartition tip = new TopicIdPartition(topicId, partition);
-Acknowledgements acknowledgementsToSend = fetchAcknowledgementsToSend.remove(tip);
-if (acknowledgementsToSend != null) {
-metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
-fetchAcknowledgementsInFlight.put(tip, acknowledgementsToSend);
+Acknowledgements acknowledgementsToSend = null;
+Map nodeAcknowledgementsMap = fetchAcknowledgementsToSend.get(node.id());

Review Comment:
Query: Could some acknowledgements ever be stranded in `fetchAcknowledgementsToSend` if a given nodeId is never received from the broker?

## clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgementCommitCallback.java: ##
@@ -42,12 +44,16 @@ public interface AcknowledgementCommitCallback {
  *
  * @param exception The exception thrown during processing of the request, or null if the acknowledgement completed successfully.
  *
+ * {@link AuthorizationException} if not authorized to the topic or group
  * {@link InvalidRecordStateException} if the record state is invalid
- * {@link AuthorizationException} if not authorized to the topic of group
+ * {@link NotLeaderOrFollowerException} if the leader had changed by the time the acknowledgements were sent
+ * {@link DisconnectException} if the broker disconnected before the request could be completed
  * {@link WakeupException} if {@link KafkaShareConsumer#wakeup()} is called before or while this function is called
  * {@link InterruptException} if the calling thread is interrupted before or while this function is called
  * {@link KafkaException} for any other unrecoverable errors
  *
+ * Note that if the exception is a retriable exception, the acknowledgement could not be completed and the

Review Comment:
nit: For non-retriable exceptions the records will be fetched again anyway, so I expect we mean that `even` for a retriable exception the fetch will happen again?

```suggestion
 * Note that if the exception is even a retriable exception, the acknowledgement could not be completed and the
```
Re: [PR] KAFKA-18618: Improve leader change handling of acknowledgements [kafka]
AndrewJSchofield commented on code in PR #18672:
URL: https://github.com/apache/kafka/pull/18672#discussion_r1933574562

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java: ##
@@ -115,25 +120,26 @@ public int size() {
  * @return Whether the acknowledgements were sent to the broker and a response received
  */
 public boolean isCompleted() {
-return acknowledgeErrorCode != null;
+return completed;
 }
 
 /**
- * Set the acknowledgement error code when the response has been received from the broker.
+ * Set the acknowledgement exception when the response has been received from the broker.
  *
- * @param acknowledgeErrorCode the error code
+ * @param acknowledgeException the exception (will be null if successful)
  */
-public void setAcknowledgeErrorCode(Errors acknowledgeErrorCode) {
-this.acknowledgeErrorCode = acknowledgeErrorCode;
+public void setAcknowledgeException(KafkaException acknowledgeException) {
+completed = true;
+this.acknowledgeException = acknowledgeException;
 }
 
 /**
- * Get the acknowledgement error code when the response has been received from the broker.
+ * Get the acknowledgement exception when the response has been received from the broker.
  *
  * @return the error code
  */
-public Errors getAcknowledgeErrorCode() {
-return acknowledgeErrorCode;
+public KafkaException getAcknowledgeException() {

Review Comment:
In public classes, yes, I agree. However, here there are many accessor methods already present and I don't think renaming them all improves things.
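The hunk quoted above replaces the `acknowledgeErrorCode != null` test in `isCompleted()` with a separate `completed` flag. A plausible reading, sketched below with a hypothetical `AckCompletionSketch` class and `RuntimeException` standing in for `KafkaException`, is that once success is represented by a null exception, a null check alone can no longer tell "completed successfully" apart from "not completed yet".

```java
class AckCompletionSketch {
    // Null means either "not completed yet" or "completed successfully"; the flag disambiguates.
    private RuntimeException acknowledgeException;
    private boolean completed;

    // Records the outcome of the acknowledgement; pass null for success.
    void setAcknowledgeException(RuntimeException exception) {
        completed = true;
        acknowledgeException = exception;
    }

    boolean isCompleted() {
        return completed;
    }

    RuntimeException getAcknowledgeException() {
        return acknowledgeException;
    }
}
```

In this sketch a successful completion leaves `getAcknowledgeException()` returning null while `isCompleted()` returns true, which a null test alone could not express.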
Re: [PR] KAFKA-18618: Improve leader change handling of acknowledgements [kafka]
apoorvmittal10 commented on code in PR #18672:
URL: https://github.com/apache/kafka/pull/18672#discussion_r1933558890

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NodeAcknowledgements.java: ##
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * This class combines Acknowledgements with the id of the node to use for acknowledging.
+ */
+public class NodeAcknowledgements {

Review Comment:
Yeah you are right, client is not yet there.
Re: [PR] KAFKA-18618: Improve leader change handling of acknowledgements [kafka]
AndrewJSchofield commented on code in PR #18672:
URL: https://github.com/apache/kafka/pull/18672#discussion_r1933556619

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NodeAcknowledgements.java: ##
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * This class combines Acknowledgements with the id of the node to use for acknowledging.
+ */
+public class NodeAcknowledgements {

Review Comment:
Actually, I don't think that's allowed in the client. Records were introduced in Java 14 and can be used in the broker which builds with Java 17.
Re: [PR] KAFKA-18618: Improve leader change handling of acknowledgements [kafka]
apoorvmittal10 commented on code in PR #18672: URL: https://github.com/apache/kafka/pull/18672#discussion_r1933528377 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java: ## @@ -35,15 +36,19 @@ public class Acknowledgements { // The acknowledgements keyed by offset. If the record is a gap, the AcknowledgeType will be null. private final Map acknowledgements; -// When the broker responds to the acknowledgements, this is the error code returned. -private Errors acknowledgeErrorCode; +// When the broker responds to the acknowledgements, this is the exception thrown. +private KafkaException acknowledgeException; + +private boolean completed; Review Comment: Should we have comments on variable as like other in the class? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NodeAcknowledgements.java: ## @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +/** + * This class combines Acknowledgements with the id of the node to use for acknowledging. 
+ */ +public class NodeAcknowledgements { +private final int nodeId; +private final Acknowledgements acknowledgements; + +public NodeAcknowledgements(int nodeId, Acknowledgements acknowledgements) { +this.nodeId = nodeId; +this.acknowledgements = acknowledgements; +} + +public int getNodeId() { Review Comment: ```suggestion public int nodeId() { ``` ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NodeAcknowledgements.java: ## @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +/** + * This class combines Acknowledgements with the id of the node to use for acknowledging. + */ +public class NodeAcknowledgements { Review Comment: nit: Seems this can be a `record` class itself. 
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java: ##
@@ -301,10 +307,12 @@ private boolean canOptimiseForSingleAcknowledgeType(AcknowledgementBatch acknowl
 public String toString() {
 StringBuilder sb = new StringBuilder("Acknowledgements(");
 sb.append(acknowledgements);
-if (acknowledgeErrorCode != null) {
-sb.append(", errorCode=");
-sb.append(acknowledgeErrorCode.code());
+if (acknowledgeException != null) {
+sb.append(", acknowledgeException=");
+sb.append(Errors.forException(acknowledgeException));
 }

Review Comment:
Should it be without the if check, so that the errorCode is always logged explicitly?

```
sb.append(", errorCode=");
sb.append(acknowledgeException != null ? Errors.forException(acknowledgeException) : "null");
```

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java: ##
@@ -115,25 +120,26 @@ public int size() {
  * @return Whether the acknowledgements were sent to the broker and a response received
  */
 public boolean isCompleted() {
-return acknowledgeErrorCode != null;
+return completed;
 }
 
 /**
- * Set the acknowledgement error code when the response has been received from the broker.
+ * Set the acknowledgement exception when the response has been received
Re: [PR] KAFKA-18618: Improve leader change handling of acknowledgements [kafka]
clolov commented on PR #18672:
URL: https://github.com/apache/kafka/pull/18672#issuecomment-2609968503

Awesome! Looking forward to seeing those 😊
Re: [PR] KAFKA-18618: Improve leader change handling of acknowledgements [kafka]
AndrewJSchofield commented on PR #18672:
URL: https://github.com/apache/kafka/pull/18672#issuecomment-2609755534

> I know this is still marked as a Draft, but how difficult will it be to add a test which tests the edge case which uncovered this? I believe I can follow the reasoning otherwise i.e. you only want to send the acknowledgements to the nodes from which the in-flight records originated.

Not that difficult. Might be a follow-on PR to complete the testing, but we have two pieces of work in progress. First, we are developing tests using `MockClient` which can pretend to be a cluster of multiple brokers in which we can very deliberately introduce specific changes of leadership to exercise all of the code paths. Then, we are starting to run system tests which roll the cluster while performing a long-running workload. It was a rolling restart system test that showed the problem initially.
Re: [PR] KAFKA-18618: Improve leader change handling of acknowledgements [kafka]
clolov commented on PR #18672:
URL: https://github.com/apache/kafka/pull/18672#issuecomment-2609616541

I know this is still marked as a Draft, but how difficult will it be to add a test which tests the edge case which uncovered this? Otherwise, I believe I can follow the reasoning, i.e. you only want to send the acknowledgements to the nodes from which the in-flight records originated.
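The reasoning summarised above, that acknowledgements should go only to the node the records were fetched from, can be illustrated with a small filter similar in spirit to the `acknowledgements.nodeId() == node.id()` check in the `commitSync` hunk. The sketch below is an assumption-laden simplification: `AckRoutingSketch`, `NodeAcks` and `forNode` are hypothetical names, partitions are `String`s, and an integer count stands in for the real `Acknowledgements`.

```java
import java.util.HashMap;
import java.util.Map;

class AckRoutingSketch {
    // Hypothetical pair: the node the records were fetched from, plus a count standing in for the acknowledgements.
    static final class NodeAcks {
        final int nodeId;
        final int ackCount;

        NodeAcks(int nodeId, int ackCount) {
            this.nodeId = nodeId;
            this.ackCount = ackCount;
        }
    }

    // Returns only the entries whose records were fetched from the given node.
    static Map<String, Integer> forNode(Map<String, NodeAcks> all, int nodeId) {
        Map<String, Integer> result = new HashMap<>();
        all.forEach((partition, nodeAcks) -> {
            // Entries recorded against a different node (for example a previous leader) are skipped.
            if (nodeAcks != null && nodeAcks.nodeId == nodeId) {
                result.put(partition, nodeAcks.ackCount);
            }
        });
        return result;
    }
}
```

A caller building a request for node 1 would use `AckRoutingSketch.forNode(all, 1)` and get back only the partitions whose records originated from node 1; entries tagged with another node id stay out of that request.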