lucasbru commented on code in PR #19895: URL: https://github.com/apache/kafka/pull/19895#discussion_r2132131515
########## tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java: ########## @@ -183,12 +197,127 @@ public void describeGroups() throws ExecutionException, InterruptedException { } } + Map.Entry<Errors, Map<TopicPartition, Throwable>> deleteOffsets(String groupId, List<String> topics) { + Map<TopicPartition, Throwable> partitionLevelResult = new HashMap<>(); + Set<String> topicWithPartitions = new HashSet<>(); + Set<String> topicWithoutPartitions = new HashSet<>(); + + for (String topic : topics) { + if (topic.contains(":")) + topicWithPartitions.add(topic); + else + topicWithoutPartitions.add(topic); + } + + List<TopicPartition> knownPartitions = topicWithPartitions.stream().flatMap(this::parseTopicsWithPartitions).collect(Collectors.toList()); + + // Get the partitions of topics that the user did not explicitly specify the partitions + DescribeTopicsResult describeTopicsResult = adminClient.describeTopics( + topicWithoutPartitions, + withTimeoutMs(new DescribeTopicsOptions())); + + Iterator<TopicPartition> unknownPartitions = describeTopicsResult.topicNameValues().entrySet().stream().flatMap(e -> { Review Comment: can we rename this variable to something like `unspecifiedTopicPartitions` and `specifiedTopicPartition` (above)? This is very confusing. ########## tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java: ########## @@ -192,12 +205,327 @@ public void describeGroups() throws ExecutionException, InterruptedException { } } + Map<String, Throwable> deleteGroups() { + List<String> groupIds = opts.options.has(opts.allGroupsOpt) + ? 
new ArrayList<>(listStreamsGroups()) + : new ArrayList<>(opts.options.valuesOf(opts.groupOpt)); + + // pre admin call checks + Map<String, Throwable> failed = preAdminCallChecks(groupIds); + + groupIds.removeAll(failed.keySet()); + Map<String, Throwable> success = new HashMap<>(); + Map<String, List<String>> internalTopics = new HashMap<>(); + Map<String, Throwable> internalTopicsDeletionFailures = new HashMap<>(); + if (!groupIds.isEmpty()) { + // retrieve internal topics before deleting groups + internalTopics = retrieveInternalTopics(groupIds); + + // delete streams groups + Map<String, KafkaFuture<Void>> groupsToDelete = adminClient.deleteStreamsGroups( + groupIds, + withTimeoutMs(new DeleteStreamsGroupsOptions()) + ).deletedGroups(); + + groupsToDelete.forEach((g, f) -> { + try { + f.get(); + success.put(g, null); + } catch (InterruptedException ie) { + failed.put(g, ie); + } catch (ExecutionException e) { + failed.put(g, e.getCause()); + } + }); + + // delete internal topics + if (!success.isEmpty()) { + for (String groupId : success.keySet()) { + List<String> internalTopicsToDelete = internalTopics.get(groupId); + if (internalTopicsToDelete != null && !internalTopicsToDelete.isEmpty()) { + DeleteTopicsResult deleteTopicsResult = null; + try { + deleteTopicsResult = adminClient.deleteTopics(internalTopicsToDelete); + deleteTopicsResult.all().get(); + } catch (InterruptedException | ExecutionException e) { + if (deleteTopicsResult != null) { + deleteTopicsResult.topicNameValues().forEach((topic, future) -> { + try { + future.get(); + } catch (Exception topicException) { + System.out.println("Failed to delete internal topic: " + topic); + } + }); + } + internalTopicsDeletionFailures.put(groupId, e.getCause()); + } + } + } + } + } + // display outcome messages based on the results + if (failed.isEmpty()) { + System.out.println("Deletion of requested streams groups (" + "'" + success.keySet().stream().map(Object::toString).collect(Collectors.joining("', '")) + 
"') was successful."); + } else { + printError("Deletion of some streams groups failed:", Optional.empty()); + failed.forEach((group, error) -> System.out.println("* Group '" + group + "' could not be deleted due to: " + error)); + + if (!success.isEmpty()) { + System.out.println("\nThese streams groups were deleted successfully: " + "'" + success.keySet().stream().map(Object::toString).collect(Collectors.joining("', '")) + "'."); + } + } + if (!internalTopics.keySet().isEmpty()) { + printInternalTopicErrors(internalTopicsDeletionFailures, success.keySet(), internalTopics.keySet()); + } + // for testing purpose: return all failures, including internal topics deletion failures + failed.putAll(success); + failed.putAll(internalTopicsDeletionFailures); + return failed; + } + + private Map<String, Throwable> preAdminCallChecks(List<String> groupIds) { + List<GroupListing> streamsGroupIds = listDetailedStreamsGroups(); + LinkedHashSet<String> groupIdSet = new LinkedHashSet<>(groupIds); + + Map<String, Throwable> failed = new HashMap<>(); + + for (String groupId : groupIdSet) { + Optional<GroupListing> listing = streamsGroupIds.stream().filter(item -> item.groupId().equals(groupId)).findAny(); + if (listing.isEmpty()) { + failed.put(groupId, new IllegalArgumentException("Group '" + groupId + "' does not exist or is not a streams group.")); + } else { + Optional<GroupState> groupState = listing.get().groupState(); + groupState.ifPresent(state -> { + if (state == GroupState.DEAD) { + failed.put(groupId, new IllegalStateException("Streams group '" + groupId + "' group state is DEAD.")); + } else if (state != GroupState.EMPTY) { + failed.put(groupId, new GroupNotEmptyException("Streams group '" + groupId + "' is not EMPTY.")); + } + }); + } + } + return failed; + } + + // Visibility for testing + Map<String, List<String>> retrieveInternalTopics(List<String> groupIds) { + Map<String, List<String>> groupToInternalTopics = new HashMap<>(); + try { + Map<String, 
StreamsGroupDescription> descriptionMap = adminClient.describeStreamsGroups(groupIds).all().get(); + for (StreamsGroupDescription description : descriptionMap.values()) { + + List<String> sourceTopics = description.subtopologies().stream() + .flatMap(subtopology -> subtopology.sourceTopics().stream()).toList(); + + List<String> internalTopics = description.subtopologies().stream() + .flatMap(subtopology -> Stream.concat( + subtopology.repartitionSourceTopics().keySet().stream(), + subtopology.stateChangelogTopics().keySet().stream())) + .filter(topic -> !sourceTopics.contains(topic)) + .collect(Collectors.toList()); + internalTopics.removeIf(topic -> { + if (!isInferredInternalTopic(topic, description.groupId())) { + printError("The internal topic '" + topic + "' is not inferred as internal " + + "and thus will not be deleted with the group '" + description.groupId() + "'.", Optional.empty()); + return true; + } + return false; + }); + if (!internalTopics.isEmpty()) { + groupToInternalTopics.put(description.groupId(), internalTopics); + } + } + } catch (InterruptedException | ExecutionException e) { + if (e.getCause() instanceof UnsupportedVersionException) { + printError("Retrieving internal topics is not supported by the broker version. 
" + + "Use 'kafka-topics.sh' to list and delete the group's internal topics.", Optional.of(e.getCause())); + } else { + printError("Retrieving internal topics failed due to " + e.getMessage(), Optional.of(e)); + } + } + return groupToInternalTopics; + } + + private boolean isInferredInternalTopic(final String topicName, final String applicationId) { + return topicName.startsWith(applicationId + "-") && matchesInternalTopicFormat(topicName); + } + + public static boolean matchesInternalTopicFormat(final String topicName) { + return topicName.endsWith("-changelog") || topicName.endsWith("-repartition") + || topicName.endsWith("-subscription-registration-topic") + || topicName.endsWith("-subscription-response-topic") + || topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-\\d+-topic") + || topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-\\d+-topic"); + } + + + private void printInternalTopicErrors(Map<String, Throwable> internalTopicsDeletionFailures, + Set<String> deletedGroupIds, + Set<String> groupIdsWithInternalTopics) { + if (!deletedGroupIds.isEmpty()) { + if (internalTopicsDeletionFailures.isEmpty()) { + List<String> successfulGroups = deletedGroupIds.stream() + .filter(groupIdsWithInternalTopics::contains) + .collect(Collectors.toList()); + System.out.println("Deletion of associated internal topics of the streams groups ('" + + String.join("', '", successfulGroups) + "') was successful."); + } else { + System.out.println("Deletion of some associated internal topics failed:"); + internalTopicsDeletionFailures.forEach((group, error) -> + System.out.println("* Internal topics of the streams group '" + group + "' could not be deleted due to: " + error)); + } + } + } + + List<GroupListing> listDetailedStreamsGroups() { + try { + ListGroupsResult result = adminClient.listGroups(new ListGroupsOptions() + .timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue()) + .withTypes(Set.of(GroupType.STREAMS))); + Collection<GroupListing> listings = 
result.all().get(); + return listings.stream().toList(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + private Map.Entry<Errors, Map<TopicPartition, Throwable>> deleteOffsets(String groupId, List<String> topics) { + Map<TopicPartition, Throwable> partitionLevelResult = new HashMap<>(); + Set<String> topicWithPartitions = new HashSet<>(); + Set<String> topicWithoutPartitions = new HashSet<>(); + + for (String topic : topics) { + if (topic.contains(":")) + topicWithPartitions.add(topic); + else + topicWithoutPartitions.add(topic); + } + + List<TopicPartition> knownPartitions = topicWithPartitions.stream().flatMap(this::parseTopicsWithPartitions).toList(); + + // Get the partitions of topics that the user did not explicitly specify the partitions + DescribeTopicsResult describeTopicsResult = adminClient.describeTopics( + topicWithoutPartitions, + withTimeoutMs(new DescribeTopicsOptions())); + + Iterator<TopicPartition> unknownPartitions = describeTopicsResult.topicNameValues().entrySet().stream().flatMap(e -> { + String topic = e.getKey(); + try { + return e.getValue().get().partitions().stream().map(partition -> + new TopicPartition(topic, partition.partition())); + } catch (ExecutionException | InterruptedException err) { + partitionLevelResult.put(new TopicPartition(topic, -1), err); + return Stream.empty(); + } + }).iterator(); + + Set<TopicPartition> partitions = new HashSet<>(knownPartitions); + + unknownPartitions.forEachRemaining(partitions::add); + + return deleteOffsets(groupId, partitions, partitionLevelResult); + } + + private Map.Entry<Errors, Map<TopicPartition, Throwable>> deleteOffsets(String groupId, Set<TopicPartition> partitions, Map<TopicPartition, Throwable> partitionLevelResult) { + + DeleteStreamsGroupOffsetsResult deleteResult = adminClient.deleteStreamsGroupOffsets( + groupId, + partitions, + withTimeoutMs(new DeleteStreamsGroupOffsetsOptions()) + ); + + Errors topLevelException = 
Errors.NONE; + + try { + deleteResult.all().get(); + } catch (ExecutionException | InterruptedException e) { + topLevelException = Errors.forException(e.getCause()); + } + + partitions.forEach(partition -> { + try { + deleteResult.partitionResult(partition).get(); + partitionLevelResult.put(partition, null); + } catch (ExecutionException | InterruptedException e) { + partitionLevelResult.put(partition, e); + } + }); + + return new AbstractMap.SimpleImmutableEntry<>(topLevelException, partitionLevelResult); + } + + Map.Entry<Errors, Map<TopicPartition, Throwable>> deleteOffsets() { + String groupId = opts.options.valueOf(opts.groupOpt); + Map.Entry<Errors, Map<TopicPartition, Throwable>> res; + if (opts.options.has(opts.allInputTopicsOpt)) { + Set<TopicPartition> partitions = getCommittedOffsets(groupId).keySet(); + res = deleteOffsets(groupId, partitions, new HashMap<>()); + } else if (opts.options.has(opts.inputTopicOpt)) { + List<String> topics = opts.options.valuesOf(opts.inputTopicOpt); + res = deleteOffsets(groupId, topics); + } else { + CommandLineUtils.printUsageAndExit(opts.parser, "Option --delete-offsets requires either --all-topics or --topic to be specified."); Review Comment: Can you make this `--all-input-topics`? ########## tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java: ########## @@ -326,10 +469,40 @@ Map<TopicPartition, OffsetsInfo> getOffsets(StreamsGroupDescription description) return output; } + private Stream<TopicPartition> parseTopicsWithPartitions(String topicArg) { + ToIntFunction<String> partitionNum = partition -> { + try { + return Integer.parseInt(partition); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid partition '" + partition + "' specified in topic arg '" + topicArg + "''"); Review Comment: yes -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org