Nikita-Shupletsov commented on code in PR #21074:
URL: https://github.com/apache/kafka/pull/21074#discussion_r2590048590
##########
tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java:
##########
@@ -177,30 +181,42 @@ public int execute(final String[] args, final Properties
config) {
}
}
- private void maybeDeleteActiveConsumers(final String groupId,
- final Admin adminClient,
- final StreamsResetterOptions
options)
+ // visible for testing
+ void maybeDeleteActiveConsumers(final String groupId,
+ final Admin adminClient,
+ final boolean force)
throws ExecutionException, InterruptedException {
- final DescribeConsumerGroupsResult describeResult =
adminClient.describeConsumerGroups(
- Set.of(groupId),
- new DescribeConsumerGroupsOptions().timeoutMs(10 * 1000));
- try {
- final List<MemberDescription> members =
- new
ArrayList<>(describeResult.describedGroups().get(groupId).get().members());
- if (!members.isEmpty()) {
- if (options.hasForce()) {
- System.out.println("Force deleting all active members in
the group: " + groupId);
- adminClient.removeMembersFromConsumerGroup(groupId, new
RemoveMembersFromConsumerGroupOptions()).all().get();
- } else {
- throw new IllegalStateException("Consumer group '" +
groupId + "' is still active "
- + "and has following members: " + members + ". "
- + "Make sure to stop all running application instances
before running the reset tool."
- + " You can use option '--force' to remove active
members from the group.");
+ int retries = 0;
+ while (true) {
+ final DescribeConsumerGroupsResult describeResult =
adminClient.describeConsumerGroups(
+ Set.of(groupId),
+ new DescribeConsumerGroupsOptions().timeoutMs(10 * 1000));
+ try {
+ final List<MemberDescription> members =
+ new
ArrayList<>(describeResult.describedGroups().get(groupId).get().members());
+ if (!members.isEmpty()) {
+ if (force) {
+ System.out.println("Force deleting all active members
in the group: " + groupId);
+ adminClient.removeMembersFromConsumerGroup(groupId,
new RemoveMembersFromConsumerGroupOptions()).all().get();
+ } else {
+ throw new IllegalStateException("Consumer group '" +
groupId + "' is still active "
+ + "and has following members: " + members + ".
"
+ + "Make sure to stop all running application
instances before running the reset tool."
+ + " You can use option '--force' to remove
active members from the group.");
+ }
+ }
+ break;
+ } catch (ExecutionException ee) {
+ // If the group ID is not found, this is not an error case
+ if (ee.getCause() instanceof GroupIdNotFoundException) {
+ break;
+ }
+ // if a member is unknown, it may mean that it left the group
itself. Retrying to confirm.
Review Comment:
`removeMembersFromConsumerGroup` fetches all members in the group and sends a
request to delete them. If any of those deletions fails,
`RemoveMembersFromConsumerGroupResult#all` will throw an exception.
So, effectively, the broker first said a member existed, but then said it
doesn't.
If we retry, we will repeat the same actions again. At that point we do not
expect to receive the member that failed to get deleted as unknown again
(because the broker said it doesn't exist). So even if we skipped some
deletion for whatever reason, a retry will try to delete it again.
##########
tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java:
##########
@@ -177,30 +181,42 @@ public int execute(final String[] args, final Properties
config) {
}
}
- private void maybeDeleteActiveConsumers(final String groupId,
- final Admin adminClient,
- final StreamsResetterOptions
options)
+ // visible for testing
+ void maybeDeleteActiveConsumers(final String groupId,
+ final Admin adminClient,
+ final boolean force)
throws ExecutionException, InterruptedException {
- final DescribeConsumerGroupsResult describeResult =
adminClient.describeConsumerGroups(
- Set.of(groupId),
- new DescribeConsumerGroupsOptions().timeoutMs(10 * 1000));
- try {
- final List<MemberDescription> members =
- new
ArrayList<>(describeResult.describedGroups().get(groupId).get().members());
- if (!members.isEmpty()) {
- if (options.hasForce()) {
- System.out.println("Force deleting all active members in
the group: " + groupId);
- adminClient.removeMembersFromConsumerGroup(groupId, new
RemoveMembersFromConsumerGroupOptions()).all().get();
- } else {
- throw new IllegalStateException("Consumer group '" +
groupId + "' is still active "
- + "and has following members: " + members + ". "
- + "Make sure to stop all running application instances
before running the reset tool."
- + " You can use option '--force' to remove active
members from the group.");
+ int retries = 0;
+ while (true) {
+ final DescribeConsumerGroupsResult describeResult =
adminClient.describeConsumerGroups(
+ Set.of(groupId),
+ new DescribeConsumerGroupsOptions().timeoutMs(10 * 1000));
+ try {
+ final List<MemberDescription> members =
+ new
ArrayList<>(describeResult.describedGroups().get(groupId).get().members());
+ if (!members.isEmpty()) {
+ if (force) {
+ System.out.println("Force deleting all active members
in the group: " + groupId);
+ adminClient.removeMembersFromConsumerGroup(groupId,
new RemoveMembersFromConsumerGroupOptions()).all().get();
+ } else {
+ throw new IllegalStateException("Consumer group '" +
groupId + "' is still active "
+ + "and has following members: " + members + ".
"
+ + "Make sure to stop all running application
instances before running the reset tool."
+ + " You can use option '--force' to remove
active members from the group.");
+ }
+ }
+ break;
+ } catch (ExecutionException ee) {
+ // If the group ID is not found, this is not an error case
+ if (ee.getCause() instanceof GroupIdNotFoundException) {
+ break;
+ }
+ // if a member is unknown, it may mean that it left the group
itself. Retrying to confirm.
Review Comment:
`removeMembersFromConsumerGroup` fetches all members in the group and sends a
request to delete them. If any of those deletions fails,
`RemoveMembersFromConsumerGroupResult#all` will throw an exception.
So, effectively, the broker first said a member existed, but then said it
doesn't.
If we retry, we will repeat the same actions again. At that point we do not
expect to receive the member that failed to get deleted as unknown again
(because the broker said it doesn't exist). So even if we skipped some
deletion for whatever reason, a retry will try to delete it again.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]