dajac commented on code in PR #15215:
URL: https://github.com/apache/kafka/pull/15215#discussion_r1462874355
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -1134,9 +1134,38 @@ private CompletableFuture<Void> assignPartitions(
// Make assignment effective on the client by updating the subscription state.
updateSubscription(assignedPartitions, false);
+ // Mark assigned partitions as pendingOnAssignedCallback to temporarily stop fetching or
+ // initializing positions for them. Passing the full set of assigned partitions
+ // (previously owned and newly added), given that they are all provided to the user in the
+ // callback, so we could expect offsets updates for any of them.
+ Set<TopicPartition> assignedTopicPartition = assignedPartitions.stream().map(tIdp -> tIdp.topicPartition()).collect(Collectors.toSet());
+ subscriptions.markPendingOnAssignedCallback(assignedTopicPartition, true);
Review Comment:
We may have a race condition here. When the partitions are assigned with
`updateSubscription` at L1135, they start in the INIT state. If the
application thread calls `poll` before `markPendingOnAssignedCallback` is
actually called here, it would initiate the offset reset for them, wouldn't it?
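
To make the concern concrete, here is a minimal, single-threaded sketch of the window. Everything in it is hypothetical: `AssignmentRaceSketch`, `PartitionState`, `assign`, `markPending`, `assignAndMarkPending`, and `partitionsToInitialize` are toy stand-ins, not Kafka's `SubscriptionState` API, and partitions are plain strings. It only shows that a `poll` landing between the two steps would see the new partitions as eligible for position initialization, and that doing both steps under one lock is one possible way to close the window (not necessarily what this PR should do).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified model of the subscription state; NOT Kafka's SubscriptionState. */
public class AssignmentRaceSketch {

    /** Toy per-partition state; hasPosition == false models the initial (INIT-like) state. */
    static final class PartitionState {
        boolean pendingOnAssignedCallback;
        boolean hasPosition;
    }

    private final Map<String, PartitionState> assigned = new HashMap<>();

    /** Step 1 of the two-step variant: newly added partitions start without a position. */
    synchronized void assign(Set<String> partitions) {
        assigned.keySet().retainAll(partitions);
        for (String p : partitions) {
            assigned.computeIfAbsent(p, k -> new PartitionState());
        }
    }

    /** Step 2 of the two-step variant: flag partitions as waiting for the user callback. */
    synchronized void markPending(Set<String> partitions) {
        for (String p : partitions) {
            PartitionState s = assigned.get(p);
            if (s != null) {
                s.pendingOnAssignedCallback = true;
            }
        }
    }

    /** Assumed alternative: both steps under one lock, so no poll can observe the gap. */
    synchronized void assignAndMarkPending(Set<String> partitions) {
        assign(partitions);
        markPending(partitions);
    }

    /** What a concurrent poll would pick up for position initialization. */
    synchronized Set<String> partitionsToInitialize() {
        Set<String> result = new HashSet<>();
        assigned.forEach((p, s) -> {
            if (!s.hasPosition && !s.pendingOnAssignedCallback) {
                result.add(p);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Set<String> newAssignment = Set.of("topic-0", "topic-1");

        AssignmentRaceSketch twoStep = new AssignmentRaceSketch();
        twoStep.assign(newAssignment);
        // A poll from the application thread landing here sees both partitions.
        System.out.println("two-step, inside window: " + twoStep.partitionsToInitialize());
        twoStep.markPending(newAssignment);
        System.out.println("two-step, after mark:    " + twoStep.partitionsToInitialize());

        AssignmentRaceSketch atomic = new AssignmentRaceSketch();
        atomic.assignAndMarkPending(newAssignment);
        System.out.println("atomic:                  " + atomic.partitionsToInitialize());
    }
}
```

In the two-step variant the set returned inside the window is non-empty, which is the situation I am worried about; in the combined variant it is empty from the first moment the assignment is visible.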