chia7712 commented on code in PR #16942:
URL: https://github.com/apache/kafka/pull/16942#discussion_r1723824462


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -351,7 +351,7 @@ public CompletableFuture<List<AcquiredRecords>> acquire(
         String memberId,
         FetchPartitionData fetchPartitionData
     ) {
-        log.trace("Received acquire request for share partition: {}-{}", 
memberId, fetchPartitionData);
+        log.trace("Received acquire request for share partition: {}-{}", 
memberId, topicIdPartition);

Review Comment:
   The other logs use `groupId` and `topicIdPartition`, so I guess `memberId` 
should be replaced by `groupId`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -266,11 +266,18 @@ private Optional<UnsentRequest> 
maybeBuildRequest(AcknowledgeRequestState acknow
             return Optional.empty();
         } else if (!acknowledgeRequestState.maybeExpire()) {
             if (acknowledgeRequestState.canSendRequest(currentTimeMs)) {
-                acknowledgeRequestState.onSendAttempt(currentTimeMs);
-                if (onCommitAsync) {
-                    isAsyncDone.set(true);
+                UnsentRequest request = 
acknowledgeRequestState.buildRequest(currentTimeMs);

Review Comment:
   This method has some nested if-else. Maybe we can flatten them? for example:
   ```java
       private Optional<UnsentRequest> 
maybeBuildRequest(AcknowledgeRequestState acknowledgeRequestState,
                                                         long currentTimeMs,
                                                         boolean onCommitAsync,
                                                         AtomicBoolean 
isAsyncDone) {
           boolean asyncDone = true;
           try {
               if (acknowledgeRequestState == null || 
(!acknowledgeRequestState.onClose() && acknowledgeRequestState.isEmpty())) {
                   return Optional.empty();
               }
   
               if (acknowledgeRequestState.maybeExpire()) {
                   // Fill in TimeoutException
                   for (TopicIdPartition tip : 
acknowledgeRequestState.incompleteAcknowledgements.keySet()) {
                       
metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getIncompleteAcknowledgementsCount(tip));
                       acknowledgeRequestState.handleAcknowledgeTimedOut(tip);
                   }
                   acknowledgeRequestState.incompleteAcknowledgements.clear();
                   return Optional.empty();
               }
   
               if (!acknowledgeRequestState.canSendRequest(currentTimeMs)) {
                   // We wait for the backoff before we can send this request.
                   asyncDone = false;
                   return Optional.empty();
               }
   
               UnsentRequest request = 
acknowledgeRequestState.buildRequest(currentTimeMs);
               if (request == null) {
                   asyncDone = false;
                   return Optional.empty();
               }
   
               acknowledgeRequestState.onSendAttempt(currentTimeMs);
               return Optional.of(request);
           } finally {
               if (onCommitAsync) {
                   isAsyncDone.set(asyncDone);
               }
           }
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to