lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1394753399
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -77,32 +138,111 @@ public class MembershipManagerImpl implements MembershipManager { /** * Assignment that the member received from the server and successfully processed. */ - private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment; + private Set<TopicPartition> currentAssignment; /** - * Assignment that the member received from the server but hasn't completely processed - * yet. + * Subscription state object holding the current assignment the member has for the topics it + * subscribed to. */ - private Optional<ConsumerGroupHeartbeatResponseData.Assignment> targetAssignment; + private final SubscriptionState subscriptions; + + /** + * Metadata that allows us to create the partitions needed for {@link ConsumerRebalanceListener}. + */ + private final ConsumerMetadata metadata; + + /** + * TopicPartition comparator based on topic name and partition id. + */ + private final static TopicPartitionComparator COMPARATOR = new TopicPartitionComparator(); /** * Logger. */ private final Logger log; - public MembershipManagerImpl(String groupId, LogContext logContext) { - this(groupId, null, null, logContext); + /** + * Manager to perform commit requests needed before revoking partitions (if auto-commit is + * enabled) + */ + private final CommitRequestManager commitRequestManager; + + /** + * Local cache of assigned topic IDs and names. Topics are added here when received in a + * target assignment, as we discover their names in the Metadata cache, and removed when the + * topic is not in the subscription anymore. The purpose of this cache is to avoid metadata + * requests in cases where a currently assigned topic is in the target assignment (new + * partition assigned, or revoked), but it is not present the Metadata cache at that moment. 
+ * The cache is cleared when the subscription changes ({@link #transitionToJoining()}, the + * member fails ({@link #transitionToFatal()} or leaves the group ({@link #leaveGroup()}). + */ + private final Map<Uuid, String> assignedTopicNamesCache; + + /** + * Topic IDs received in a target assignment for which we haven't found topic names yet. + * Items are added to this set every time a target assignment is received. Items are removed + * when metadata is found for the topic. This is where the member collects all assignments + * received from the broker, even though they may not be ready to reconcile due to missing + * metadata. + */ + private final Map<Uuid, List<Integer>> assignmentUnresolved; + + /** + * Assignment received for which topic names have been resolved, so it's ready to be + * reconciled. Items are added to this set when received in a target assignment (if metadata + * available), or when a metadata update is received. This is where the member keeps all the + * assignment ready to reconcile, even though the reconciliation might need to wait if there + * is already another on in process. + */ + private final SortedSet<TopicPartition> assignmentReadyToReconcile; + + /** + * Epoch that a member must include a heartbeat request to indicate that it want to join or + * re-join a group. + */ + public static final int JOIN_GROUP_EPOCH = 0; + + /** + * If there is a reconciliation running (triggering commit, callbacks) for the + * assignmentReadyToReconcile. This will be true if {@link #reconcile()} has been triggered + * after receiving a heartbeat response, or a metadata update. + */ + private boolean reconciliationInProgress; + + /** + * ID the member had when the reconciliation in progress started. This is used to identify if + * the member has rejoined while it was reconciling an assignment (in which case the result + * of the reconciliation is not applied.) + */ + private String memberIdOnReconciliationStart; Review Comment: Yes, I just updated this. 
We're on the same page that the client will keep the member ID forever and provide it back... but I was wrongly expecting it would change after rejoining. Updated now. The goal is to be able to identify a rejoin, so I'm now using the member epoch instead (expecting that every time a member rejoins it will get a bumped epoch). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org