chia7712 commented on code in PR #16942: URL: https://github.com/apache/kafka/pull/16942#discussion_r1723824462
########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -351,7 +351,7 @@ public CompletableFuture<List<AcquiredRecords>> acquire( String memberId, FetchPartitionData fetchPartitionData ) { - log.trace("Received acquire request for share partition: {}-{}", memberId, fetchPartitionData); + log.trace("Received acquire request for share partition: {}-{}", memberId, topicIdPartition); Review Comment: The other logs use `groupId` and `topicIdPartition`, so I guess `memberId` should be replaced by `groupId`? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java: ########## @@ -266,11 +266,18 @@ private Optional<UnsentRequest> maybeBuildRequest(AcknowledgeRequestState acknow return Optional.empty(); } else if (!acknowledgeRequestState.maybeExpire()) { if (acknowledgeRequestState.canSendRequest(currentTimeMs)) { - acknowledgeRequestState.onSendAttempt(currentTimeMs); - if (onCommitAsync) { - isAsyncDone.set(true); + UnsentRequest request = acknowledgeRequestState.buildRequest(currentTimeMs); Review Comment: This method has some nested if-else. Maybe we can flatten them? for example: ```java private Optional<UnsentRequest> maybeBuildRequest(AcknowledgeRequestState acknowledgeRequestState, long currentTimeMs, boolean onCommitAsync, AtomicBoolean isAsyncDone) { boolean asyncDone = true; try { if (acknowledgeRequestState == null || (!acknowledgeRequestState.onClose() && acknowledgeRequestState.isEmpty())) { return Optional.empty(); } if (acknowledgeRequestState.maybeExpire()) { // Fill in TimeoutException for (TopicIdPartition tip : acknowledgeRequestState.incompleteAcknowledgements.keySet()) { metricsManager.recordFailedAcknowledgements(acknowledgeRequestState.getIncompleteAcknowledgementsCount(tip)); acknowledgeRequestState.handleAcknowledgeTimedOut(tip); } acknowledgeRequestState.incompleteAcknowledgements.clear(); return Optional.empty(); } if (!acknowledgeRequestState.canSendRequest(currentTimeMs)) { // We wait for the backoff before we can send this request. asyncDone = false; return Optional.empty(); } UnsentRequest request = acknowledgeRequestState.buildRequest(currentTimeMs); if (request == null) { asyncDone = false; return Optional.empty(); } acknowledgeRequestState.onSendAttempt(currentTimeMs); return Optional.of(request); } finally { if (onCommitAsync) { isAsyncDone.set(asyncDone); } } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org