AndrewJSchofield commented on code in PR #16727:
URL: https://github.com/apache/kafka/pull/16727#discussion_r1698163085


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -259,11 +250,71 @@ private PollResult processAcknowledgements(long 
currentTimeMs) {
         return pollResult;
     }
 
+    private Optional<UnsentRequest> maybeBuildRequest(AcknowledgeRequestState 
acknowledgeRequestState,
+            long currentTimeMs, boolean onCommitAsync) {
+        if (acknowledgeRequestState == null || (!closing && 
acknowledgeRequestState.acknowledgementsToSend.isEmpty()
+                && 
acknowledgeRequestState.incompleteAcknowledgements.isEmpty())) {
+            if (onCommitAsync) {
+                isAsyncDone = true;
+            }
+            return Optional.empty();
+        } else if (!acknowledgeRequestState.maybeExpire()) {
+            if 
(nodesWithPendingRequests.contains(acknowledgeRequestState.nodeId)) {
+                log.debug("Skipping acknowledge request because previous 
request to {} has not been processed",
+                        acknowledgeRequestState.nodeId);
+            } else {
+                if (acknowledgeRequestState.canSendRequest(currentTimeMs)) {
+                    acknowledgeRequestState.onSendAttempt(currentTimeMs);
+                    if (onCommitAsync) {
+                        isAsyncDone = true;
+                    }
+                    return 
Optional.of(acknowledgeRequestState.buildRequest(currentTimeMs));
+                }
+            }
+        } else {
+            // Fill in TimeoutException
+            for (TopicIdPartition tip : 
acknowledgeRequestState.incompleteAcknowledgements.keySet()) {
+                metricsManager
+                        
.recordFailedAcknowledgements(acknowledgeRequestState.getIncompleteAcknowledgementsCount(tip));
+                acknowledgeRequestState.handleAcknowledgeTimedOut(tip);
+            }
+            acknowledgeRequestState.incompleteAcknowledgements.clear();
+            isAsyncDone = true;

Review Comment:
   Should this line be conditional on `onCommitAsync` like the others?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -207,46 +212,32 @@ public void fetch(Map<TopicIdPartition, Acknowledgements> 
acknowledgementsMap) {
     }
 
     /**
-     * Process acknowledgeRequestStates and prepares a list of 
acknowledgements to be sent in the poll().
+     * Process acknowledgeRequestStates and prepares a list of 
acknowledgements to
+     * be sent in the poll().
      *
      * @param currentTimeMs the current time in ms.
      *
      * @return the PollResult containing zero or more acknowledgements.
      */
     private PollResult processAcknowledgements(long currentTimeMs) {
         List<UnsentRequest> unsentRequests = new ArrayList<>();
-        Iterator<AcknowledgeRequestState> iterator = 
acknowledgeRequestStates.iterator();
-        while (iterator.hasNext()) {
-            AcknowledgeRequestState acknowledgeRequestState = iterator.next();
-            if (acknowledgeRequestState.isProcessed()) {
-                iterator.remove();
-            } else if (!acknowledgeRequestState.maybeExpire()) {
-                if 
(nodesWithPendingRequests.contains(acknowledgeRequestState.nodeId)) {
-                    log.trace("Skipping acknowledge request because previous 
request to {} has not been processed", acknowledgeRequestState.nodeId);
-                } else {
-                    if (acknowledgeRequestState.canSendRequest(currentTimeMs)) 
{
-                        acknowledgeRequestState.onSendAttempt(currentTimeMs);
-                        UnsentRequest request = 
acknowledgeRequestState.buildRequest(currentTimeMs);
-                        if (request != null) {
-                            unsentRequests.add(request);
-                        }
-                    }
-                }
-            } else {
-                // Fill in TimeoutException
-                for (TopicIdPartition tip : 
acknowledgeRequestState.acknowledgementsMap.keySet()) {
-                    
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getAcknowledgementsCount(tip));
-                    acknowledgeRequestState.handleAcknowledgeErrorCode(tip, 
Errors.REQUEST_TIMED_OUT);
-                }
-                iterator.remove();
-            }
+        for (Pair<AcknowledgeRequestState> requestStates : 
acknowledgeRequestStates.values()) {
+            isAsyncDone = false;
+            // For commitAsync
+            maybeBuildRequest(requestStates.getFirst(), currentTimeMs, 
true).ifPresent(unsentRequests::add);

Review Comment:
   I think the getFirst()/getSecond() could be better named 
getAsyncRequest()/getSyncRequest() since that's what first and second actually 
mean.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -378,32 +452,38 @@ public CompletableFuture<Void> acknowledgeOnClose(final 
Map<TopicIdPartition, Ac
                         acknowledgementsMapForNode.put(tip, acknowledgements);
 
                         
metricsManager.recordAcknowledgementSent(acknowledgements.size());
-                        log.debug("Added closing acknowledge request for 
partition {} to node {}", tip.topicPartition(), node.id());
+                        log.debug("Added closing acknowledge request for 
partition {} to node {}", tip.topicPartition(),
+                                node.id());
                         resultCount.incrementAndGet();
                     }
                 }
-                acknowledgeRequestStates.add(new 
AcknowledgeRequestState(logContext,
-                        ShareConsumeRequestManager.class.getSimpleName() + 
":3",
-                        deadlineMs,
-                        retryBackoffMs,
-                        retryBackoffMaxMs,
-                        sessionHandler,
-                        nodeId,
-                        acknowledgementsMapForNode,
-                        this::handleShareAcknowledgeCloseSuccess,
-                        this::handleShareAcknowledgeCloseFailure,
-                        resultHandler,
-                        true
-                ));
+
+                acknowledgeRequestStates.putIfAbsent(nodeId, new Pair<>(null, 
null));
+                // There can only be one commitSync()/close() happening at a 
time. So per node,

Review Comment:
   This is true but you could ensure that it's true in the code also.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -259,11 +250,71 @@ private PollResult processAcknowledgements(long 
currentTimeMs) {
         return pollResult;
     }
 
+    private Optional<UnsentRequest> maybeBuildRequest(AcknowledgeRequestState 
acknowledgeRequestState,
+            long currentTimeMs, boolean onCommitAsync) {
+        if (acknowledgeRequestState == null || (!closing && 
acknowledgeRequestState.acknowledgementsToSend.isEmpty()
+                && 
acknowledgeRequestState.incompleteAcknowledgements.isEmpty())) {
+            if (onCommitAsync) {
+                isAsyncDone = true;
+            }
+            return Optional.empty();
+        } else if (!acknowledgeRequestState.maybeExpire()) {
+            if 
(nodesWithPendingRequests.contains(acknowledgeRequestState.nodeId)) {
+                log.debug("Skipping acknowledge request because previous 
request to {} has not been processed",
+                        acknowledgeRequestState.nodeId);
+            } else {
+                if (acknowledgeRequestState.canSendRequest(currentTimeMs)) {
+                    acknowledgeRequestState.onSendAttempt(currentTimeMs);
+                    if (onCommitAsync) {
+                        isAsyncDone = true;
+                    }
+                    return 
Optional.of(acknowledgeRequestState.buildRequest(currentTimeMs));
+                }
+            }
+        } else {
+            // Fill in TimeoutException
+            for (TopicIdPartition tip : 
acknowledgeRequestState.incompleteAcknowledgements.keySet()) {
+                metricsManager
+                        
.recordFailedAcknowledgements(acknowledgeRequestState.getIncompleteAcknowledgementsCount(tip));
+                acknowledgeRequestState.handleAcknowledgeTimedOut(tip);
+            }
+            acknowledgeRequestState.incompleteAcknowledgements.clear();
+            isAsyncDone = true;
+        }
+        return Optional.empty();
+    }
+
+    private boolean areAnyAcknowledgementsLeft() {
+        boolean areAnyAcksLeft = false;
+        Iterator<Map.Entry<Integer, Pair<AcknowledgeRequestState>>> iterator = 
acknowledgeRequestStates.entrySet()
+                .iterator();
+        while (iterator.hasNext()) {
+            Map.Entry<Integer, Pair<AcknowledgeRequestState>> 
acknowledgeRequestStatePair = iterator.next();
+            if 
(isRequestStateEmpty(acknowledgeRequestStatePair.getValue().getFirst())
+                    && 
isRequestStateEmpty(acknowledgeRequestStatePair.getValue().getSecond())) {
+                areAnyAcksLeft = true;

Review Comment:
   This confuses me. It says "if there are no async or sync acks", but then it 
sets `areAnyAcksLeft = true`. Seems that the logic and the names of the methods 
could be a bit clearer.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -259,11 +250,71 @@ private PollResult processAcknowledgements(long 
currentTimeMs) {
         return pollResult;
     }
 
+    private Optional<UnsentRequest> maybeBuildRequest(AcknowledgeRequestState 
acknowledgeRequestState,
+            long currentTimeMs, boolean onCommitAsync) {
+        if (acknowledgeRequestState == null || (!closing && 
acknowledgeRequestState.acknowledgementsToSend.isEmpty()
+                && 
acknowledgeRequestState.incompleteAcknowledgements.isEmpty())) {
+            if (onCommitAsync) {
+                isAsyncDone = true;
+            }
+            return Optional.empty();
+        } else if (!acknowledgeRequestState.maybeExpire()) {
+            if 
(nodesWithPendingRequests.contains(acknowledgeRequestState.nodeId)) {
+                log.debug("Skipping acknowledge request because previous 
request to {} has not been processed",
+                        acknowledgeRequestState.nodeId);
+            } else {
+                if (acknowledgeRequestState.canSendRequest(currentTimeMs)) {
+                    acknowledgeRequestState.onSendAttempt(currentTimeMs);
+                    if (onCommitAsync) {
+                        isAsyncDone = true;
+                    }
+                    return 
Optional.of(acknowledgeRequestState.buildRequest(currentTimeMs));
+                }
+            }
+        } else {
+            // Fill in TimeoutException
+            for (TopicIdPartition tip : 
acknowledgeRequestState.incompleteAcknowledgements.keySet()) {
+                metricsManager
+                        
.recordFailedAcknowledgements(acknowledgeRequestState.getIncompleteAcknowledgementsCount(tip));
+                acknowledgeRequestState.handleAcknowledgeTimedOut(tip);
+            }
+            acknowledgeRequestState.incompleteAcknowledgements.clear();
+            isAsyncDone = true;
+        }
+        return Optional.empty();
+    }
+
+    private boolean areAnyAcknowledgementsLeft() {
+        boolean areAnyAcksLeft = false;
+        Iterator<Map.Entry<Integer, Pair<AcknowledgeRequestState>>> iterator = 
acknowledgeRequestStates.entrySet()
+                .iterator();
+        while (iterator.hasNext()) {
+            Map.Entry<Integer, Pair<AcknowledgeRequestState>> 
acknowledgeRequestStatePair = iterator.next();
+            if 
(isRequestStateEmpty(acknowledgeRequestStatePair.getValue().getFirst())
+                    && 
isRequestStateEmpty(acknowledgeRequestStatePair.getValue().getSecond())) {
+                areAnyAcksLeft = true;
+            } else if (!closing) {
+                
acknowledgeRequestStates.remove(acknowledgeRequestStatePair.getKey());

Review Comment:
   Shouldn't this be `iterator.remove()`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -378,32 +452,38 @@ public CompletableFuture<Void> acknowledgeOnClose(final 
Map<TopicIdPartition, Ac
                         acknowledgementsMapForNode.put(tip, acknowledgements);
 
                         
metricsManager.recordAcknowledgementSent(acknowledgements.size());
-                        log.debug("Added closing acknowledge request for 
partition {} to node {}", tip.topicPartition(), node.id());
+                        log.debug("Added closing acknowledge request for 
partition {} to node {}", tip.topicPartition(),
+                                node.id());
                         resultCount.incrementAndGet();
                     }
                 }
-                acknowledgeRequestStates.add(new 
AcknowledgeRequestState(logContext,
-                        ShareConsumeRequestManager.class.getSimpleName() + 
":3",
-                        deadlineMs,
-                        retryBackoffMs,
-                        retryBackoffMaxMs,
-                        sessionHandler,
-                        nodeId,
-                        acknowledgementsMapForNode,
-                        this::handleShareAcknowledgeCloseSuccess,
-                        this::handleShareAcknowledgeCloseFailure,
-                        resultHandler,
-                        true
-                ));
+
+                acknowledgeRequestStates.putIfAbsent(nodeId, new Pair<>(null, 
null));
+                // There can only be one commitSync()/close() happening at a 
time. So per node,
+                // there will be one acknowledge request state.
+                acknowledgeRequestStates.put(nodeId, new 
Pair<>(acknowledgeRequestStates.get(nodeId).getFirst(),
+                        new AcknowledgeRequestState(logContext,
+                                
ShareConsumeRequestManager.class.getSimpleName() + ":1",

Review Comment:
   The name with `:3` at the end has been lost with this PR. Please restore 
this.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -287,22 +338,29 @@ public CompletableFuture<Map<TopicIdPartition, 
Acknowledgements>> commitSync(
                         acknowledgementsMapForNode.put(tip, acknowledgements);
 
                         
metricsManager.recordAcknowledgementSent(acknowledgements.size());
-                        log.debug("Added sync acknowledge request for 
partition {} to node {}", tip.topicPartition(), node.id());
+                        log.debug("Added sync acknowledge request for 
partition {} to node {}", tip.topicPartition(),
+                                node.id());
                         resultCount.incrementAndGet();
                     }
                 }
-                acknowledgeRequestStates.add(new 
AcknowledgeRequestState(logContext,
-                        ShareConsumeRequestManager.class.getSimpleName() + 
":1",
-                        deadlineMs,
-                        retryBackoffMs,
-                        retryBackoffMaxMs,
-                        sessionHandler,
-                        nodeId,
-                        acknowledgementsMapForNode,
-                        this::handleShareAcknowledgeSuccess,
-                        this::handleShareAcknowledgeFailure,
-                        resultHandler
-                ));
+                acknowledgeRequestStates.putIfAbsent(nodeId, new Pair<>(null, 
null));
+
+                // There can only be one commitSync()/close() happening at a 
time. So per node,

Review Comment:
   This is true but it would be simple to ensure that using a bit of defensive 
code. If we made an error in the code later, we might end up overwriting 
something we didn't expect and not notice.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -324,44 +382,60 @@ public void commitAsync(final Map<TopicIdPartition, 
Acknowledgements> acknowledg
             Node node = cluster.nodeById(nodeId);
             if (node != null) {
                 Map<TopicIdPartition, Acknowledgements> 
acknowledgementsMapForNode = new HashMap<>();
+
+                acknowledgeRequestStates.putIfAbsent(nodeId, new Pair<>(null, 
null));
+
                 for (TopicIdPartition tip : 
sessionHandler.sessionPartitions()) {
                     Acknowledgements acknowledgements = 
acknowledgementsMap.get(tip);
                     if (acknowledgements != null) {
                         acknowledgementsMapForNode.put(tip, acknowledgements);
 
                         
metricsManager.recordAcknowledgementSent(acknowledgements.size());
-                        log.debug("Added async acknowledge request for 
partition {} to node {}", tip.topicPartition(), node.id());
+                        log.debug("Added async acknowledge request for 
partition {} to node {}", tip.topicPartition(),
+                                node.id());
                         resultCount.incrementAndGet();
+                        if (acknowledgeRequestStates.get(nodeId).getFirst() == 
null) {

Review Comment:
   You should assign the result of 
`acknowledgeRequestStates.get(nodeId).getFirst()` to a variable so that you can 
avoid repeatedly evaluating this expression. I think you're doing it 3 times 
when once would be enough and also make the code clearer.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -259,11 +250,71 @@ private PollResult processAcknowledgements(long 
currentTimeMs) {
         return pollResult;
     }
 
+    private Optional<UnsentRequest> maybeBuildRequest(AcknowledgeRequestState 
acknowledgeRequestState,
+            long currentTimeMs, boolean onCommitAsync) {
+        if (acknowledgeRequestState == null || (!closing && 
acknowledgeRequestState.acknowledgementsToSend.isEmpty()
+                && 
acknowledgeRequestState.incompleteAcknowledgements.isEmpty())) {
+            if (onCommitAsync) {
+                isAsyncDone = true;
+            }
+            return Optional.empty();
+        } else if (!acknowledgeRequestState.maybeExpire()) {
+            if 
(nodesWithPendingRequests.contains(acknowledgeRequestState.nodeId)) {
+                log.debug("Skipping acknowledge request because previous 
request to {} has not been processed",
+                        acknowledgeRequestState.nodeId);
+            } else {
+                if (acknowledgeRequestState.canSendRequest(currentTimeMs)) {
+                    acknowledgeRequestState.onSendAttempt(currentTimeMs);
+                    if (onCommitAsync) {
+                        isAsyncDone = true;
+                    }
+                    return 
Optional.of(acknowledgeRequestState.buildRequest(currentTimeMs));
+                }
+            }
+        } else {
+            // Fill in TimeoutException
+            for (TopicIdPartition tip : 
acknowledgeRequestState.incompleteAcknowledgements.keySet()) {
+                metricsManager
+                        
.recordFailedAcknowledgements(acknowledgeRequestState.getIncompleteAcknowledgementsCount(tip));
+                acknowledgeRequestState.handleAcknowledgeTimedOut(tip);
+            }
+            acknowledgeRequestState.incompleteAcknowledgements.clear();
+            isAsyncDone = true;
+        }
+        return Optional.empty();
+    }
+
+    private boolean areAnyAcknowledgementsLeft() {
+        boolean areAnyAcksLeft = false;
+        Iterator<Map.Entry<Integer, Pair<AcknowledgeRequestState>>> iterator = 
acknowledgeRequestStates.entrySet()
+                .iterator();
+        while (iterator.hasNext()) {
+            Map.Entry<Integer, Pair<AcknowledgeRequestState>> 
acknowledgeRequestStatePair = iterator.next();
+            if 
(isRequestStateEmpty(acknowledgeRequestStatePair.getValue().getFirst())
+                    && 
isRequestStateEmpty(acknowledgeRequestStatePair.getValue().getSecond())) {
+                areAnyAcksLeft = true;
+            } else if (!closing) {
+                
acknowledgeRequestStates.remove(acknowledgeRequestStatePair.getKey());
+            }
+        }
+        if (!acknowledgeRequestStates.isEmpty())
+            areAnyAcksLeft = true;
+        return areAnyAcksLeft;
+    }
+
+    private boolean isRequestStateEmpty(AcknowledgeRequestState 
acknowledgeRequestState) {

Review Comment:
   This could be clearer. I think `ars == null && ars.acksToSend.isEmpty && 
ars.ia.isEmpty() && ars.ifa.isEmpty()`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -259,11 +250,71 @@ private PollResult processAcknowledgements(long 
currentTimeMs) {
         return pollResult;
     }
 
+    private Optional<UnsentRequest> maybeBuildRequest(AcknowledgeRequestState 
acknowledgeRequestState,
+            long currentTimeMs, boolean onCommitAsync) {
+        if (acknowledgeRequestState == null || (!closing && 
acknowledgeRequestState.acknowledgementsToSend.isEmpty()
+                && 
acknowledgeRequestState.incompleteAcknowledgements.isEmpty())) {
+            if (onCommitAsync) {
+                isAsyncDone = true;
+            }
+            return Optional.empty();
+        } else if (!acknowledgeRequestState.maybeExpire()) {
+            if 
(nodesWithPendingRequests.contains(acknowledgeRequestState.nodeId)) {
+                log.debug("Skipping acknowledge request because previous 
request to {} has not been processed",
+                        acknowledgeRequestState.nodeId);
+            } else {
+                if (acknowledgeRequestState.canSendRequest(currentTimeMs)) {
+                    acknowledgeRequestState.onSendAttempt(currentTimeMs);
+                    if (onCommitAsync) {
+                        isAsyncDone = true;
+                    }
+                    return 
Optional.of(acknowledgeRequestState.buildRequest(currentTimeMs));
+                }
+            }
+        } else {
+            // Fill in TimeoutException
+            for (TopicIdPartition tip : 
acknowledgeRequestState.incompleteAcknowledgements.keySet()) {
+                metricsManager
+                        
.recordFailedAcknowledgements(acknowledgeRequestState.getIncompleteAcknowledgementsCount(tip));
+                acknowledgeRequestState.handleAcknowledgeTimedOut(tip);
+            }
+            acknowledgeRequestState.incompleteAcknowledgements.clear();
+            isAsyncDone = true;
+        }
+        return Optional.empty();
+    }
+
+    private boolean areAnyAcknowledgementsLeft() {
+        boolean areAnyAcksLeft = false;
+        Iterator<Map.Entry<Integer, Pair<AcknowledgeRequestState>>> iterator = 
acknowledgeRequestStates.entrySet()
+                .iterator();
+        while (iterator.hasNext()) {
+            Map.Entry<Integer, Pair<AcknowledgeRequestState>> 
acknowledgeRequestStatePair = iterator.next();
+            if 
(isRequestStateEmpty(acknowledgeRequestStatePair.getValue().getFirst())
+                    && 
isRequestStateEmpty(acknowledgeRequestStatePair.getValue().getSecond())) {
+                areAnyAcksLeft = true;
+            } else if (!closing) {
+                
acknowledgeRequestStates.remove(acknowledgeRequestStatePair.getKey());
+            }
+        }
+        if (!acknowledgeRequestStates.isEmpty())
+            areAnyAcksLeft = true;
+        return areAnyAcksLeft;
+    }
+
+    private boolean isRequestStateEmpty(AcknowledgeRequestState 
acknowledgeRequestState) {

Review Comment:
   You could even make a method `boolean AcknowledgeRequestState.isEmpty()` so 
that this code was not intimately aware of the three maps within.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to