lianetm commented on code in PR #15215:
URL: https://github.com/apache/kafka/pull/15215#discussion_r1462378625
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -1134,9 +1134,38 @@ private CompletableFuture<Void> assignPartitions(
// Make assignment effective on the client by updating the
subscription state.
updateSubscription(assignedPartitions, false);
+ // Mark assigned partitions as pendingOnAssignedCallback to
temporarily stop fetching or
+ // initializing positions for them. Passing the full set of assigned
partitions
+ // (previously owned and newly added), given that they are all
provided to the user in the
+ // callback, so we could expect offsets updates for any of them.
+ Set<TopicPartition> assignedTopicPartition =
assignedPartitions.stream().map(tIdp ->
tIdp.topicPartition()).collect(Collectors.toSet());
+ subscriptions.markPendingOnAssignedCallback(assignedTopicPartition,
true);
+
// Invoke user call back.
CompletableFuture<Void> result =
invokeOnPartitionsAssignedCallback(addedPartitions);
+ // Resume partitions only if the callback succeeded.
+ result.whenComplete((error, callbackResult) -> {
+ if (error == null) {
+ // Remove pendingOnAssignedCallback flag from the assigned
partitions, so we can
+ // start fetching, and updating positions for them if needed.
+
subscriptions.markPendingOnAssignedCallback(assignedTopicPartition, false);
+ } else {
+ // Remove pendingOnAssignedCallback flag from the previously
owned only so that
Review Comment:
@dajac I've been debating about the right behaviour on this failure
scenario. Thinking about what is expected to happen after the callback fails,
it seemed sensible to ensure that the partitions that were previously owned are
back to normal (fetchable and initializing positions if needed, in case the
user just ignores the exception and continues polling), and the partitions that
were trying to be added are not (since the consumer won't ack them to the
broker, and will remove them from the subscription, or try again based on the
next assignment received. Either way we don't want any progress on them).
Thoughts? Am I missing something maybe?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]