abbccdda commented on a change in pull request #8589: URL: https://github.com/apache/kafka/pull/8589#discussion_r428745517
########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -3612,6 +3611,26 @@ private boolean dependsOnSpecificNode(ConfigResource resource) { || resource.type() == ConfigResource.Type.BROKER_LOGGER; } + private List<MemberIdentity> getMembersFromGroup(String groupId) { + Collection<MemberDescription> members; + try { + members = describeConsumerGroups(Collections.singleton(groupId)).describedGroups().get(groupId).get().members(); + } catch (Throwable ex) { Review comment: I think we should catch `Exception` here: https://stackoverflow.com/questions/2274102/difference-between-using-throwable-and-exception-in-a-try-catch ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -3621,24 +3640,31 @@ public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(Strin KafkaFutureImpl<Map<MemberIdentity, Errors>> future = new KafkaFutureImpl<>(); ConsumerGroupOperationContext<Map<MemberIdentity, Errors>, RemoveMembersFromConsumerGroupOptions> context = - new ConsumerGroupOperationContext<>(groupId, options, deadline, future); + new ConsumerGroupOperationContext<>(groupId, options, deadline, future); Review comment: Let's get back the original indentation. ########## File path: core/src/main/scala/kafka/tools/StreamsResetter.java ########## @@ -186,9 +190,15 @@ private void validateNoActiveConsumers(final String groupId, final List<MemberDescription> members = new ArrayList<>(describeResult.describedGroups().get(groupId).get().members()); if (!members.isEmpty()) { - throw new IllegalStateException("Consumer group '" + groupId + "' is still active " - + "and has following members: " + members + ". 
" - + "Make sure to stop all running application instances before running the reset tool."); + if (options.has(forceOption)) { + System.out.println("Force deleting all active members in the group: " + groupId); + adminClient.removeMembersFromConsumerGroup(groupId, new RemoveMembersFromConsumerGroupOptions()).all(); Review comment: Should we check the member removal result here before proceeding? If that call failed, the whole operation should fail with error message containing the result IMHO. ########## File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java ########## @@ -507,7 +544,7 @@ private Topology setupTopologyWithoutIntermediateUserTopic() { return builder.build(); } - private void cleanGlobal(final boolean withIntermediateTopics, + private int tryCleanGlobal(final boolean withIntermediateTopics, Review comment: We could add meta comment for the return value here, and instead of returning an exit code, I feel a boolean is suffice to indicate whether the clean operation was successful or not. ########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ########## @@ -2411,6 +2411,50 @@ public void testRemoveMembersFromGroup() throws Exception { assertNull(noErrorResult.all().get()); assertNull(noErrorResult.memberResult(memberOne).get()); assertNull(noErrorResult.memberResult(memberTwo).get()); + + // Return with success for "removeAll" scenario Review comment: This test looks good, but it seems that we didn't test the case where some members get deleted successfully while some are not? 
########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -3621,24 +3640,31 @@ public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(Strin KafkaFutureImpl<Map<MemberIdentity, Errors>> future = new KafkaFutureImpl<>(); ConsumerGroupOperationContext<Map<MemberIdentity, Errors>, RemoveMembersFromConsumerGroupOptions> context = - new ConsumerGroupOperationContext<>(groupId, options, deadline, future); + new ConsumerGroupOperationContext<>(groupId, options, deadline, future); + List<MemberIdentity> members; + if (options.removeAll()) { + members = getMembersFromGroup(groupId); + } else { + members = options.members().stream().map( + MemberToRemove::toMemberIdentity).collect(Collectors.toList()); + } Call findCoordinatorCall = getFindCoordinatorCall(context, - () -> getRemoveMembersFromGroupCall(context)); + () -> getRemoveMembersFromGroupCall(context, members)); runnable.call(findCoordinatorCall, startFindCoordinatorMs); return new RemoveMembersFromConsumerGroupResult(future, options.members()); } - private Call getRemoveMembersFromGroupCall(ConsumerGroupOperationContext<Map<MemberIdentity, Errors>, RemoveMembersFromConsumerGroupOptions> context) { + private Call getRemoveMembersFromGroupCall(ConsumerGroupOperationContext<Map<MemberIdentity, Errors>, + RemoveMembersFromConsumerGroupOptions> context, List<MemberIdentity> allMembers) { return new Call("leaveGroup", context.deadline(), new ConstantNodeIdProvider(context.node().get().id())) { @Override LeaveGroupRequest.Builder createRequest(int timeoutMs) { - return new LeaveGroupRequest.Builder(context.groupId(), - context.options().members().stream().map( - MemberToRemove::toMemberIdentity).collect(Collectors.toList())); + return new LeaveGroupRequest.Builder(context.groupId(), Review comment: nit: we could merge L3666-3667 ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -3612,6 
+3611,26 @@ private boolean dependsOnSpecificNode(ConfigResource resource) { || resource.type() == ConfigResource.Type.BROKER_LOGGER; } + private List<MemberIdentity> getMembersFromGroup(String groupId) { + Collection<MemberDescription> members; + try { + members = describeConsumerGroups(Collections.singleton(groupId)).describedGroups().get(groupId).get().members(); + } catch (Throwable ex) { + throw new KafkaException("Encounter exception when trying to get members from group: " + groupId, ex); + } + + List<MemberIdentity> memberToRemove = new ArrayList<>(); + for (final MemberDescription member : members) { + if (member.groupInstanceId().isPresent()) { + memberToRemove.add(new MemberIdentity().setGroupInstanceId(member.groupInstanceId().get()) + ); Review comment: This indentation is a bit weird, let's just merge L3625-3626 ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupOptions.java ########## @@ -37,7 +37,15 @@ public RemoveMembersFromConsumerGroupOptions(Collection<MemberToRemove> members) this.members = new HashSet<>(members); } + public RemoveMembersFromConsumerGroupOptions() { + this.members = new HashSet<>(); Review comment: Collections.emptySet() makes more sense since it is immutable. ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -3660,7 +3686,7 @@ void handleResponse(AbstractResponse abstractResponse) { // We set member.id to empty here explicitly, so that the lookup will succeed as user doesn't // know the exact member.id. memberErrors.put(new MemberIdentity() - .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID) + .setMemberId(memberResponse.memberId()) Review comment: I could see this doesn't hold true for a plain static member removal. Let's discuss why skipping the individual member check in `RemoveMembersFromConsumerGroupResult` makes sense over there. 
########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -3621,24 +3640,31 @@ public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(Strin KafkaFutureImpl<Map<MemberIdentity, Errors>> future = new KafkaFutureImpl<>(); ConsumerGroupOperationContext<Map<MemberIdentity, Errors>, RemoveMembersFromConsumerGroupOptions> context = - new ConsumerGroupOperationContext<>(groupId, options, deadline, future); + new ConsumerGroupOperationContext<>(groupId, options, deadline, future); + List<MemberIdentity> members; + if (options.removeAll()) { + members = getMembersFromGroup(groupId); + } else { + members = options.members().stream().map( + MemberToRemove::toMemberIdentity).collect(Collectors.toList()); + } Call findCoordinatorCall = getFindCoordinatorCall(context, - () -> getRemoveMembersFromGroupCall(context)); + () -> getRemoveMembersFromGroupCall(context, members)); runnable.call(findCoordinatorCall, startFindCoordinatorMs); return new RemoveMembersFromConsumerGroupResult(future, options.members()); } - private Call getRemoveMembersFromGroupCall(ConsumerGroupOperationContext<Map<MemberIdentity, Errors>, RemoveMembersFromConsumerGroupOptions> context) { + private Call getRemoveMembersFromGroupCall(ConsumerGroupOperationContext<Map<MemberIdentity, Errors>, + RemoveMembersFromConsumerGroupOptions> context, List<MemberIdentity> allMembers) { Review comment: nit: we could name it `members` now ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java ########## @@ -46,26 +46,42 @@ * If not, the first member error shall be returned. 
*/ public KafkaFuture<Void> all() { - final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>(); - this.future.whenComplete((memberErrors, throwable) -> { - if (throwable != null) { - result.completeExceptionally(throwable); - } else { - for (MemberToRemove memberToRemove : memberInfos) { - if (maybeCompleteExceptionally(memberErrors, memberToRemove.toMemberIdentity(), result)) { - return; + if (removeAll()) { Review comment: In `removeAll()` mode, why could we skip the individual member removal results? I guess although we don't need to verify against the original member list (because they don't exist for `removeAll`), going through the sub error list is still valuable to make sure there is no unexpected failure. ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java ########## @@ -46,26 +46,42 @@ * If not, the first member error shall be returned. */ public KafkaFuture<Void> all() { - final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>(); - this.future.whenComplete((memberErrors, throwable) -> { - if (throwable != null) { - result.completeExceptionally(throwable); - } else { - for (MemberToRemove memberToRemove : memberInfos) { - if (maybeCompleteExceptionally(memberErrors, memberToRemove.toMemberIdentity(), result)) { - return; + if (removeAll()) { + final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>(); + this.future.whenComplete((memberErrors, throwable) -> { + if (throwable != null) { + result.completeExceptionally(throwable); + } else { + System.out.println("Remove all active members succeeded, removed " + memberErrors.size() + " members: " + memberErrors.keySet()); Review comment: Remove print statement. 
########## File path: core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala ########## @@ -1147,6 +1174,16 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertEquals(testGroupId, testGroupDescription.groupId) assertFalse(testGroupDescription.isSimpleConsumerGroup) + assertEquals(consumerSet.size -1, testGroupDescription.members().size()) Review comment: size - 1 ########## File path: core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala ########## @@ -1017,47 +1017,70 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertTrue(0 == list1.errors().get().size()) assertTrue(0 == list1.valid().get().size()) val testTopicName = "test_topic" + val testTopicName1 = testTopicName + "1" + val testTopicName2 = testTopicName + "2" val testNumPartitions = 2 - client.createTopics(Collections.singleton( - new NewTopic(testTopicName, testNumPartitions, 1.toShort))).all().get() - waitForTopics(client, List(testTopicName), List()) + + client.createTopics(util.Arrays.asList(new NewTopic(testTopicName, testNumPartitions, 1.toShort), + new NewTopic(testTopicName1, testNumPartitions, 1.toShort), + new NewTopic(testTopicName2, testNumPartitions, 1.toShort) + )).all().get() + waitForTopics(client, List(testTopicName, testTopicName1, testTopicName2), List()) val producer = createProducer() try { producer.send(new ProducerRecord(testTopicName, 0, null, null)).get() } finally { Utils.closeQuietly(producer, "producer") } + + val EMPTY_GROUP_INSTANCE_ID = "" val testGroupId = "test_group_id" val testClientId = "test_client_id" val testInstanceId = "test_instance_id" + val testInstanceId1 = testInstanceId + "1" Review comment: I prefer `testInstanceIdOne = "test_instance_id_1"` and `testInstanceIdTwo = "test_instance_id_2"` ########## File path: core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala ########## @@ -1017,47 +1017,71 @@ class PlaintextAdminIntegrationTest 
extends BaseAdminIntegrationTest { assertTrue(0 == list1.errors().get().size()) assertTrue(0 == list1.valid().get().size()) val testTopicName = "test_topic" + val testTopicName1 = testTopicName + "1" + val testTopicName2 = testTopicName + "2" val testNumPartitions = 2 - client.createTopics(Collections.singleton( - new NewTopic(testTopicName, testNumPartitions, 1.toShort))).all().get() - waitForTopics(client, List(testTopicName), List()) + + client.createTopics(util.Arrays.asList(new NewTopic(testTopicName, testNumPartitions, 1.toShort), + new NewTopic(testTopicName1, testNumPartitions, 1.toShort), + new NewTopic(testTopicName2, testNumPartitions, 1.toShort) + )).all().get() + waitForTopics(client, List(testTopicName, testTopicName1, testTopicName2), List()) val producer = createProducer() try { producer.send(new ProducerRecord(testTopicName, 0, null, null)).get() } finally { Utils.closeQuietly(producer, "producer") } + + val EMPTY_GROUP_INSTANCE_ID = "" Review comment: Fair enough ########## File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java ########## @@ -547,6 +584,13 @@ private void cleanGlobal(final boolean withIntermediateTopics, cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT); final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig); + return exitCode; Review comment: As said earlier, I think we could just write `return new StreamsResetter().run(parameters, cleanUpConfig) == 0` ########## File path: core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala ########## @@ -1138,7 +1165,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val validMemberFuture = removeMembersResult.memberResult(new MemberToRemove(testInstanceId)) assertNull(validMemberFuture.get()) - // The group should contain no member now. 
+ // The group's active members number should decrease by 1 Review comment: We could remove this comment for now ########## File path: core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala ########## @@ -1155,12 +1192,15 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertTrue(deleteResult.deletedGroups().containsKey(testGroupId)) assertNull(deleteResult.deletedGroups().get(testGroupId).get()) - } finally { - consumerThread.interrupt() - consumerThread.join() - } } finally { - Utils.closeQuietly(consumer, "consumer") + consumerThreads.foreach { + case consumerThread => + consumerThread.interrupt() + consumerThread.join() + } + } + }finally { Review comment: nit: format I'm pretty surprised this wasn't caught in my previous template. Let me check how to cover this in style test as well. ########## File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java ########## @@ -261,6 +261,43 @@ public void shouldNotAllowToResetWhenIntermediateTopicAbsent() throws Exception Assert.assertEquals(1, exitCode); } + public void testResetWhenLongSessionTimeoutConfiguredWithForceOption() throws Exception { + appID = testId + "-with-force-option"; + streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); + streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT * 100); Review comment: What does `"" + ` mean? 
########## File path: core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala ########## @@ -1075,13 +1098,17 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertEquals(testGroupId, testGroupDescription.groupId()) assertFalse(testGroupDescription.isSimpleConsumerGroup) - assertEquals(1, testGroupDescription.members().size()) + assertEquals(groupInstanceSet.size, testGroupDescription.members().size()) val member = testGroupDescription.members().iterator().next() assertEquals(testClientId, member.clientId()) - val topicPartitions = member.assignment().topicPartitions() - assertEquals(testNumPartitions, topicPartitions.size()) - assertEquals(testNumPartitions, topicPartitions.asScala. - count(tp => tp.topic().equals(testTopicName))) + val members = testGroupDescription.members() + assertEquals(testClientId, members.asScala.head.clientId()) Review comment: Does this check duplicate L1103? Also I think it makes sense to check all the members' clientId as they should all be equal to `testClientId` ########## File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java ########## @@ -507,7 +544,7 @@ private Topology setupTopologyWithoutIntermediateUserTopic() { return builder.build(); } - private void cleanGlobal(final boolean withIntermediateTopics, + private int tryCleanGlobal(final boolean withIntermediateTopics, final String resetScenario, Review comment: nit: parameters are not aligned. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org