AndrewJSchofield commented on code in PR #16727:
URL: https://github.com/apache/kafka/pull/16727#discussion_r1699932061


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -876,4 +1020,34 @@ public void completeIfEmpty() {
             }
         }
     }
+
+    static class Pair<V> {
+        private V asyncRequest;
+        private V syncRequest;
+
+        public Pair(V first, V second) {

Review Comment:
   nit: The arguments to the constructor should also be `asyncRequest` and 
`syncRequest`



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -876,4 +1020,34 @@ public void completeIfEmpty() {
             }
         }
     }
+
+    static class Pair<V> {
+        private V asyncRequest;
+        private V syncRequest;
+
+        public Pair(V first, V second) {
+            this.asyncRequest = first;
+            this.syncRequest = second;
+        }
+
+        public void setAsyncRequest(V asyncRequest) {
+            this.asyncRequest = asyncRequest;
+        }
+
+        public void setSecond(V second) {

Review Comment:
   `setSyncRequest`?
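
Combined with the constructor nit above, a consistently named holder could look roughly like this. It is a minimal sketch: `RequestPair` is a hypothetical stand-in for the PR's `Pair<V>`, and the getter names are taken from their uses elsewhere in this diff.

```java
// Sketch only: RequestPair is a hypothetical stand-in for the PR's Pair<V>.
final class RequestPair<V> {
    private V asyncRequest;
    private V syncRequest;

    // Constructor arguments carry the same names as the fields they populate.
    RequestPair(V asyncRequest, V syncRequest) {
        this.asyncRequest = asyncRequest;
        this.syncRequest = syncRequest;
    }

    V getAsyncRequest() {
        return asyncRequest;
    }

    V getSyncRequest() {
        return syncRequest;
    }

    void setAsyncRequest(V asyncRequest) {
        this.asyncRequest = asyncRequest;
    }

    // Named after the field it sets, rather than setSecond.
    void setSyncRequest(V syncRequest) {
        this.syncRequest = syncRequest;
    }
}
```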



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -332,22 +378,32 @@ public void commitAsync(final Map<TopicIdPartition, 
Acknowledgements> acknowledg
                         
metricsManager.recordAcknowledgementSent(acknowledgements.size());
                         log.debug("Added async acknowledge request for 
partition {} to node {}", tip.topicPartition(), node.id());
                         resultCount.incrementAndGet();
+                        AcknowledgeRequestState asyncRequestState = 
acknowledgeRequestStates.get(nodeId).getAsyncRequest();
+                        if (asyncRequestState == null) {
+                            acknowledgeRequestStates.replace(nodeId, new 
Pair<>(new AcknowledgeRequestState(logContext,

Review Comment:
   There's no need to replace or construct a new Pair here. Just use 
`setAsyncRequest`.
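
To illustrate the point, here is a minimal sketch of updating the existing entry in place. `Holder` and `setAsyncIfAbsent` are hypothetical names, not code from the PR; the map is keyed by node id as in the diff.

```java
import java.util.Map;

final class SetterInsteadOfReplace {
    // Hypothetical holder mirroring the shape of the PR's Pair<V>.
    static final class Holder<V> {
        private V asyncRequest;
        private V syncRequest;

        V getAsyncRequest() { return asyncRequest; }
        V getSyncRequest() { return syncRequest; }
        void setAsyncRequest(V asyncRequest) { this.asyncRequest = asyncRequest; }
        void setSyncRequest(V syncRequest) { this.syncRequest = syncRequest; }
    }

    // Sets the async slot on the holder already in the map; the holder itself,
    // and whatever sits in its sync slot, is left untouched.
    static <V> void setAsyncIfAbsent(Map<Integer, Holder<V>> states, int nodeId, V newState) {
        Holder<V> holder = states.computeIfAbsent(nodeId, id -> new Holder<>());
        if (holder.getAsyncRequest() == null) {
            holder.setAsyncRequest(newState);
        }
    }
}
```

Because the holder is mutated rather than swapped, there is no need to copy the sync request across into a new object.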



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -259,6 +242,57 @@ private PollResult processAcknowledgements(long 
currentTimeMs) {
         return pollResult;
     }
 
+    private Optional<UnsentRequest> maybeBuildRequest(AcknowledgeRequestState 
acknowledgeRequestState, long currentTimeMs, boolean onCommitAsync) {

Review Comment:
   Thinking about this, I would like you to remove `isAsyncDone` entirely. If 
this method returns `Optional.empty()`, then it means that no request was 
generated. Then the caller can take an appropriate action based on its return 
value, rather than using this piece of shared state which is more fragile.
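
A rough sketch of what the caller could look like once the flag is gone. The type parameters `S` and `R` and the `maybeBuild` function are hypothetical placeholders for `AcknowledgeRequestState`, `UnsentRequest` and `maybeBuildRequest`. Note the assumption baked in: every empty result is treated the same way, so whether a request skipped because the node is busy should also hold back the sync request is a decision this sketch does not make.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

final class ReturnValueOverSharedFlag {
    // S: hypothetical request-state type; R: hypothetical request type.
    static <S, R> List<R> buildForNode(S asyncState, S syncState,
                                       Function<S, Optional<R>> maybeBuild) {
        List<R> unsent = new ArrayList<>();
        Optional<R> asyncRequest = maybeBuild.apply(asyncState);
        if (asyncRequest.isPresent()) {
            // An async request was generated, so the sync request waits.
            unsent.add(asyncRequest.get());
        } else {
            // No async request was generated; the caller decides, from the
            // return value alone, to try the sync request next.
            maybeBuild.apply(syncState).ifPresent(unsent::add);
        }
        return unsent;
    }
}
```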



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -259,11 +250,71 @@ private PollResult processAcknowledgements(long 
currentTimeMs) {
         return pollResult;
     }
 
+    private Optional<UnsentRequest> maybeBuildRequest(AcknowledgeRequestState 
acknowledgeRequestState,
+            long currentTimeMs, boolean onCommitAsync) {
+        if (acknowledgeRequestState == null || (!closing && 
acknowledgeRequestState.acknowledgementsToSend.isEmpty()
+                && 
acknowledgeRequestState.incompleteAcknowledgements.isEmpty())) {
+            if (onCommitAsync) {
+                isAsyncDone = true;
+            }
+            return Optional.empty();
+        } else if (!acknowledgeRequestState.maybeExpire()) {
+            if 
(nodesWithPendingRequests.contains(acknowledgeRequestState.nodeId)) {
+                log.debug("Skipping acknowledge request because previous 
request to {} has not been processed",
+                        acknowledgeRequestState.nodeId);
+            } else {
+                if (acknowledgeRequestState.canSendRequest(currentTimeMs)) {
+                    acknowledgeRequestState.onSendAttempt(currentTimeMs);
+                    if (onCommitAsync) {
+                        isAsyncDone = true;
+                    }
+                    return 
Optional.of(acknowledgeRequestState.buildRequest(currentTimeMs));
+                }
+            }
+        } else {
+            // Fill in TimeoutException
+            for (TopicIdPartition tip : 
acknowledgeRequestState.incompleteAcknowledgements.keySet()) {
+                metricsManager
+                        
.recordFailedAcknowledgements(acknowledgeRequestState.getIncompleteAcknowledgementsCount(tip));
+                acknowledgeRequestState.handleAcknowledgeTimedOut(tip);
+            }
+            acknowledgeRequestState.incompleteAcknowledgements.clear();
+            isAsyncDone = true;
+        }
+        return Optional.empty();
+    }
+
+    private boolean areAnyAcknowledgementsLeft() {
+        boolean areAnyAcksLeft = false;
+        Iterator<Map.Entry<Integer, Pair<AcknowledgeRequestState>>> iterator = 
acknowledgeRequestStates.entrySet()
+                .iterator();
+        while (iterator.hasNext()) {
+            Map.Entry<Integer, Pair<AcknowledgeRequestState>> 
acknowledgeRequestStatePair = iterator.next();
+            if 
(isRequestStateEmpty(acknowledgeRequestStatePair.getValue().getFirst())
+                    && 
isRequestStateEmpty(acknowledgeRequestStatePair.getValue().getSecond())) {
+                areAnyAcksLeft = true;
+            } else if (!closing) {
+                
acknowledgeRequestStates.remove(acknowledgeRequestStatePair.getKey());

Review Comment:
   I don't see this update yet.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -259,6 +242,57 @@ private PollResult processAcknowledgements(long 
currentTimeMs) {
         return pollResult;
     }
 
+    private Optional<UnsentRequest> maybeBuildRequest(AcknowledgeRequestState 
acknowledgeRequestState, long currentTimeMs, boolean onCommitAsync) {
+        if (acknowledgeRequestState == null || (!closing && 
acknowledgeRequestState.acknowledgementsToSend.isEmpty() && 
acknowledgeRequestState.incompleteAcknowledgements.isEmpty())) {
+            if (onCommitAsync) {
+                isAsyncDone = true;
+            }
+            return Optional.empty();
+        } else if (!acknowledgeRequestState.maybeExpire()) {
+            if 
(nodesWithPendingRequests.contains(acknowledgeRequestState.nodeId)) {
+                log.debug("Skipping acknowledge request because previous 
request to {} has not been processed", acknowledgeRequestState.nodeId);
+            } else {
+                if (acknowledgeRequestState.canSendRequest(currentTimeMs)) {
+                    acknowledgeRequestState.onSendAttempt(currentTimeMs);
+                    if (onCommitAsync) {

Review Comment:
   This part is wrong. `isAsyncDone` should only be set to true if 
`onCommitAsync` is true AND we have not built a request.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -215,37 +215,20 @@ public void fetch(Map<TopicIdPartition, Acknowledgements> 
acknowledgementsMap) {
      */
     private PollResult processAcknowledgements(long currentTimeMs) {
         List<UnsentRequest> unsentRequests = new ArrayList<>();
-        Iterator<AcknowledgeRequestState> iterator = 
acknowledgeRequestStates.iterator();
-        while (iterator.hasNext()) {
-            AcknowledgeRequestState acknowledgeRequestState = iterator.next();
-            if (acknowledgeRequestState.isProcessed()) {
-                iterator.remove();
-            } else if (!acknowledgeRequestState.maybeExpire()) {
-                if 
(nodesWithPendingRequests.contains(acknowledgeRequestState.nodeId)) {
-                    log.trace("Skipping acknowledge request because previous 
request to {} has not been processed", acknowledgeRequestState.nodeId);
-                } else {
-                    if (acknowledgeRequestState.canSendRequest(currentTimeMs)) 
{
-                        acknowledgeRequestState.onSendAttempt(currentTimeMs);
-                        UnsentRequest request = 
acknowledgeRequestState.buildRequest(currentTimeMs);
-                        if (request != null) {
-                            unsentRequests.add(request);
-                        }
-                    }
-                }
-            } else {
-                // Fill in TimeoutException
-                for (TopicIdPartition tip : 
acknowledgeRequestState.acknowledgementsMap.keySet()) {
-                    
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getAcknowledgementsCount(tip));
-                    acknowledgeRequestState.handleAcknowledgeErrorCode(tip, 
Errors.REQUEST_TIMED_OUT);
-                }
-                iterator.remove();
-            }
+        for (Pair<AcknowledgeRequestState> requestStates : 
acknowledgeRequestStates.values()) {
+            isAsyncDone = false;
+            // For commitAsync
+            maybeBuildRequest(requestStates.getAsyncRequest(), currentTimeMs, 
true).ifPresent(unsentRequests::add);
+
+            // Check to ensure we start processing commitSync/close only if 
there are no commitAsync requests left to process.
+            if (isAsyncDone)
+                maybeBuildRequest(requestStates.getSyncRequest(), 
currentTimeMs, false).ifPresent(unsentRequests::add);
         }
 
         PollResult pollResult = null;
         if (!unsentRequests.isEmpty()) {
             pollResult = new PollResult(unsentRequests);
-        } else if (!acknowledgeRequestStates.isEmpty()) {
+        } else if (areAnyAcknowledgementsLeft()) {

Review Comment:
   That means a side-effect of this predicate check in the else-if is to prune 
any remaining acknowledge request states. That seems like a significant 
side-effect to have, especially one which only runs conditionally. I would not 
have made the pruning part of this check at all.
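
One way to separate the two concerns, as a minimal sketch: the map of pending work per node and both method names are hypothetical simplifications, with a list of strings standing in for the acknowledge request states.

```java
import java.util.List;
import java.util.Map;

final class PruneThenCheck {
    // Explicit mutation step: drop nodes that have nothing pending.
    // removeIf on the values view removes the matching map entries safely,
    // unlike calling Map.remove while iterating over the entry set.
    static void pruneEmpty(Map<Integer, List<String>> pendingByNode) {
        pendingByNode.values().removeIf(List::isEmpty);
    }

    // Side-effect-free predicate: safe to call from any condition.
    static boolean anyLeft(Map<Integer, List<String>> pendingByNode) {
        return pendingByNode.values().stream().anyMatch(pending -> !pending.isEmpty());
    }
}
```

The caller would then prune unconditionally at a fixed point and use the predicate purely as a query.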



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -382,22 +438,34 @@ public CompletableFuture<Void> acknowledgeOnClose(final 
Map<TopicIdPartition, Ac
                         resultCount.incrementAndGet();
                     }
                 }
-                acknowledgeRequestStates.add(new 
AcknowledgeRequestState(logContext,
-                        ShareConsumeRequestManager.class.getSimpleName() + 
":3",
-                        deadlineMs,
-                        retryBackoffMs,
-                        retryBackoffMaxMs,
-                        sessionHandler,
-                        nodeId,
-                        acknowledgementsMapForNode,
-                        this::handleShareAcknowledgeCloseSuccess,
-                        this::handleShareAcknowledgeCloseFailure,
-                        resultHandler,
-                        true
-                ));
+
+                acknowledgeRequestStates.putIfAbsent(nodeId, new Pair<>(null, 
null));
+
+                // Ensure there is no commitSync()/close() request already 
present as they are blocking calls
+                // and only one request can be active at a time.
+                if (acknowledgeRequestStates.get(nodeId).getSyncRequest() != 
null) {
+                    log.error("Attempt to call close() when there is an 
existing sync request for node {}", node.id());

Review Comment:
   The result needs to be considered here. Maybe in this case it's less 
critical than for commitSync().



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -291,18 +325,27 @@ public CompletableFuture<Map<TopicIdPartition, 
Acknowledgements>> commitSync(
                         resultCount.incrementAndGet();
                     }
                 }
-                acknowledgeRequestStates.add(new 
AcknowledgeRequestState(logContext,
-                        ShareConsumeRequestManager.class.getSimpleName() + 
":1",
-                        deadlineMs,
-                        retryBackoffMs,
-                        retryBackoffMaxMs,
-                        sessionHandler,
-                        nodeId,
-                        acknowledgementsMapForNode,
-                        this::handleShareAcknowledgeSuccess,
-                        this::handleShareAcknowledgeFailure,
-                        resultHandler
-                ));
+                acknowledgeRequestStates.putIfAbsent(nodeId, new Pair<>(null, 
null));
+
+                // Ensure there is no commitSync()/close() request already 
present as they are blocking calls
+                // and only one request can be active at a time.
+                if (acknowledgeRequestStates.get(nodeId).getSyncRequest() != 
null) {
+                    log.error("Attempt to call commitSync() when there is an 
existing sync request for node {}", node.id());

Review Comment:
   The resulting future probably ought to complete exceptionally as well. 
Otherwise, it will look as though the commitSync() completed normally when it 
was never attempted, and only the log line will reveal the bug.
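
A minimal sketch of the idea, using a hypothetical `SyncSlot` class with a single slot in place of the per-node sync request. The choice of `IllegalStateException` is an assumption; the PR may prefer a different exception type.

```java
import java.util.concurrent.CompletableFuture;

final class SyncSlot {
    // Hypothetical stand-in for the per-node sync AcknowledgeRequestState.
    private Object syncRequest;

    CompletableFuture<Void> submit(Object request) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (syncRequest != null) {
            // Surface the misuse to the caller instead of only logging it.
            result.completeExceptionally(new IllegalStateException(
                    "A sync acknowledge request is already in progress"));
            return result;
        }
        syncRequest = request;
        // In real code the response handler would complete this future later.
        return result;
    }
}
```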



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -565,7 +640,7 @@ private void handleShareAcknowledgeFailure(Node fetchTarget,
                 TopicIdPartition tip = new TopicIdPartition(topic.topicId(),
                         partition.partitionIndex(),
                         metadata.topicNames().get(topic.topicId()));
-                
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getAcknowledgementsCount(tip));
+                
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getInFlightAcknowledgementsCount(tip));
                 acknowledgeRequestState.handleAcknowledgeErrorCode(tip, 
Errors.forException(error));
             }));
         } finally {

Review Comment:
   I think we ought to call `acknowledgeRequestState.onFailedAttempt` or 
`acknowledgeRequestState.onSuccessfulAttempt` for every single code path that 
handles a response to an AcknowledgeRequestState. I think that's not true at 
the moment but it's hard to tell because in some cases we do it in the 
ShareConsumeRequestManager methods, and in some cases we do it in the 
AcknowledgeRequestState methods. Please can you make this consistent to make it 
easier to validate.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -520,31 +588,38 @@ private void handleShareAcknowledgeSuccess(Node 
fetchTarget,
                 acknowledgeRequestState.onFailedAttempt(currentTimeMs);
                 if (response.error().exception() instanceof RetriableException 
&& !closing) {
                     // We retry the request until the timer expires, unless we 
are closing.
-                } else {
-                    requestData.topics().forEach(topic -> 
topic.partitions().forEach(partition -> {
-                        TopicIdPartition tip = new 
TopicIdPartition(topic.topicId(),
-                                partition.partitionIndex(),
-                                metadata.topicNames().get(topic.topicId()));
-                        
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getAcknowledgementsCount(tip));
-                        
acknowledgeRequestState.handleAcknowledgeErrorCode(tip, response.error());
-                    }));
-
-                    acknowledgeRequestState.processingComplete();
+                    acknowledgeRequestState.retryRequest();

Review Comment:
   Doesn't there need to be an else clause here for the situation in which we 
are not retrying? That branch should call `handleAcknowledgeErrorCode` and 
record the metrics, I think.
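
The shape being asked for, as a sketch with counters in place of the real calls. `ErrorHandlingSketch` and its fields are hypothetical; the comments name the PR methods each counter stands in for.

```java
final class ErrorHandlingSketch {
    int failedAttempts;
    int retries;
    int failedAcknowledgements;

    void onErrorResponse(boolean retriable, boolean closing, int acknowledgementCount) {
        // Stands in for acknowledgeRequestState.onFailedAttempt(currentTimeMs).
        failedAttempts++;
        if (retriable && !closing) {
            // Stands in for acknowledgeRequestState.retryRequest().
            retries++;
        } else {
            // Not retrying: record the failure metric. The per-partition
            // handleAcknowledgeErrorCode(tip, error) call would also go here.
            failedAcknowledgements += acknowledgementCount;
        }
    }
}
```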



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -291,18 +325,27 @@ public CompletableFuture<Map<TopicIdPartition, 
Acknowledgements>> commitSync(
                         resultCount.incrementAndGet();
                     }
                 }
-                acknowledgeRequestStates.add(new 
AcknowledgeRequestState(logContext,
-                        ShareConsumeRequestManager.class.getSimpleName() + 
":1",
-                        deadlineMs,
-                        retryBackoffMs,
-                        retryBackoffMaxMs,
-                        sessionHandler,
-                        nodeId,
-                        acknowledgementsMapForNode,
-                        this::handleShareAcknowledgeSuccess,
-                        this::handleShareAcknowledgeFailure,
-                        resultHandler
-                ));
+                acknowledgeRequestStates.putIfAbsent(nodeId, new Pair<>(null, 
null));
+
+                // Ensure there is no commitSync()/close() request already 
present as they are blocking calls
+                // and only one request can be active at a time.
+                if (acknowledgeRequestStates.get(nodeId).getSyncRequest() != 
null) {
+                    log.error("Attempt to call commitSync() when there is an 
existing sync request for node {}", node.id());
+                } else {
+                    // There can only be one commitSync()/close() happening at 
a time. So per node, there will be one acknowledge request state representing 
commitSync() and close().
+                    acknowledgeRequestStates.put(nodeId, new 
Pair<>(acknowledgeRequestStates.get(nodeId).getAsyncRequest(), new 
AcknowledgeRequestState(logContext,

Review Comment:
   There's no need to construct a new Pair here. You can use `setSyncRequest()` 
instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
