AndrewJSchofield commented on code in PR #17909:
URL: https://github.com/apache/kafka/pull/17909#discussion_r1854333523

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -1165,7 +1165,10 @@ public void complete(TopicIdPartition partition, Acknowledgements acknowledgemen
             }
             // For commitAsync, we do not wait for other results to complete, we prepare a background event
             // for every ShareAcknowledgeResponse.
-            if (isCommitAsync || (remainingResults != null && remainingResults.decrementAndGet() == 0)) {
+            // For commitAsync, we send out a background event for every TopicIdPartition, so we use a singletonMap each time.
+            if (isCommitAsync) {
+                maybeSendShareAcknowledgeCommitCallbackEvent(Collections.singletonMap(partition, acknowledgements));
+            } else if (remainingResults != null && remainingResults.decrementAndGet() == 0) {

Review Comment:
   nit: Extra space

##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -421,6 +421,10 @@ public void onComplete(Map<TopicIdPartition, Set<Long>> offsetsMap, Exception ex
                 Set<Long> mergedOffsets = new HashSet<>();
                 mergedOffsets.addAll(oldOffsets);
                 mergedOffsets.addAll(newOffsets);
+                if (mergedOffsets.size() < (oldOffsets.size() + newOffsets.size())) {

Review Comment:
   I think this is quite a dangerous thing to leave in here. In principle, a test could rely upon repeated delivery of a record (timeout for example) which could actually call the callback twice. I'd remove this because I predict it will be the source of a flaky test later on.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
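
On the second review comment above (the size check on `mergedOffsets` in ShareConsumerTest): a minimal sketch of why the check can fire on legitimate redelivery. This is not the test's actual code. `AckOffsetTracker`, `offsetsFor` and `overlaps` are hypothetical names, and partitions are plain strings rather than `TopicIdPartition`. What the PR does when the check is true is not visible in the quoted hunk, so the sketch only shows the condition itself.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the test callback's bookkeeping.
public class AckOffsetTracker {
    private final Map<String, Set<Long>> acknowledged = new HashMap<>();

    // Merge newly acknowledged offsets into the per-partition set.
    // Set union is idempotent, so an offset reported a second time
    // (for example after a redelivery) is simply absorbed.
    public void onComplete(String partition, Set<Long> newOffsets) {
        acknowledged.merge(partition, new HashSet<>(newOffsets), (oldOffsets, added) -> {
            oldOffsets.addAll(added);
            return oldOffsets;
        });
    }

    public Set<Long> offsetsFor(String partition) {
        return acknowledged.getOrDefault(partition, Set.of());
    }

    // Same condition as the line added in the diff: true when the old and
    // new sets share at least one offset.
    public static boolean overlaps(Set<Long> oldOffsets, Set<Long> newOffsets) {
        Set<Long> merged = new HashSet<>(oldOffsets);
        merged.addAll(newOffsets);
        return merged.size() < oldOffsets.size() + newOffsets.size();
    }

    public static void main(String[] args) {
        AckOffsetTracker tracker = new AckOffsetTracker();
        tracker.onComplete("topic-0", Set.of(0L, 1L, 2L));
        // Offset 2 is acknowledged again, as could happen if the record
        // timed out and was delivered a second time.
        tracker.onComplete("topic-0", Set.of(2L, 3L));
        System.out.println(tracker.offsetsFor("topic-0").size()); // prints 4
        System.out.println(overlaps(Set.of(0L, 1L, 2L), Set.of(2L, 3L))); // prints true
    }
}
```

The plain merge gives the same result whether or not an offset repeats, whereas anything keyed off `overlaps` makes the outcome depend on redelivery timing, which is the flakiness risk the comment describes.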