hachikuji commented on a change in pull request #11688: URL: https://github.com/apache/kafka/pull/11688#discussion_r799847085
########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ########## @@ -696,8 +698,12 @@ public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> fut private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) { try { // perform the leader synchronization and send back the assignment for the group - Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.data().leader(), joinResponse.data().protocolName(), - joinResponse.data().members()); + Map<String, ByteBuffer> groupAssignment = performAssignment( + joinResponse.data().leader(), + joinResponse.data().protocolName(), + joinResponse.data().members(), + joinResponse.data().skipAssignment() Review comment: I think a comment about this would be helpful. An obvious question is why we still call `performAssignment` when `skipAssignment` is set. It's useful to remember that we still need to propagate the leader and member state to the coordinator implementation. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java ########## @@ -211,7 +211,15 @@ protected void onJoinComplete(int generation, String memberId, String protocol, } @Override - protected Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, List<JoinGroupResponseMember> allMemberMetadata) { + protected Map<String, ByteBuffer> performAssignment(String leaderId, + String protocol, + List<JoinGroupResponseMember> allMemberMetadata, + Boolean skipAssignment) { + // Connect does not support static membership so skipping the + // assignment should never happen in practice. + if (skipAssignment) + return Collections.emptyMap(); Review comment: Would it make sense to raise an exception instead? 
########## File path: core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala ########## @@ -1791,4 +1793,33 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertTrue(records2.count() == 1 && records2.records(tp).asScala.head.offset == 1, "Expected consumer2 to consume one message from offset 1, which is the committed offset of consumer1") } + @Test Review comment: nit: add newline ########## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java ########## @@ -1550,6 +1573,34 @@ public void testMetadataChangeTriggersRebalance() { assertTrue(coordinator.rejoinNeededOrPending()); } + @Test + public void testStaticLeaderRejoinsGroupAndCanTriggersRebalance() { + // ensure metadata is up-to-date for leader + subscriptions.subscribe(singleton(topic1), rebalanceListener); + client.updateMetadata(metadataResponse); + + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + // the leader is responsible for picking up metadata changes and forcing a group rebalance. + // note that `partitionAssignor.prepare` is not called therefore calling `partitionAssignor.assign` + // will throw a IllegalStateException. this indirectly verifies that `assign` is correctly skipped. + Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1)); Review comment: Could we have a case where the other consumers are subscribed to a topic that this consumer is not also subscribed to? ########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ########## @@ -642,6 +648,12 @@ private void maybeUpdateGroupSubscription(String assignorName, updateGroupSubscription(allSubscribedTopics); isLeader = true; + assignmentSnapshot = metadataSnapshot; + + if (skipAssignment) + return Collections.emptyMap(); Review comment: Can we add some logging for this case? 
########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ########## @@ -198,11 +198,13 @@ public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig, * @param leaderId The id of the leader (which is this member) * @param protocol The protocol selected by the coordinator * @param allMemberMetadata Metadata from all members of the group + * @param skipAssignment True if leader must skip running the assignor * @return A map from each member to their state assignment */ protected abstract Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, - List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata); + List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata, + Boolean skipAssignment); Review comment: nit: could this be `boolean`? Seems we don't need null. ########## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java ########## @@ -386,15 +386,39 @@ public void testPerformAssignmentShouldValidateCooperativeAssignment() { if (protocol == COOPERATIVE) { // in cooperative protocol, we should throw exception when validating cooperative assignment Exception e = assertThrows(IllegalStateException.class, - () -> coordinator.performAssignment("1", partitionAssignor.name(), metadata)); + () -> coordinator.performAssignment("1", partitionAssignor.name(), metadata, false)); assertTrue(e.getMessage().contains("Assignor supporting the COOPERATIVE protocol violates its requirements")); } else { // in eager protocol, we should not validate assignment - coordinator.performAssignment("1", partitionAssignor.name(), metadata); + coordinator.performAssignment("1", partitionAssignor.name(), metadata, false); } } } + @Test + public void testPerformAssignmentShouldSkipAssignment() { + SubscriptionState mockSubscriptionState = Mockito.mock(SubscriptionState.class); + + Map<String, List<String>> memberSubscriptions = 
singletonMap(consumerId, singletonList(topic1)); + + List<JoinGroupResponseData.JoinGroupResponseMember> metadata = new ArrayList<>(); + for (Map.Entry<String, List<String>> subscriptionEntry : memberSubscriptions.entrySet()) { + ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(subscriptionEntry.getValue()); + ByteBuffer buf = ConsumerProtocol.serializeSubscription(subscription); + metadata.add(new JoinGroupResponseData.JoinGroupResponseMember() + .setMemberId(subscriptionEntry.getKey()) + .setMetadata(buf.array())); + } + + // `partitionAssignor.prepare` is not called therefore calling `partitionAssignor.assign` will throw Review comment: Where is `partitionAssignor.prepare` defined? I wonder if it would be more direct to install a mock assignor and then verify no interactions. ########## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ########## @@ -1306,23 +1326,48 @@ class GroupCoordinator(val brokerId: Int, protocolType = group.protocolType, protocolName = group.protocolName, leaderId = currentLeader, + skipAssignment = false, error = error )) } else { - group.maybeInvokeJoinCallback(member, JoinGroupResult( - members = List.empty, - memberId = newMemberId, - generationId = group.generationId, - protocolType = group.protocolType, - protocolName = group.protocolName, - // We want to avoid current leader performing trivial assignment while the group - // is in stable stage, because the new assignment in leader's next sync call - // won't be broadcast by a stable group. This could be guaranteed by - // always returning the old leader id so that the current leader won't assume itself - // as a leader based on the returned message, since the new member.id won't match - // returned leader id, therefore no assignment will be performed. - leaderId = currentLeader, - error = Errors.NONE)) + if (supportSkippingAssignment) { Review comment: nit: `else if`? 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org