[GitHub] [kafka] abbccdda commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter
abbccdda commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r429404004

## File path: core/src/main/scala/kafka/tools/StreamsResetter.java

```diff
@@ -27,10 +27,12 @@
 import org.apache.kafka.clients.admin.DescribeConsumerGroupsOptions;
 import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
 import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.KafkaException;
```

Review comment: This import is no longer used.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter
abbccdda commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r42955

## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

```diff
@@ -379,6 +379,22 @@ private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors
             MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
     }

+    private static DescribeGroupsResponseData prepareDescribeGroupsResponseData(String groupId, List<String> groupInstances,
+                                                                                List<TopicPartition> topicPartitions) {
+        final ByteBuffer memberAssignment = ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(topicPartitions));
+        byte[] memberAssignmentBytes = new byte[memberAssignment.remaining()];
+        List<DescribedGroupMember> describedGroupMembers = groupInstances.stream().map(groupInstance -> DescribeGroupsResponse.groupMember("0", groupInstance, "clientId0", "clientHost", memberAssignmentBytes, null)).collect(Collectors.toList());
```

Review comment: nit: we could set `"0"` to `JoinGroupRequest.UNKNOWN_MEMBER_ID` if we don't want to test it out. Having all members use the same member.id is a bit weird.
## File path: clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java

```diff
@@ -51,9 +52,21 @@
         if (throwable != null) {
             result.completeExceptionally(throwable);
         } else {
-            for (MemberToRemove memberToRemove : memberInfos) {
-                if (maybeCompleteExceptionally(memberErrors, memberToRemove.toMemberIdentity(), result)) {
-                    return;
+            if (removeAll()) {
+                for (Map.Entry entry : memberErrors.entrySet()) {
+                    Exception exception = entry.getValue().exception();
+                    if (exception != null) {
+                        Throwable ex = new KafkaException("Encounter exception when trying to remove: "
```

Review comment: Let's put the exception in the cause so that we could verify the cause in `KafkaAdminClientTest`, as:
```
if (exception != null) {
    result.completeExceptionally(new KafkaException(
        "Encounter exception when trying to remove: " + entry.getKey(), exception));
    return;
}
```

## File path: clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupOptions.java

```diff
@@ -37,7 +38,15 @@ public RemoveMembersFromConsumerGroupOptions(Collection<MemberToRemove> members)
         this.members = new HashSet<>(members);
     }

+    public RemoveMembersFromConsumerGroupOptions() {
+        this.members = Collections.emptySet();;
```

Review comment: nit: extra semicolon

## File path: core/src/main/scala/kafka/tools/StreamsResetter.java

```diff
@@ -119,7 +122,9 @@
          + "* This tool will not clean up the local state on the stream application instances (the persisted "
          + "stores used to cache aggregation results).\n"
          + "You need to call KafkaStreams#cleanUp() in your application or manually delete them from the "
-         + "directory specified by \"state.dir\" configuration (/tmp/kafka-streams/ by default).\n\n"
+         + "directory specified by \"state.dir\" configuration (/tmp/kafka-streams/ by default).\n"
+         + "*Please use the \"--force\" option to force remove active members in case long session "
```

Review comment: nit: space after `*`.

Also I feel we could make the context more concrete by:
```
When a long session timeout has been configured, active members could take longer to expire on the broker, blocking the reset job from completing. Using the \"--force\" option removes those left-over members immediately. Make sure to stop all stream applications when this option is specified to avoid unexpected disruptions.
```

## File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java

```diff
@@ -261,6 +261,43 @@ public void shouldNotAllowToResetWhenIntermediateTopicAbsent() throws Exception
         Assert.assertEquals(1, exitCode);
     }

+    public void testResetWhenLongSessionTimeoutConfiguredWithForceOption() throws Exception {
+        appID = testId + "-with-force-option";
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT * 100);
```

Review comment: I see, this is indeed weird, please file a JIRA so that we could clean this up in a follow-up PR if others feel the same way.
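The cause-wrapping suggestion above can be illustrated with a stand-alone sketch (plain `RuntimeException` stands in for Kafka's `KafkaException`, and the entry type stands in for the per-member error map; all names here are illustrative, not the actual admin-client internals): keeping the original per-member error as the `cause` is what lets a test assert on it later.

```java
import java.util.AbstractMap;
import java.util.Map;

public class CauseWrappingSketch {
    // Wrap the per-member failure, preserving it as the cause so callers
    // (and tests) can still inspect the original exception.
    static RuntimeException wrap(Map.Entry<String, Exception> entry) {
        return new RuntimeException(
            "Encounter exception when trying to remove: " + entry.getKey(),
            entry.getValue());
    }

    public static void main(String[] args) {
        Exception memberError = new IllegalStateException("UNKNOWN_MEMBER_ID");
        Map.Entry<String, Exception> entry =
            new AbstractMap.SimpleEntry<>("instance-1", memberError);
        RuntimeException wrapped = wrap(entry);
        // A message-only wrapper would lose this reference entirely.
        System.out.println(wrapped.getCause() == memberError);
    }
}
```

A test can then check `wrapped.getCause()` directly, rather than string-matching the message.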
[GitHub] [kafka] abbccdda commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter
abbccdda commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r428745517

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

```diff
@@ -3612,6 +3611,26 @@ private boolean dependsOnSpecificNode(ConfigResource resource) {
             || resource.type() == ConfigResource.Type.BROKER_LOGGER;
     }

+    private List<MemberIdentity> getMembersFromGroup(String groupId) {
+        Collection<MemberDescription> members;
+        try {
+            members = describeConsumerGroups(Collections.singleton(groupId)).describedGroups().get(groupId).get().members();
+        } catch (Throwable ex) {
```

Review comment: I think we should catch `Exception` here: https://stackoverflow.com/questions/2274102/difference-between-using-throwable-and-exception-in-a-try-catch

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

```diff
@@ -3621,24 +3640,31 @@ public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(Strin
         KafkaFutureImpl<Map<MemberIdentity, Errors>> future = new KafkaFutureImpl<>();
         ConsumerGroupOperationContext<Map<MemberIdentity, Errors>, RemoveMembersFromConsumerGroupOptions> context =
-            new ConsumerGroupOperationContext<>(groupId, options, deadline, future);
+                new ConsumerGroupOperationContext<>(groupId, options, deadline, future);
```

Review comment: Let's get back the original indentation.

## File path: core/src/main/scala/kafka/tools/StreamsResetter.java

```diff
@@ -186,9 +190,15 @@ private void validateNoActiveConsumers(final String groupId,
         final List<MemberDescription> members = new ArrayList<>(describeResult.describedGroups().get(groupId).get().members());
         if (!members.isEmpty()) {
-            throw new IllegalStateException("Consumer group '" + groupId + "' is still active "
-                + "and has following members: " + members + ". "
-                + "Make sure to stop all running application instances before running the reset tool.");
+            if (options.has(forceOption)) {
+                System.out.println("Force deleting all active members in the group: " + groupId);
+                adminClient.removeMembersFromConsumerGroup(groupId, new RemoveMembersFromConsumerGroupOptions()).all();
```

Review comment: Should we check the member removal result here before proceeding? If that call failed, the whole operation should fail with an error message containing the result, IMHO.

## File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java

```diff
@@ -507,7 +544,7 @@ private Topology setupTopologyWithoutIntermediateUserTopic() {
         return builder.build();
     }

-    private void cleanGlobal(final boolean withIntermediateTopics,
+    private int tryCleanGlobal(final boolean withIntermediateTopics,
```

Review comment: We could add a comment describing the return value here, and instead of returning an exit code, I feel a boolean would suffice to indicate whether the clean operation was successful or not.

## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

```diff
@@ -2411,6 +2411,50 @@ public void testRemoveMembersFromGroup() throws Exception {
         assertNull(noErrorResult.all().get());
         assertNull(noErrorResult.memberResult(memberOne).get());
         assertNull(noErrorResult.memberResult(memberTwo).get());
+
+        // Return with success for "removeAll" scenario
```

Review comment: This test looks good, but it seems that we didn't test the case where some members are deleted successfully while others are not?

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

```diff
@@ -3621,24 +3640,31 @@ public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(Strin
         KafkaFutureImpl<Map<MemberIdentity, Errors>> future = new KafkaFutureImpl<>();
         ConsumerGroupOperationContext<Map<MemberIdentity, Errors>, RemoveMembersFromConsumerGroupOptions> context =
-            new ConsumerGroupOperationContext<>(groupId, options, deadline, future);
+                new ConsumerGroupOperationContext<>(groupId, options, deadline, future);
+        List<MemberIdentity> members;
+        if (options.removeAll()) {
+            members = getMembersFromGroup(groupId);
+        } else {
+            members = options.members().stream().map(
+                MemberToRemove::toMemberIdentity).collect(Collectors.toList());
+        }
         Call findCoordinatorCall = getFindCoordinatorCall(context,
-            () -> getRemoveMembersFromGroupCall(context));
+            () -> getRemoveMembersFromGroupCall(context, members));
         runnable.call(findCoordinatorCall, startFindCoordinatorMs);
         return new RemoveMembersFromConsumerGroupResult(future, options.members());
     }

-    private Call
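The concern above about checking the removal result before proceeding can be sketched stand-alone; `java.util.concurrent.CompletableFuture` stands in here for the `KafkaFuture` returned by `removeMembersFromConsumerGroup(...).all()`, and the method and message names are illustrative, not the tool's actual code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ForceRemoveSketch {
    // Block on the removal result and fail the reset with a descriptive error
    // instead of silently proceeding while members may still be registered.
    static void awaitRemoval(CompletableFuture<Void> removal, String groupId) {
        try {
            removal.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(
                "Interrupted while removing members of group " + groupId, e);
        } catch (ExecutionException e) {
            // Surface the underlying removal failure as the cause.
            throw new IllegalStateException(
                "Failed to force-remove active members of group " + groupId, e.getCause());
        }
    }

    public static void main(String[] args) {
        // Success: returns quietly, reset can continue.
        awaitRemoval(CompletableFuture.completedFuture(null), "app-1");

        // Failure: the reset aborts with the removal error attached.
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("removal rejected"));
        try {
            awaitRemoval(failed, "app-1");
        } catch (IllegalStateException e) {
            System.out.println("reset aborted: " + e.getCause().getMessage());
        }
    }
}
```

The key point is the blocking `get()`: calling `.all()` without consuming the future lets a failed removal go unnoticed and the reset proceed against still-active members.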
[GitHub] [kafka] abbccdda commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter
abbccdda commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r418108095

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

```diff
@@ -3621,24 +3641,37 @@ public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(Strin
         KafkaFutureImpl<Map<MemberIdentity, Errors>> future = new KafkaFutureImpl<>();
         ConsumerGroupOperationContext<Map<MemberIdentity, Errors>, RemoveMembersFromConsumerGroupOptions> context =
-            new ConsumerGroupOperationContext<>(groupId, options, deadline, future);
+                new ConsumerGroupOperationContext<>(groupId, options, deadline, future);

-        Call findCoordinatorCall = getFindCoordinatorCall(context,
-            () -> getRemoveMembersFromGroupCall(context));
+        Call findCoordinatorCall;
+        if (options.removeAll()) {
+            List<MemberIdentity> members = getMembersFromGroup(groupId);
+            findCoordinatorCall = getFindCoordinatorCall(context,
+                () -> getRemoveMembersFromGroupCall(context, members));
```

Review comment: could we pass the members into the context?

## File path: clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupOptions.java

```diff
@@ -32,12 +32,23 @@ public class RemoveMembersFromConsumerGroupOptions extends AbstractOptions {
     private Set<MemberToRemove> members;
```

Review comment: Could we just make members be `Optional<Set<MemberToRemove>>` so that we don't need a separate removeAll parameter?

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

```diff
@@ -3612,6 +3611,27 @@ private boolean dependsOnSpecificNode(ConfigResource resource) {
             || resource.type() == ConfigResource.Type.BROKER_LOGGER;
     }

+    private List<MemberIdentity> getMembersFromGroup(String groupId) {
+        Collection<MemberDescription> members = new ArrayList<>();
+        try {
+            members = describeConsumerGroups(Collections.singleton(groupId)).describedGroups().get(groupId).get().members();
+        } catch (Throwable ex) {
+            System.out.println("Encounter exception when trying to get members from group: " + groupId);
+            ex.printStackTrace();
+        }
+
+        List<MemberIdentity> memberToRemove = new ArrayList<>();
+        for (MemberDescription member: members) {
```

Review comment: style error here. I would recommend doing a self style check like: `./gradlew checkstyleMain checkstyleTest spotbugsMain spotbugsTest spotbugsScoverage compileTestJava` otherwise we still need to fix those failures after the Jenkins build.

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

```diff
@@ -3612,6 +3611,27 @@ private boolean dependsOnSpecificNode(ConfigResource resource) {
             || resource.type() == ConfigResource.Type.BROKER_LOGGER;
     }

+    private List<MemberIdentity> getMembersFromGroup(String groupId) {
+        Collection<MemberDescription> members = new ArrayList<>();
+        try {
+            members = describeConsumerGroups(Collections.singleton(groupId)).describedGroups().get(groupId).get().members();
+        } catch (Throwable ex) {
+            System.out.println("Encounter exception when trying to get members from group: " + groupId);
```

Review comment: Remove print statements

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

```diff
@@ -3612,6 +3611,27 @@ private boolean dependsOnSpecificNode(ConfigResource resource) {
             || resource.type() == ConfigResource.Type.BROKER_LOGGER;
     }

+    private List<MemberIdentity> getMembersFromGroup(String groupId) {
+        Collection<MemberDescription> members = new ArrayList<>();
+        try {
+            members = describeConsumerGroups(Collections.singleton(groupId)).describedGroups().get(groupId).get().members();
+        } catch (Throwable ex) {
+            System.out.println("Encounter exception when trying to get members from group: " + groupId);
+            ex.printStackTrace();
```

Review comment: Curious why we are still continuing in this case, as the member lookup already failed.
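The `Optional<Set<MemberToRemove>>` suggestion above can be sketched stand-alone (plain `String` member names stand in for `MemberToRemove`, and the class name is illustrative): an empty `Optional` encodes "remove all members", so no separate `removeAll` field or flag is needed.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

public class RemoveOptionsSketch {
    private final Optional<Set<String>> members;

    // No-arg constructor means "remove every member of the group".
    public RemoveOptionsSketch() {
        this.members = Optional.empty();
    }

    public RemoveOptionsSketch(Collection<String> members) {
        this.members = Optional.of(new HashSet<>(members));
    }

    public boolean removeAll() {
        // Absence of an explicit member set signals remove-all.
        return !members.isPresent();
    }

    public Set<String> members() {
        return members.orElse(Collections.emptySet());
    }

    public static void main(String[] args) {
        System.out.println(new RemoveOptionsSketch().removeAll());                             // true
        System.out.println(new RemoveOptionsSketch(Collections.singleton("i-1")).removeAll()); // false
    }
}
```

Folding the remove-all intent into the one `members` field keeps the two constructors from being able to disagree with a separate boolean.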