AndrewJSchofield commented on code in PR #16727:
URL: https://github.com/apache/kafka/pull/16727#discussion_r1698096040


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -50,20 +50,21 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 /**
- * {@code ShareConsumeRequestManager} is responsible for generating {@link 
ShareFetchRequest} and
- * {@link ShareAcknowledgeRequest} to fetch and acknowledge records being 
delivered for a consumer
+ * {@code ShareConsumeRequestManager} is responsible for generating

Review Comment:
   nit: Comment formatting messed up without changing the works at all.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java:
##########
@@ -1022,9 +1194,12 @@ private List<UnsentRequest> 
removeUnsentRequestByNode(Node node) {
 
         @Override
         protected void checkDisconnects(final long currentTimeMs) {
-            // any disconnects affecting requests that have already been 
transmitted will be handled
-            // by NetworkClient, so we just need to check whether connections 
for any of the unsent
-            // requests have been disconnected; if they have, then we complete 
the corresponding future
+            // any disconnects affecting requests that have already been 
transmitted will be

Review Comment:
   formatting



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -259,11 +250,71 @@ private PollResult processAcknowledgements(long 
currentTimeMs) {
         return pollResult;
     }
 
+    private Optional<UnsentRequest> maybeBuildRequest(AcknowledgeRequestState 
acknowledgeRequestState,
+            long currentTimeMs, boolean onCommitAsync) {
+        if (acknowledgeRequestState == null || (!closing && 
acknowledgeRequestState.acknowledgementsToSend.isEmpty()
+                && 
acknowledgeRequestState.incompleteAcknowledgements.isEmpty())) {
+            if (onCommitAsync) {
+                isAsyncDone = true;
+            }
+            return Optional.empty();
+        } else if (!acknowledgeRequestState.maybeExpire()) {
+            if 
(nodesWithPendingRequests.contains(acknowledgeRequestState.nodeId)) {
+                log.debug("Skipping acknowledge request because previous 
request to {} has not been processed",
+                        acknowledgeRequestState.nodeId);
+            } else {
+                if (acknowledgeRequestState.canSendRequest(currentTimeMs)) {
+                    acknowledgeRequestState.onSendAttempt(currentTimeMs);
+                    if (onCommitAsync) {
+                        isAsyncDone = true;
+                    }
+                    return 
Optional.of(acknowledgeRequestState.buildRequest(currentTimeMs));
+                }
+            }
+        } else {
+            // Fill in TimeoutException
+            for (TopicIdPartition tip : 
acknowledgeRequestState.incompleteAcknowledgements.keySet()) {
+                metricsManager
+                        
.recordFailedAcknowledgements(acknowledgeRequestState.getIncompleteAcknowledgementsCount(tip));
+                acknowledgeRequestState.handleAcknowledgeTimedOut(tip);
+            }
+            acknowledgeRequestState.incompleteAcknowledgements.clear();
+            isAsyncDone = true;
+        }
+        return Optional.empty();
+    }
+
+    private boolean areAnyAcknowledgementsLeft() {
+        boolean areAnyAcksLeft = false;
+        Iterator<Map.Entry<Integer, Pair<AcknowledgeRequestState>>> iterator = 
acknowledgeRequestStates.entrySet()
+                .iterator();
+        while (iterator.hasNext()) {
+            Map.Entry<Integer, Pair<AcknowledgeRequestState>> 
acknowledgeRequestStatePair = iterator.next();
+            if 
(isRequestStateEmpty(acknowledgeRequestStatePair.getValue().getFirst())
+                    && 
isRequestStateEmpty(acknowledgeRequestStatePair.getValue().getSecond())) {
+                areAnyAcksLeft = true;
+            } else if (!closing) {
+                
acknowledgeRequestStates.remove(acknowledgeRequestStatePair.getKey());
+            }
+        }
+        if (!acknowledgeRequestStates.isEmpty())
+            areAnyAcksLeft = true;
+        return areAnyAcksLeft;
+    }
+
+    private boolean isRequestStateEmpty(AcknowledgeRequestState 
acknowledgeRequestState) {
+        return acknowledgeRequestState == null
+                || !acknowledgeRequestState.acknowledgementsToSend.isEmpty()
+                || 
!acknowledgeRequestState.incompleteAcknowledgements.isEmpty()
+                || !acknowledgeRequestState.inFlightAcknowledgements.isEmpty();
+    }
+
     /**
      * Enqueue an AcknowledgeRequestState to be picked up on the next poll
      *
      * @param acknowledgementsMap The acknowledgements to commit
-     * @param deadlineMs          Time until which the request will be retried 
if it fails with
+     * @param deadlineMs          Time until which the request will be retried 
if it

Review Comment:
   Formatting



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -449,7 +531,8 @@ private void handleShareFetchSuccess(Node fetchTarget,
                     }
                     
acks.setAcknowledgeErrorCode(Errors.forCode(partitionData.acknowledgeErrorCode()));
                     Map<TopicIdPartition, Acknowledgements> acksMap = 
Collections.singletonMap(tip, acks);
-                    ShareAcknowledgementCommitCallbackEvent event = new 
ShareAcknowledgementCommitCallbackEvent(acksMap);
+                    ShareAcknowledgementCommitCallbackEvent event = new 
ShareAcknowledgementCommitCallbackEvent(

Review Comment:
   If you want to make the line shorter, it's more readable to break it before 
"new".



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java:
##########
@@ -1034,7 +1209,8 @@ protected void checkDisconnects(final long currentTimeMs) 
{
                         FutureCompletionHandler handler = 
unsentRequest.handler();
                         AuthenticationException authenticationException = 
client.authenticationException(node);
                         long startMs = unsentRequest.timer().currentTimeMs() - 
unsentRequest.timer().elapsedMs();
-                        handler.onComplete(new 
ClientResponse(makeHeader(unsentRequest.requestBuilder().latestAllowedVersion()),
+                        handler.onComplete(new ClientResponse(
+                                
makeHeader(unsentRequest.requestBuilder().latestAllowedVersion()),

Review Comment:
   nit: This would be neater with the `new ClientResponse(` on the new line in 
its entirety.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -207,46 +212,32 @@ public void fetch(Map<TopicIdPartition, Acknowledgements> 
acknowledgementsMap) {
     }
 
     /**
-     * Process acknowledgeRequestStates and prepares a list of 
acknowledgements to be sent in the poll().
+     * Process acknowledgeRequestStates and prepares a list of 
acknowledgements to
+     * be sent in the poll().
      *
      * @param currentTimeMs the current time in ms.
      *
      * @return the PollResult containing zero or more acknowledgements.
      */
     private PollResult processAcknowledgements(long currentTimeMs) {
         List<UnsentRequest> unsentRequests = new ArrayList<>();
-        Iterator<AcknowledgeRequestState> iterator = 
acknowledgeRequestStates.iterator();
-        while (iterator.hasNext()) {
-            AcknowledgeRequestState acknowledgeRequestState = iterator.next();
-            if (acknowledgeRequestState.isProcessed()) {
-                iterator.remove();
-            } else if (!acknowledgeRequestState.maybeExpire()) {
-                if 
(nodesWithPendingRequests.contains(acknowledgeRequestState.nodeId)) {
-                    log.trace("Skipping acknowledge request because previous 
request to {} has not been processed", acknowledgeRequestState.nodeId);
-                } else {
-                    if (acknowledgeRequestState.canSendRequest(currentTimeMs)) 
{
-                        acknowledgeRequestState.onSendAttempt(currentTimeMs);
-                        UnsentRequest request = 
acknowledgeRequestState.buildRequest(currentTimeMs);
-                        if (request != null) {
-                            unsentRequests.add(request);
-                        }
-                    }
-                }
-            } else {
-                // Fill in TimeoutException
-                for (TopicIdPartition tip : 
acknowledgeRequestState.acknowledgementsMap.keySet()) {
-                    
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getAcknowledgementsCount(tip));
-                    acknowledgeRequestState.handleAcknowledgeErrorCode(tip, 
Errors.REQUEST_TIMED_OUT);
-                }
-                iterator.remove();
-            }
+        for (Pair<AcknowledgeRequestState> requestStates : 
acknowledgeRequestStates.values()) {
+            isAsyncDone = false;
+            // For commitAsync
+            maybeBuildRequest(requestStates.getFirst(), currentTimeMs, 
true).ifPresent(unsentRequests::add);
+
+            // Check to ensure we start processing commitSync/close only if 
there are no
+            // commitAsync requests left to process.
+            if (isAsyncDone)
+                maybeBuildRequest(requestStates.getSecond(), currentTimeMs, 
false).ifPresent(unsentRequests::add);
         }
 
         PollResult pollResult = null;
         if (!unsentRequests.isEmpty()) {
             pollResult = new PollResult(unsentRequests);
-        } else if (!acknowledgeRequestStates.isEmpty()) {
-            // Return empty result until all the acknowledgement request 
states are processed
+        } else if (areAnyAcknowledgementsLeft()) {
+            // Return empty result until all the acknowledgement request 
states are

Review Comment:
   formatting



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -50,20 +50,21 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 /**
- * {@code ShareConsumeRequestManager} is responsible for generating {@link 
ShareFetchRequest} and
- * {@link ShareAcknowledgeRequest} to fetch and acknowledge records being 
delivered for a consumer
+ * {@code ShareConsumeRequestManager} is responsible for generating

Review Comment:
   nit: Comment formatting messed up without changing the words at all.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to