hachikuji commented on a change in pull request #11688:
URL: https://github.com/apache/kafka/pull/11688#discussion_r799847085



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -696,8 +698,12 @@ public void handle(JoinGroupResponse joinResponse, 
RequestFuture<ByteBuffer> fut
     private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse 
joinResponse) {
         try {
             // perform the leader synchronization and send back the assignment 
for the group
-            Map<String, ByteBuffer> groupAssignment = 
performAssignment(joinResponse.data().leader(), 
joinResponse.data().protocolName(),
-                    joinResponse.data().members());
+            Map<String, ByteBuffer> groupAssignment = performAssignment(
+                joinResponse.data().leader(),
+                joinResponse.data().protocolName(),
+                joinResponse.data().members(),
+                joinResponse.data().skipAssignment()

Review comment:
       I think a comment about this would be helpful. An obvious question is 
why do we still call `performAssignment` when `skipAssignment` is set. It's 
useful to remember that we still need to propagate the leader and member state 
to the coordinator implementation.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
##########
@@ -211,7 +211,15 @@ protected void onJoinComplete(int generation, String 
memberId, String protocol,
     }
 
     @Override
-    protected Map<String, ByteBuffer> performAssignment(String leaderId, 
String protocol, List<JoinGroupResponseMember> allMemberMetadata) {
+    protected Map<String, ByteBuffer> performAssignment(String leaderId,
+                                                        String protocol,
+                                                        
List<JoinGroupResponseMember> allMemberMetadata,
+                                                        Boolean 
skipAssignment) {
+        // Connect does not support static membership so skipping the
+        // assignment should never happen in practice.
+        if (skipAssignment)
+            return Collections.emptyMap();

Review comment:
       Would it make sense to raise an exception instead?

##########
File path: core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
##########
@@ -1791,4 +1793,33 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertTrue(records2.count() == 1 && 
records2.records(tp).asScala.head.offset == 1,
       "Expected consumer2 to consume one message from offset 1, which is the 
committed offset of consumer1")
   }
+  @Test

Review comment:
       nit: add newline

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
##########
@@ -1550,6 +1573,34 @@ public void testMetadataChangeTriggersRebalance() {
         assertTrue(coordinator.rejoinNeededOrPending());
     }
 
+    @Test
+    public void testStaticLeaderRejoinsGroupAndCanTriggersRebalance() {
+        // ensure metadata is up-to-date for leader
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        client.updateMetadata(metadataResponse);
+
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        // the leader is responsible for picking up metadata changes and 
forcing a group rebalance.
+        // note that `partitionAssignor.prepare` is not called therefore 
calling `partitionAssignor.assign`
+        // will throw a IllegalStateException. this indirectly verifies that 
`assign` is correctly skipped.
+        Map<String, List<String>> memberSubscriptions = 
singletonMap(consumerId, singletonList(topic1));

Review comment:
       Could we have a case where the other consumers are subscribed to a topic 
that this consumer is not also subscribed to?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -642,6 +648,12 @@ private void maybeUpdateGroupSubscription(String 
assignorName,
         updateGroupSubscription(allSubscribedTopics);
 
         isLeader = true;
+        assignmentSnapshot = metadataSnapshot;
+
+        if (skipAssignment)
+            return Collections.emptyMap();

Review comment:
       Can we add some logging for this case?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -198,11 +198,13 @@ public AbstractCoordinator(GroupRebalanceConfig 
rebalanceConfig,
      * @param leaderId The id of the leader (which is this member)
      * @param protocol The protocol selected by the coordinator
      * @param allMemberMetadata Metadata from all members of the group
+     * @param skipAssignment True if leader must skip running the assignor
      * @return A map from each member to their state assignment
      */
     protected abstract Map<String, ByteBuffer> performAssignment(String 
leaderId,
                                                                  String 
protocol,
-                                                                 
List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata);
+                                                                 
List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata,
+                                                                 Boolean 
skipAssignment);

Review comment:
       nit: could this be `boolean`? Seems we don't need null.

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
##########
@@ -386,15 +386,39 @@ public void 
testPerformAssignmentShouldValidateCooperativeAssignment() {
             if (protocol == COOPERATIVE) {
                 // in cooperative protocol, we should throw exception when 
validating cooperative assignment
                 Exception e = assertThrows(IllegalStateException.class,
-                    () -> coordinator.performAssignment("1", 
partitionAssignor.name(), metadata));
+                    () -> coordinator.performAssignment("1", 
partitionAssignor.name(), metadata, false));
                 assertTrue(e.getMessage().contains("Assignor supporting the 
COOPERATIVE protocol violates its requirements"));
             } else {
                 // in eager protocol, we should not validate assignment
-                coordinator.performAssignment("1", partitionAssignor.name(), 
metadata);
+                coordinator.performAssignment("1", partitionAssignor.name(), 
metadata, false);
             }
         }
     }
 
+    @Test
+    public void testPerformAssignmentShouldSkipAssignment() {
+        SubscriptionState mockSubscriptionState = 
Mockito.mock(SubscriptionState.class);
+
+        Map<String, List<String>> memberSubscriptions = 
singletonMap(consumerId, singletonList(topic1));
+
+        List<JoinGroupResponseData.JoinGroupResponseMember> metadata = new 
ArrayList<>();
+        for (Map.Entry<String, List<String>> subscriptionEntry : 
memberSubscriptions.entrySet()) {
+            ConsumerPartitionAssignor.Subscription subscription = new 
ConsumerPartitionAssignor.Subscription(subscriptionEntry.getValue());
+            ByteBuffer buf = 
ConsumerProtocol.serializeSubscription(subscription);
+            metadata.add(new JoinGroupResponseData.JoinGroupResponseMember()
+                .setMemberId(subscriptionEntry.getKey())
+                .setMetadata(buf.array()));
+        }
+
+        // `partitionAssignor.prepare` is not called therefore calling 
`partitionAssignor.assign` will throw

Review comment:
       Where is `partitionAssignor.prepare` defined? I wonder if it would be 
more direct to install a mock assignor and then verify no interactions.

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1306,23 +1326,48 @@ class GroupCoordinator(val brokerId: Int,
                 protocolType = group.protocolType,
                 protocolName = group.protocolName,
                 leaderId = currentLeader,
+                skipAssignment = false,
                 error = error
               ))
             } else {
-              group.maybeInvokeJoinCallback(member, JoinGroupResult(
-                members = List.empty,
-                memberId = newMemberId,
-                generationId = group.generationId,
-                protocolType = group.protocolType,
-                protocolName = group.protocolName,
-                // We want to avoid current leader performing trivial 
assignment while the group
-                // is in stable stage, because the new assignment in leader's 
next sync call
-                // won't be broadcast by a stable group. This could be 
guaranteed by
-                // always returning the old leader id so that the current 
leader won't assume itself
-                // as a leader based on the returned message, since the new 
member.id won't match
-                // returned leader id, therefore no assignment will be 
performed.
-                leaderId = currentLeader,
-                error = Errors.NONE))
+              if (supportSkippingAssignment) {

Review comment:
       nit: `else if`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to