lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1383433245
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -181,33 +243,461 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + + // Release assignment + CompletableFuture<Void> callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); + callbackResult.whenComplete((result, error) -> { + if (error != null) { + log.debug("OnPartitionsLost callback invocation failed while releasing assignment" + + "after member got fenced. Member will rejoin the group anyways.", error); + } + subscriptions.assignFromSubscribed(Collections.emptySet()); + transitionToJoinGroup(); + }); } /** * {@inheritDoc} */ @Override public void transitionToFailed() { - log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); - transitionTo(MemberState.FAILED); + log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + + // Update epoch to indicate that the member is not in the group anymore, so that the + // onPartitionsLost is called to release assignment. + memberEpoch = -1; + invokeOnPartitionsRevokedOrLostToReleaseAssignment(); + + transitionTo(MemberState.FATAL); + } + + /** + * {@inheritDoc} + */ + @Override + public void transitionToJoinGroup() { + resetEpoch(); + transitionTo(MemberState.JOINING); + } + + /** + * {@inheritDoc} + */ + @Override + public CompletableFuture<Void> leaveGroup() { + transitionTo(MemberState.LEAVING_GROUP); + + CompletableFuture<Void> callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); + callbackResult.whenComplete((result, error) -> { + + // Clear the subscription, no matter if the callback execution failed or succeeded. 
+ subscriptions.assignFromSubscribed(Collections.emptySet()); + + // Transition to ensure that a heartbeat request is sent out to effectively leave the + // group (even in the case where the member had no assignment to release or when the + // callback execution failed.) + transitionToSendingLeaveGroup(); + + }); + + // Return callback future to indicate that the leave group is done when the callbacks + // complete, without waiting for the heartbeat to be sent out. (Best effort to send it + // but do not hold the leave group operation for it) + return callbackResult; } + /** + * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or + * onPartitionsLost. + * <ul> + * <li>If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked. + * This will be the case when releasing assignment because the member is intentionally + * leaving the group (after a call to unsubscribe)</li> + * + * <li>If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost. + * This will be the case when releasing assignment after being fenced .</li> + * </ul> + * + * @return Future that will complete when the callback execution completes. + */ + private CompletableFuture<Void> invokeOnPartitionsRevokedOrLostToReleaseAssignment() { + SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(COMPARATOR); + droppedPartitions.addAll(subscriptions.assignedPartitions()); + + CompletableFuture<Void> callbackResult; + if (droppedPartitions.isEmpty()) { + // No assignment to release + callbackResult = CompletableFuture.completedFuture(null); + } else { + // Release assignment + if (memberEpoch > 0) { + // Member is part of the group. Invoke onPartitionsRevoked. + callbackResult = revokePartitions(droppedPartitions); + } else { + // Member is not part of the group anymore. Invoke onPartitionsLost. 
+ callbackResult = invokeOnPartitionsLostCallback(droppedPartitions); + } + } + return callbackResult; + } + + /** + * Reset member epoch to the value required for the leave the group heartbeat request, and + * transition to the {@link MemberState#SENDING_LEAVE_REQUEST} state so that a heartbeat + * request is sent out with it. + */ + private void transitionToSendingLeaveGroup() { + memberEpoch = leaveGroupEpoch(); + currentAssignment = new HashSet<>(); + targetAssignment = Optional.empty(); + transitionTo(MemberState.SENDING_LEAVE_REQUEST); + } + + /** + * Return the epoch to use in the Heartbeat request to indicate that the member wants to + * leave the group. Should be -2 if this is a static member, or -1 in any other case. + */ + private int leaveGroupEpoch() { + return groupInstanceId.isPresent() ? -2 : -1; + } + + /** + * {@inheritDoc} + */ @Override - public boolean shouldSendHeartbeat() { - return state() != MemberState.FAILED; + public boolean shouldHeartbeatNow() { + return state() == MemberState.ACKNOWLEDGING_RECONCILED_ASSIGNMENT || + state() == MemberState.SENDING_LEAVE_REQUEST; } /** - * Transition to {@link MemberState#STABLE} only if there are no target assignments left to - * reconcile. Transition to {@link MemberState#RECONCILING} otherwise. 
+ * {@inheritDoc} */ - private boolean maybeTransitionToStable() { - if (!hasPendingTargetAssignment()) { + @Override + public void onHeartbeatRequestSent() { + if (state() == MemberState.ACKNOWLEDGING_RECONCILED_ASSIGNMENT) { transitionTo(MemberState.STABLE); + } else if (state() == MemberState.SENDING_LEAVE_REQUEST) { + transitionTo(MemberState.NOT_IN_GROUP); + } + } + + @Override + public boolean shouldSkipHeartbeat() { + return state() == MemberState.NOT_IN_GROUP || state() == MemberState.FATAL; + } + + void reconcile(ConsumerGroupHeartbeatResponseData.Assignment targetAssignment) { + + SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(COMPARATOR); + ownedPartitions.addAll(subscriptions.assignedPartitions()); + + CompletableFuture<SortedSet<TopicPartition>> assignedPartitionsByNameResult = extractTopicPartitionsFromAssignment(targetAssignment); + + assignedPartitionsByNameResult.whenComplete((assignedPartitions, metadataError) -> { + if (metadataError != null) { + log.error("Reconciliation failed due to error getting metadata to resolve topic " + + "names for topic IDs {} in target assignment.", targetAssignment); + // TODO: failing reconciliation (no ack sent to the broker), but leaving + // member in STABLE state. Double check if any other action should be taken + // here. 
+ transitionTo(MemberState.STABLE); + return; + } + + if (!subscriptions.checkAssignmentMatchedSubscription(assignedPartitions)) { + log.debug("Target assignment {} does not match the current subscription {}; it is " + + "likely that the subscription has changed since we joined the group, " + + "will re-join with current subscription", + targetAssignment, + subscriptions.prettyString()); + transitionToJoinGroup(); + return; + } + + // Partitions to assign (not previously owned) + SortedSet<TopicPartition> addedPartitions = new TreeSet<>(COMPARATOR); + addedPartitions.addAll(assignedPartitions); + addedPartitions.removeAll(ownedPartitions); + + + // Partitions to revoke + SortedSet<TopicPartition> revokedPartitions = new TreeSet<>(COMPARATOR); + revokedPartitions.addAll(ownedPartitions); + revokedPartitions.removeAll(assignedPartitions); + + log.info("Updating assignment with\n" + + "\tAssigned partitions: {}\n" + + "\tCurrent owned partitions: {}\n" + + "\tAdded partitions (assigned - owned): {}\n" + + "\tRevoked partitions (owned - assigned): {}\n", + assignedPartitions, + ownedPartitions, + addedPartitions, + revokedPartitions + ); + + CompletableFuture<Void> revocationResult; + if (!revokedPartitions.isEmpty()) { + revocationResult = revokePartitions(revokedPartitions); + } else { + revocationResult = CompletableFuture.completedFuture(null); + // Reschedule the auto commit starting from now (new assignment received without any + // revocation). + commitRequestManager.resetAutoCommitTimer(); + } + + // Future that will complete when the full reconciliation process completes (revocation + // and assignment, executed sequentially) + CompletableFuture<Void> reconciliationResult = + revocationResult.thenCompose(r -> { + if (state == MemberState.RECONCILING) { + // Make assignment effective on the client by updating the subscription state. 
+ subscriptions.assignFromSubscribed(assignedPartitions); + // Invoke user call back + return invokeOnPartitionsAssignedCallback(addedPartitions); + } else { + // Revocation callback completed but member already moved out of the + // reconciling state. + CompletableFuture<Void> res = new CompletableFuture<>(); + res.completeExceptionally(new KafkaException("Interrupting " + + "reconciliation after revocation, as the member already " + + "transitioned out of the reconciling state into " + state)); + return res; + } + }); + + reconciliationResult.whenComplete((result, error) -> { + if (error != null) { + log.error("Reconciliation failed. ", error); + } else { + if (state == MemberState.RECONCILING) { + + // Make assignment effective on the broker by transitioning to send acknowledge. + transitionTo(MemberState.ACKNOWLEDGING_RECONCILED_ASSIGNMENT); + + // Make assignment effective on the member group manager + this.currentAssignment = assignedPartitions; + this.targetAssignment = Optional.empty(); + + } else { + log.debug("New assignment processing completed but the member already " + + "transitioned out of the reconciliation state into {}. Interrupting " + + "reconciliation as it's not relevant anymore,", state); + // TODO: double check if subscription state changes needed. This is expected to be + // the case where the member got fenced, failed or unsubscribed while the + // reconciliation was in process. Transitions to those states update the + // subscription state accordingly so it shouldn't be necessary to make any changes + // to the subscription state at this point. + } + } + }); + }); + } + + /** + * Build set of TopicPartition (topic name and partition id) from the assignment received + * from the broker (topic IDs and list of partitions). For each topic ID this will attempt to + * find the topic name in the metadata. If a topic ID is not found, this will request a + * metadata update, and the reconciliation will resume the topic metadata is received. 
+ * + * @param assignment Assignment received from the broker, containing partitions grouped by + * topic id. + * @return Set of {@link TopicPartition} containing topic name and partition id. + */ + private CompletableFuture<SortedSet<TopicPartition>> extractTopicPartitionsFromAssignment( + ConsumerGroupHeartbeatResponseData.Assignment assignment) { + SortedSet<TopicPartition> assignedPartitions = new TreeSet<>(COMPARATOR); + + List<Uuid> topicsRequiringMetadata = new ArrayList<>(); + assignment.topicPartitions().forEach(topicPartitions -> { + Uuid topicId = topicPartitions.topicId(); + if (!metadata.topicNames().containsKey(topicId)) { + topicsRequiringMetadata.add(topicId); + } else { + String topicName = metadata.topicNames().get(topicId); + topicPartitions.partitions().forEach(tp -> assignedPartitions.add(new TopicPartition(topicName, tp))); + } + + }); + + if (topicsRequiringMetadata.isEmpty()) { + return CompletableFuture.completedFuture(assignedPartitions); } else { - transitionTo(MemberState.RECONCILING); + return resolveTopicNamesForTopicIds(topicsRequiringMetadata, assignedPartitions); + } + } + + /** + * Perform a topic metadata request to discover topic names for the given topic ids. + * + * @param topicsRequiringMetadata List of topic Uuid for which topic names are needed. + * @param resolvedTopicPartitions List of TopicPartitions for the topics with known names. + * This list will be extended when the missing topic names are + * received in metadata. + * + * @return Future that will complete when topic names are received for all + * topicsRequiringMetadata. It will fail if a metadata response is received but does not + * include all the topics that were requested. 
+ */ + private CompletableFuture<SortedSet<TopicPartition>> resolveTopicNamesForTopicIds( + List<Uuid> topicsRequiringMetadata, + SortedSet<TopicPartition> resolvedTopicPartitions) { + CompletableFuture<SortedSet<TopicPartition>> result = new CompletableFuture<>(); + log.debug("Topic IDs {} received in assignment were not found in metadata. " + + "Requesting metadata to resolve topic names and proceed with the " + + "reconciliation.", topicsRequiringMetadata); + // TODO: request metadata only for the topics that require it. Passing empty list to + // retrieve it for all topics until the TopicMetadataRequestManager supports a list + // of topics. + CompletableFuture<Map<Topic, List<PartitionInfo>>> metadataResult = metadataRequestManager.requestTopicMetadata(Optional.empty()); + metadataResult.whenComplete((topicNameAndPartitionInfo, error) -> { + if (error != null) { + // Metadata request to get topic names failed. The TopicMetadataManager + // handles retries on retriable errors, so at this point we consider this a + // fatal error. + log.error("Metadata request for topic IDs {} received in assignment failed.", + topicsRequiringMetadata, error); + result.completeExceptionally(new KafkaException("Failed to get metadata for " + + "topic IDs received in target assignment.", error)); + } else { + topicNameAndPartitionInfo.forEach((topic, partitionInfoList) -> { + if (topicsRequiringMetadata.contains(topic.topicId())) { + partitionInfoList.forEach(partitionInfo -> + resolvedTopicPartitions.add(new TopicPartition(topic.topicName(), partitionInfo.partition()))); + topicsRequiringMetadata.remove(topic.topicId()); + } + }); + if (topicsRequiringMetadata.isEmpty()) { + result.complete(resolvedTopicPartitions); + } else { + // TODO: check if this could happen. If so, we probably need to retry the Review Comment: Thanks for confirming @AndrewJSchofield . 
After the change to integrate this with the centralized metadata object and cache, we do achieve this behaviour: we keep retrying until all assigned topic IDs are found in metadata. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org