AndrewJSchofield commented on code in PR #18939:
URL: https://github.com/apache/kafka/pull/18939#discussion_r1965446791


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -771,6 +777,15 @@ private void handleShareFetchSuccess(Node fetchTarget,
                 }
             }
 
+            // Handle any acknowledgements which were not received in the response.
+            fetchAcknowledgementsInFlight.forEach((integer, topicIdPartitionAcknowledgementsMap) -> {

Review Comment:
   nit: `integer` is a terrible variable name :) How about `nodeId`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -1159,8 +1226,8 @@ void handleAcknowledgeTimedOut(TopicIdPartition tip) {
             Acknowledgements acks = incompleteAcknowledgements.get(tip);
             if (acks != null) {
                 acks.complete(Errors.REQUEST_TIMED_OUT.exception());
+                resultHandler.complete(tip, acks, requestType);
             }
-            resultHandler.complete(tip, acks, onCommitAsync());

Review Comment:
   Please can you rename `onCommitAsync()` to `isCommitAsyncRequest()`, and 
`onClose()` to `isCloseRequest()`. The pattern `on....` sounds like a method 
which responds to an event. This naming had been bothering me and I've figured 
out why.
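
   As a rough sketch of the naming point (the `RequestKind` enum and 
`RequestState` class below are made up for illustration and are not the actual 
Kafka types), a boolean query reads naturally with an `is...` prefix, whereas 
`on...` would suggest an event callback:

```java
// Hypothetical types for illustration only; not the classes in ShareConsumeRequestManager.
enum RequestKind { COMMIT_SYNC, COMMIT_ASYNC, CLOSE }

final class RequestState {
    private final RequestKind kind;

    RequestState(RequestKind kind) {
        this.kind = kind;
    }

    // "is..." signals a side-effect-free question about state.
    boolean isCommitAsyncRequest() {
        return kind == RequestKind.COMMIT_ASYNC;
    }

    // Same pattern for the close case.
    boolean isCloseRequest() {
        return kind == RequestKind.CLOSE;
    }
}
```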



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -771,6 +777,15 @@ private void handleShareFetchSuccess(Node fetchTarget,
                 }
             }
 
+            // Handle any acknowledgements which were not received in the response.

Review Comment:
   This seems like a problem to me. Really, you should only be looking at 
`fetchAcknowledgementsInFlight.get(fetchTarget.id())`, namely the fetch 
acknowledgements in flight for the node from which we received the response.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -964,6 +952,83 @@ private void handleShareAcknowledgeFailure(Node fetchTarget,
         }
     }
 
+    private void handlePartitionError(ShareAcknowledgeResponseData.PartitionData partitionData,
+                                      Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo,
+                                      AcknowledgeRequestState acknowledgeRequestState,
+                                      Errors partitionError,
+                                      TopicIdPartition tip,
+                                      AtomicBoolean shouldRetry) {
+        if (partitionError.exception() != null) {
+            handleErrorConditionally(partitionData, partitionsWithUpdatedLeaderInfo, acknowledgeRequestState, partitionError, tip, shouldRetry);
+        } else {
+            acknowledgeRequestState.handleAcknowledgeErrorCode(tip, partitionError);
+        }
+    }
+
+    private void processRetryLogic(AcknowledgeRequestState acknowledgeRequestState,
+                                   AtomicBoolean shouldRetry,
+                                   long responseCompletionTimeMs) {
+        if (shouldRetry.get()) {
+            acknowledgeRequestState.onFailedAttempt(responseCompletionTimeMs);
+
+            // Check for any acknowledgements that did not receive a response.
+            // These acknowledgements are failed with InvalidRecordStateException.
+            acknowledgeRequestState.processPendingInFlightAcknowledgements(new InvalidRecordStateException(INVALID_RESPONSE));
+        } else {
+            acknowledgeRequestState.onSuccessfulAttempt(responseCompletionTimeMs);
+            acknowledgeRequestState.processingComplete();
+        }
+    }
+
+    private void handleErrorConditionally(ShareAcknowledgeResponseData.PartitionData partitionData,
+                                          Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo,
+                                          AcknowledgeRequestState acknowledgeRequestState,
+                                          Errors partitionError,
+                                          TopicIdPartition tip,
+                                          AtomicBoolean shouldRetry) {
+        boolean retry = false;
+        if (partitionError == Errors.NOT_LEADER_OR_FOLLOWER || partitionError == Errors.FENCED_LEADER_EPOCH) {
+            // If the leader has changed, there's no point in retrying the operation because the acquisition locks
+            // will have been released.
+            updateLeaderInfoMap(partitionData, partitionsWithUpdatedLeaderInfo, partitionError, tip.topicPartition());
+        } else if (partitionError.exception() instanceof RetriableException) {
+            retry = true;
+        }
+
+        if (retry) {
+            if (acknowledgeRequestState.moveToIncompleteAcks(tip)) {
+                shouldRetry.set(true);
+            }
+        } else {
+            metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
+            acknowledgeRequestState.handleAcknowledgeErrorCode(tip, partitionError);
+        }
+    }
+
+    private void updateLeaderInfoMap(ShareAcknowledgeResponseData.PartitionData partitionData,
+                                  Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo,
+                                  Errors partitionError,
+                                  TopicPartition tp) {
+
+        log.debug("For {}, received error {}, with leaderIdAndEpoch {} in ShareAcknowledge", tp, partitionError, partitionData.currentLeader());
+        if (partitionData.currentLeader().leaderId() != -1 && partitionData.currentLeader().leaderEpoch() != -1) {
+            partitionsWithUpdatedLeaderInfo.put(tp, new Metadata.LeaderIdAndEpoch(
+                    Optional.of(partitionData.currentLeader().leaderId()),
+                    Optional.of(partitionData.currentLeader().leaderEpoch())
+            ));
+        }
+    }
+
+    private TopicIdPartition createTopicIdPartition(Uuid topicId, int partitionIndex) {

Review Comment:
   I suggest `lookupTopicId`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -1187,11 +1254,22 @@ ShareSessionHandler sessionHandler() {
         }
 
         void processingComplete() {
-            inFlightAcknowledgements.clear();
+            // If there are any pending inFlightAcknowledgements after processing the response, we fail them with an InvalidRecordStateException.
+            processPendingInFlightAcknowledgements(new InvalidRecordStateException(INVALID_RESPONSE));
             resultHandler.completeIfEmpty();
             isProcessed = true;
         }
 
+        private void processPendingInFlightAcknowledgements(KafkaException exception) {
+            if (!inFlightAcknowledgements.isEmpty()) {
+                inFlightAcknowledgements.forEach((partition, acknowledgements) -> {
+                    acknowledgements.complete(exception);
+                    resultHandler.complete(partition, acknowledgements, requestType);
+                });
+            }
+            inFlightAcknowledgements.clear();

Review Comment:
   nit: You only need to clear the `inFlightAcknowledgements` if it's non-empty.
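
   For illustration, a small sketch with the `clear()` moved inside the 
emptiness check. The `PendingAcks` class, its `String` keys and values, and the 
`BiConsumer` callback are hypothetical simplifications of the real state class:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical, simplified stand-in for the request state holding in-flight acknowledgements.
final class PendingAcks {
    private final Map<String, String> inFlight = new HashMap<>();

    void put(String partition, String acknowledgements) {
        inFlight.put(partition, acknowledgements);
    }

    // Fails every pending entry via the callback, then clears; does nothing when already empty.
    void failPending(BiConsumer<String, String> onFailed) {
        if (!inFlight.isEmpty()) {
            inFlight.forEach(onFailed);
            inFlight.clear();
        }
    }

    int size() {
        return inFlight.size();
    }
}
```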



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -1209,10 +1287,14 @@ boolean maybeExpire() {
          * Moves the in-flight acknowledgements for a given partition to incomplete acknowledgements to retry
          * in the next request.
          */

Review Comment:
   Javadoc for the return value please. It has become important.
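
   Something along these lines, as a sketch only. The meaning of the return 
value here (true when there was something to move) is an assumption inferred 
from how the caller sets `shouldRetry`, and the `AckState` class with `String` 
keys is a hypothetical simplification:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the acknowledge request state.
final class AckState {
    private final Map<String, String> inFlight = new HashMap<>();
    private final Map<String, String> incomplete = new HashMap<>();

    void send(String partition, String acknowledgements) {
        inFlight.put(partition, acknowledgements);
    }

    /**
     * Moves the in-flight acknowledgements for a given partition to incomplete
     * acknowledgements to retry in the next request.
     *
     * @param partition the partition whose in-flight acknowledgements should be retried
     * @return {@code true} if there were in-flight acknowledgements to move (assumed
     *         semantics), {@code false} if there was nothing in flight for the partition
     */
    boolean moveToIncompleteAcks(String partition) {
        String acknowledgements = inFlight.remove(partition);
        if (acknowledgements == null) {
            return false;
        }
        incomplete.put(partition, acknowledgements);
        return true;
    }

    int incompleteCount() {
        return incomplete.size();
    }
}
```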



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -964,6 +952,83 @@ private void handleShareAcknowledgeFailure(Node fetchTarget,
         }
     }
 
+    private void handlePartitionError(ShareAcknowledgeResponseData.PartitionData partitionData,
+                                      Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo,
+                                      AcknowledgeRequestState acknowledgeRequestState,
+                                      Errors partitionError,
+                                      TopicIdPartition tip,
+                                      AtomicBoolean shouldRetry) {
+        if (partitionError.exception() != null) {
+            handleErrorConditionally(partitionData, partitionsWithUpdatedLeaderInfo, acknowledgeRequestState, partitionError, tip, shouldRetry);

Review Comment:
   I would in-line this method's body again I think. It's clearer to follow 
without another method call.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -856,59 +876,24 @@ private void handleShareAcknowledgeSuccess(Node fetchTarget,
                         // We retry the request until the timer expires, unless we are closing.
                         acknowledgeRequestState.moveAllToIncompleteAcks();
                     } else {
-                        response.data().responses().forEach(shareAcknowledgeTopicResponse -> shareAcknowledgeTopicResponse.partitions().forEach(partitionData -> {
-                            TopicIdPartition tip = new TopicIdPartition(shareAcknowledgeTopicResponse.topicId(),
-                                    partitionData.partitionIndex(),
-                                    metadata.topicNames().get(shareAcknowledgeTopicResponse.topicId()));
-
-                            acknowledgeRequestState.handleAcknowledgeErrorCode(tip, response.error());
-                        }));
+                        acknowledgeRequestState.processPendingInFlightAcknowledgements(response.error().exception());
                         acknowledgeRequestState.processingComplete();
                     }
                 } else {
                     AtomicBoolean shouldRetry = new AtomicBoolean(false);
                     // Check all partition level error codes
                     response.data().responses().forEach(shareAcknowledgeTopicResponse -> shareAcknowledgeTopicResponse.partitions().forEach(partitionData -> {

Review Comment:
   I would shorten `shareAcknowledgeTopicResponse` to `topicResponse` to match 
the new code added near the top of this method.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -716,11 +719,14 @@ private void handleShareFetchSuccess(Node fetchTarget,
             final Map<TopicIdPartition, ShareFetchResponseData.PartitionData> responseData = new LinkedHashMap<>();
 
             response.data().responses().forEach(topicResponse ->
-                    topicResponse.partitions().forEach(partition ->
-                            responseData.put(new TopicIdPartition(topicResponse.topicId(),
-                                    partition.partitionIndex(),
-                                    metadata.topicNames().getOrDefault(topicResponse.topicId(),
-                                            topicNamesMap.remove(new IdAndPartition(topicResponse.topicId(), partition.partitionIndex())))), partition))
+                    topicResponse.partitions().forEach(partition -> {
+                        TopicIdPartition tip = createTopicIdPartition(topicResponse.topicId(), partition.partitionIndex());
+                        if (tip != null) {
+                            responseData.put(tip, partition);
+                        } else {
+                            log.error("Topic name not found for topic ID {}", topicResponse.topicId());

Review Comment:
   In this case, you'll log two error lines because `createTopicIdPartition` 
also writes a log line. I suggest choosing whether you want to log in the 
caller or in the method and using the same pattern for all instances.
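
   One way to do this, sketched with hypothetical names (`TopicLookup`, 
`lookupTopicName` and the `errors` list standing in for the logger are all 
illustrative, not the real code): the lookup stays silent and returns an empty 
`Optional` on a miss, and every caller logs exactly once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: the lookup never logs, so each miss produces a single error line.
final class TopicLookup {
    private final Map<String, String> topicNames;
    // Stands in for the logger so the behaviour can be observed.
    final List<String> errors = new ArrayList<>();

    TopicLookup(Map<String, String> topicNames) {
        this.topicNames = topicNames;
    }

    // No logging here; callers decide how to report a missing name.
    Optional<String> lookupTopicName(String topicId) {
        return Optional.ofNullable(topicNames.get(topicId));
    }

    // The caller is the only place that logs.
    void handle(String topicId, List<String> resolved) {
        Optional<String> name = lookupTopicName(topicId);
        if (name.isPresent()) {
            resolved.add(name.get());
        } else {
            errors.add("Topic name not found for topic ID " + topicId);
        }
    }
}
```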



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
