lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1394753399


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -77,32 +138,111 @@ public class MembershipManagerImpl implements 
MembershipManager {
     /**
      * Assignment that the member received from the server and successfully 
processed.
      */
-    private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment;
+    private Set<TopicPartition> currentAssignment;
 
     /**
-     * Assignment that the member received from the server but hasn't 
completely processed
-     * yet.
+     * Subscription state object holding the current assignment the member has 
for the topics it
+     * subscribed to.
      */
-    private Optional<ConsumerGroupHeartbeatResponseData.Assignment> 
targetAssignment;
+    private final SubscriptionState subscriptions;
+
+    /**
+     * Metadata that allows us to create the partitions needed for {@link 
ConsumerRebalanceListener}.
+     */
+    private final ConsumerMetadata metadata;
+
+    /**
+     * TopicPartition comparator based on topic name and partition id.
+     */
+    private final static TopicPartitionComparator COMPARATOR = new 
TopicPartitionComparator();
 
     /**
      * Logger.
      */
     private final Logger log;
 
-    public MembershipManagerImpl(String groupId, LogContext logContext) {
-        this(groupId, null, null, logContext);
+    /**
+     * Manager to perform commit requests needed before revoking partitions 
(if auto-commit is
+     * enabled)
+     */
+    private final CommitRequestManager commitRequestManager;
+
+    /**
+     * Local cache of assigned topic IDs and names. Topics are added here when 
received in a
+     * target assignment, as we discover their names in the Metadata cache, 
and removed when the
+     * topic is not in the subscription anymore. The purpose of this cache is 
to avoid metadata
+     * requests in cases where a currently assigned topic is in the target 
assignment (new
+     * partition assigned, or revoked), but it is not present the Metadata 
cache at that moment.
+     * The cache is cleared when the subscription changes ({@link 
#transitionToJoining()}, the
+     * member fails ({@link #transitionToFatal()} or leaves the group ({@link 
#leaveGroup()}).
+     */
+    private final Map<Uuid, String> assignedTopicNamesCache;
+
+    /**
+     * Topic IDs received in a target assignment for which we haven't found 
topic names yet.
+     * Items are added to this set every time a target assignment is received. 
Items are removed
+     * when metadata is found for the topic. This is where the member collects 
all assignments
+     * received from the broker, even though they may not be ready to 
reconcile due to missing
+     * metadata.
+     */
+    private final Map<Uuid, List<Integer>> assignmentUnresolved;
+
+    /**
+     * Assignment received for which topic names have been resolved, so it's 
ready to be
+     * reconciled. Items are added to this set when received in a target 
assignment (if metadata
+     * available), or when a metadata update is received. This is where the 
member keeps all the
+     * assignment ready to reconcile, even though the reconciliation might 
need to wait if there
+     * is already another on in process.
+     */
+    private final SortedSet<TopicPartition> assignmentReadyToReconcile;
+
+    /**
+     * Epoch that a member must include a heartbeat request to indicate that 
it want to join or
+     * re-join a group.
+     */
+    public static final int JOIN_GROUP_EPOCH = 0;
+
+    /**
+     * If there is a reconciliation running (triggering commit, callbacks) for 
the
+     * assignmentReadyToReconcile. This will be true if {@link #reconcile()} 
has been triggered
+     * after receiving a heartbeat response, or a metadata update.
+     */
+    private boolean reconciliationInProgress;
+
+    /**
+     * ID the member had when the reconciliation in progress started. This is 
used to identify if
+     * the member has rejoined while it was reconciling an assignment (in 
which case the result
+     * of the reconciliation is not applied.)
+     */
+    private String memberIdOnReconciliationStart;

Review Comment:
   Yes, I just updated this. We're on the same page regarding that the client 
will keep the member ID forever and provide it back....but I was wrongly 
expecting it would change after rejoining. Updated now. The goal is to be able 
to identify a rejoin, so using the member epoch (expecting that every time a 
member rejoins will get a bumped epoch).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to