AndrewJSchofield commented on code in PR #17909:
URL: https://github.com/apache/kafka/pull/17909#discussion_r1854333523


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -1165,7 +1165,10 @@ public void complete(TopicIdPartition partition, Acknowledgements acknowledgemen
             }
             // For commitAsync, we do not wait for other results to complete, we prepare a background event
             // for every ShareAcknowledgeResponse.
-            if (isCommitAsync || (remainingResults  != null && remainingResults.decrementAndGet() == 0)) {
+            // For commitAsync, we send out a background event for every TopicIdPartition, so we use a singletonMap each time.
+            if (isCommitAsync) {
+                maybeSendShareAcknowledgeCommitCallbackEvent(Collections.singletonMap(partition, acknowledgements));
+            } else if (remainingResults  != null && remainingResults.decrementAndGet() == 0) {

Review Comment:
   nit: Extra space before `!=` in `remainingResults  != null`.



##########
core/src/test/java/kafka/test/api/ShareConsumerTest.java:
##########
@@ -421,6 +421,10 @@ public void onComplete(Map<TopicIdPartition, Set<Long>> offsetsMap, Exception ex
                     Set<Long> mergedOffsets = new HashSet<>();
                     mergedOffsets.addAll(oldOffsets);
                     mergedOffsets.addAll(newOffsets);
+                    if (mergedOffsets.size() < (oldOffsets.size() + newOffsets.size())) {

Review Comment:
   I think this check is quite dangerous to leave in here. In principle, a test could rely on repeated delivery of a record (after a timeout, for example), which could legitimately invoke the callback twice for the same offset. I'd remove it, because I predict it will be a source of flaky tests later on.
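
   To illustrate the concern, here is a minimal standalone sketch of the size comparison. It is not the test code itself: the class `MergedOffsetsSketch` and the methods `merge` and `overlapDetected` are hypothetical names, and the offsets are made up. It only shows that the condition becomes true as soon as one offset is reported to the callback a second time, which is exactly what a redelivery would look like.

```java
import java.util.HashSet;
import java.util.Set;

public class MergedOffsetsSketch {

    // Mirrors the merge in the callback: union of the offsets seen so far
    // and the offsets reported by the latest callback invocation.
    public static Set<Long> merge(Set<Long> oldOffsets, Set<Long> newOffsets) {
        Set<Long> mergedOffsets = new HashSet<>();
        mergedOffsets.addAll(oldOffsets);
        mergedOffsets.addAll(newOffsets);
        return mergedOffsets;
    }

    // The condition under review: true when at least one offset
    // appears in both sets, because the union is then smaller than the sum.
    public static boolean overlapDetected(Set<Long> oldOffsets, Set<Long> newOffsets) {
        return merge(oldOffsets, newOffsets).size() < oldOffsets.size() + newOffsets.size();
    }

    public static void main(String[] args) {
        Set<Long> firstCallback = Set.of(0L, 1L, 2L);

        // Hypothetical redelivery: offset 2 is acknowledged again after a timeout.
        Set<Long> afterRedelivery = Set.of(2L, 3L);
        System.out.println(overlapDetected(firstCallback, afterRedelivery));

        // Disjoint offsets: no overlap is reported.
        Set<Long> disjoint = Set.of(3L, 4L);
        System.out.println(overlapDetected(firstCallback, disjoint));
    }
}
```

   Whatever the test does inside that branch would then run for a delivery pattern that is valid, so the outcome depends on timing rather than on correctness.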


