abbccdda commented on a change in pull request #8589: URL: https://github.com/apache/kafka/pull/8589#discussion_r429333355
##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -379,6 +379,22 @@ private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors
                 MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
     }
 
+    private static DescribeGroupsResponseData prepareDescribeGroupsResponseData(String groupId, List<String> groupInstances,
+                                                                                List<TopicPartition> topicPartitions) {
+        final ByteBuffer memberAssignment = ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(topicPartitions));
+        byte[] memberAssignmentBytes = new byte[memberAssignment.remaining()];
+        List<DescribedGroupMember> describedGroupMembers = groupInstances.stream().map(groupInstance -> DescribeGroupsResponse.groupMember("0", groupInstance, "clientId0", "clientHost", memberAssignmentBytes, null)).collect(Collectors.toList());

Review comment:
nit: we could set `"0"` to `JoinGroupRequest.UNKNOWN_MEMBER_ID` if we don't want to test it out. Having all members use the same member.id is a bit weird.
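If the test does want distinct members rather than the placeholder id, one option is to derive each member.id from the position in `groupInstances`. The sketch below is only an illustration of that idea: `MemberIdSketch`, `Member`, and `describeMembers` are hypothetical stand-ins, not Kafka classes and not the code in this PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MemberIdSketch {

    /** Hypothetical stand-in for DescribedGroupMember; only the two ids matter here. */
    static final class Member {
        final String memberId;
        final String groupInstanceId;

        Member(String memberId, String groupInstanceId) {
            this.memberId = memberId;
            this.groupInstanceId = groupInstanceId;
        }
    }

    /** Builds one member per group instance, using the list index as a distinct member.id. */
    static List<Member> describeMembers(List<String> groupInstances) {
        List<Member> members = new ArrayList<>();
        for (int i = 0; i < groupInstances.size(); i++) {
            members.add(new Member(String.valueOf(i), groupInstances.get(i)));
        }
        return members;
    }

    public static void main(String[] args) {
        for (Member m : describeMembers(Arrays.asList("instance-1", "instance-2"))) {
            System.out.println(m.memberId + " -> " + m.groupInstanceId);
        }
    }
}
```

In the real helper the same index could be passed as the first argument of `DescribeGroupsResponse.groupMember(...)` in place of the literal `"0"`; whether that is worth doing depends on whether the test ever inspects member ids.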
##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java
##########
@@ -51,9 +52,21 @@
             if (throwable != null) {
                 result.completeExceptionally(throwable);
             } else {
-                for (MemberToRemove memberToRemove : memberInfos) {
-                    if (maybeCompleteExceptionally(memberErrors, memberToRemove.toMemberIdentity(), result)) {
-                        return;
+                if (removeAll()) {
+                    for (Map.Entry<MemberIdentity, Errors> entry: memberErrors.entrySet()) {
+                        Exception exception = entry.getValue().exception();
+                        if (exception != null) {
+                            Throwable ex = new KafkaException("Encounter exception when trying to remove: "

Review comment:
Let's put the exception in the cause so that we could verify the cause in `KafkaAdminClientTest`, as:
```
if (exception != null) {
    result.completeExceptionally(new KafkaException(
        "Encounter exception when trying to remove: " + entry.getKey(),
        exception));
    return;
}
```

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupOptions.java
##########
@@ -37,7 +38,15 @@ public RemoveMembersFromConsumerGroupOptions(Collection<MemberToRemove> members)
         this.members = new HashSet<>(members);
     }
 
+    public RemoveMembersFromConsumerGroupOptions() {
+        this.members = Collections.emptySet();;

Review comment:
nit: extra semi-colon

##########
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##########
@@ -119,7 +122,9 @@
             + "* This tool will not clean up the local state on the stream application instances (the persisted "
             + "stores used to cache aggregation results).\n"
             + "You need to call KafkaStreams#cleanUp() in your application or manually delete them from the "
-            + "directory specified by \"state.dir\" configuration (/tmp/kafka-streams/<application.id> by default).\n\n"
+            + "directory specified by \"state.dir\" configuration (/tmp/kafka-streams/<application.id> by default).\n"
+            + "*Please use the \"--force\" option to force remove active members in case long session "
Review comment:
nit: space after `*`. Also I feel we could make the context more concrete by:
```
When long session timeout has been configured, active members could take longer to get expired on the broker thus blocking the reset job to complete. Use the \"--force\" option could remove those left-over members immediately. Make sure to stop all stream applications when this option is specified to avoid unexpected disruptions.
```

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##########
@@ -261,6 +261,43 @@ public void shouldNotAllowToResetWhenIntermediateTopicAbsent() throws Exception
         Assert.assertEquals(1, exitCode);
     }
 
+    public void testResetWhenLongSessionTimeoutConfiguredWithForceOption() throws Exception {
+        appID = testId + "-with-force-option";
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT * 100);

Review comment:
I see, this is indeed weird. Please file a JIRA so that we could clean it up in a follow-up PR if others feel the same way.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org