lianetm commented on code in PR #15511:
URL: https://github.com/apache/kafka/pull/15511#discussion_r1523858796


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -889,43 +914,36 @@ private void transitionToStale() {
      */
     void maybeReconcile() {
         if (targetAssignmentReconciled()) {
-            log.debug("Ignoring reconciliation attempt. Target assignment is 
equal to the " +
+            log.trace("Ignoring reconciliation attempt. Target assignment is 
equal to the " +
                     "current assignment.");
             return;
         }
         if (reconciliationInProgress) {
-            log.debug("Ignoring reconciliation attempt. Another reconciliation 
is already in progress. Assignment " +
+            log.trace("Ignoring reconciliation attempt. Another reconciliation 
is already in progress. Assignment " +
                 currentTargetAssignment + " will be handled in the next 
reconciliation loop.");
             return;
         }
 
         // Find the subset of the target assignment that can be resolved to 
topic names, and trigger a metadata update
         // if some topic IDs are not resolvable.
         SortedSet<TopicIdPartition> assignedTopicIdPartitions = 
findResolvableAssignmentAndTriggerMetadataUpdate();
+        final LocalAssignment resolvedAssignment = new 
LocalAssignment(currentTargetAssignment.localEpoch, assignedTopicIdPartitions);
 
-        SortedSet<TopicPartition> ownedPartitions = new 
TreeSet<>(TOPIC_PARTITION_COMPARATOR);
-        ownedPartitions.addAll(subscriptions.assignedPartitions());
-
-        // Keep copy of assigned TopicPartitions created from the 
TopicIdPartitions that are
-        // being reconciled. Needed for interactions with the centralized 
subscription state that
-        // does not support topic IDs yet, and for the callbacks.
-        SortedSet<TopicPartition> assignedTopicPartitions = 
toTopicPartitionSet(assignedTopicIdPartitions);
-
-        // Check same assignment. Based on topic names for now, until topic 
IDs are properly
-        // supported in the centralized subscription state object. Note that 
this check is
-        // required to make sure that reconciliation is not triggered if the 
assignment ready to
-        // be reconciled is the same as the current one (even though the 
member may remain
-        // in RECONCILING state if it has some unresolved assignments).
-        boolean sameAssignmentReceived = 
assignedTopicPartitions.equals(ownedPartitions);
-
-        if (sameAssignmentReceived) {
+        if (resolvedAssignment.equals(currentAssignment)) {

Review Comment:
   well I was expecting we only need to trigger the callbacks if the assignment 
changed (could be to empty, but something needs to change), and that's not the 
case if the member ends up with t1-1 again, that it already owns. 
   
   By running a full reconciliation when the the resolved assignment is the 
same as the current but received later, we end up with a client reconciling the 
exact same assignment it already owns :S It would turn out noisy I expect, 
accounting for 2 rebalances in cases probably much more common, where a new 
topic assigned is temporarily not in metadata and then discovered: member owns 
[t1-1], receives assignment [t1-1, t2-1] with missing metadata for t2 (t2 
discovered shortly after). We would generate 2 rebalances (a 1st one with no 
changes in assignment, a 2nd one with the added topic once discovered) when 
truly things only changed once). Thoughts?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to