lucasbru commented on code in PR #19895:
URL: https://github.com/apache/kafka/pull/19895#discussion_r2132131515


##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -183,12 +197,127 @@ public void describeGroups() throws ExecutionException, 
InterruptedException {
             }
         }
 
+        Map.Entry<Errors, Map<TopicPartition, Throwable>> deleteOffsets(String 
groupId, List<String> topics) {
+            Map<TopicPartition, Throwable> partitionLevelResult = new 
HashMap<>();
+            Set<String> topicWithPartitions = new HashSet<>();
+            Set<String> topicWithoutPartitions = new HashSet<>();
+
+            for (String topic : topics) {
+                if (topic.contains(":"))
+                    topicWithPartitions.add(topic);
+                else
+                    topicWithoutPartitions.add(topic);
+            }
+
+            List<TopicPartition> knownPartitions = 
topicWithPartitions.stream().flatMap(this::parseTopicsWithPartitions).collect(Collectors.toList());
+
+            // Get the partitions of topics that the user did not explicitly 
specify the partitions
+            DescribeTopicsResult describeTopicsResult = 
adminClient.describeTopics(
+                topicWithoutPartitions,
+                withTimeoutMs(new DescribeTopicsOptions()));
+
+            Iterator<TopicPartition> unknownPartitions = 
describeTopicsResult.topicNameValues().entrySet().stream().flatMap(e -> {

Review Comment:
   can we rename this variable to something like `unspecifiedTopicPartitions` 
and `specifiedTopicPartitions` (above)? This is very confusing.



##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -192,12 +205,327 @@ public void describeGroups() throws ExecutionException, 
InterruptedException {
             }
         }
 
+        Map<String, Throwable> deleteGroups() {
+            List<String> groupIds = opts.options.has(opts.allGroupsOpt)
+                ? new ArrayList<>(listStreamsGroups())
+                : new ArrayList<>(opts.options.valuesOf(opts.groupOpt));
+
+            // pre admin call checks
+            Map<String, Throwable> failed = preAdminCallChecks(groupIds);
+
+            groupIds.removeAll(failed.keySet());
+            Map<String, Throwable> success = new HashMap<>();
+            Map<String, List<String>> internalTopics = new HashMap<>();
+            Map<String, Throwable> internalTopicsDeletionFailures = new 
HashMap<>();
+            if (!groupIds.isEmpty()) {
+                // retrieve internal topics before deleting groups
+                internalTopics = retrieveInternalTopics(groupIds);
+
+                // delete streams groups
+                Map<String, KafkaFuture<Void>> groupsToDelete = 
adminClient.deleteStreamsGroups(
+                    groupIds,
+                    withTimeoutMs(new DeleteStreamsGroupsOptions())
+                ).deletedGroups();
+
+                groupsToDelete.forEach((g, f) -> {
+                    try {
+                        f.get();
+                        success.put(g, null);
+                    } catch (InterruptedException ie) {
+                        failed.put(g, ie);
+                    } catch (ExecutionException e) {
+                        failed.put(g, e.getCause());
+                    }
+                });
+
+                // delete internal topics
+                if (!success.isEmpty()) {
+                    for (String groupId : success.keySet()) {
+                        List<String> internalTopicsToDelete = 
internalTopics.get(groupId);
+                        if (internalTopicsToDelete != null && 
!internalTopicsToDelete.isEmpty()) {
+                            DeleteTopicsResult deleteTopicsResult = null;
+                            try {
+                                deleteTopicsResult = 
adminClient.deleteTopics(internalTopicsToDelete);
+                                deleteTopicsResult.all().get();
+                            } catch (InterruptedException | ExecutionException 
e) {
+                                if (deleteTopicsResult != null) {
+                                    
deleteTopicsResult.topicNameValues().forEach((topic, future) -> {
+                                        try {
+                                            future.get();
+                                        } catch (Exception topicException) {
+                                            System.out.println("Failed to 
delete internal topic: " + topic);
+                                        }
+                                    });
+                                }
+                                internalTopicsDeletionFailures.put(groupId, 
e.getCause());
+                            }
+                        }
+                    }
+                }
+            }
+            // display outcome messages based on the results
+            if (failed.isEmpty()) {
+                System.out.println("Deletion of requested streams groups (" + 
"'" + 
success.keySet().stream().map(Object::toString).collect(Collectors.joining("', 
'")) + "') was successful.");
+            } else {
+                printError("Deletion of some streams groups failed:", 
Optional.empty());
+                failed.forEach((group, error) -> System.out.println("* Group 
'" + group + "' could not be deleted due to: " + error));
+
+                if (!success.isEmpty()) {
+                    System.out.println("\nThese streams groups were deleted 
successfully: " + "'" + 
success.keySet().stream().map(Object::toString).collect(Collectors.joining("', 
'")) + "'.");
+                }
+            }
+            if (!internalTopics.keySet().isEmpty()) {
+                printInternalTopicErrors(internalTopicsDeletionFailures, 
success.keySet(), internalTopics.keySet());
+            }
+            // for testing purpose: return all failures, including internal 
topics deletion failures
+            failed.putAll(success);
+            failed.putAll(internalTopicsDeletionFailures);
+            return failed;
+        }
+
+        private Map<String, Throwable> preAdminCallChecks(List<String> 
groupIds) {
+            List<GroupListing> streamsGroupIds = listDetailedStreamsGroups();
+            LinkedHashSet<String> groupIdSet = new LinkedHashSet<>(groupIds);
+
+            Map<String, Throwable> failed = new HashMap<>();
+
+            for (String groupId : groupIdSet) {
+                Optional<GroupListing> listing = 
streamsGroupIds.stream().filter(item -> 
item.groupId().equals(groupId)).findAny();
+                if (listing.isEmpty()) {
+                    failed.put(groupId, new IllegalArgumentException("Group '" 
+ groupId + "' does not exist or is not a streams group."));
+                } else {
+                    Optional<GroupState> groupState = 
listing.get().groupState();
+                    groupState.ifPresent(state -> {
+                        if (state == GroupState.DEAD) {
+                            failed.put(groupId, new 
IllegalStateException("Streams group '" + groupId + "' group state is DEAD."));
+                        } else if (state != GroupState.EMPTY) {
+                            failed.put(groupId, new 
GroupNotEmptyException("Streams group '" + groupId + "' is not EMPTY."));
+                        }
+                    });
+                }
+            }
+            return failed;
+        }
+
+        // Visibility for testing
+        Map<String, List<String>> retrieveInternalTopics(List<String> 
groupIds) {
+            Map<String, List<String>> groupToInternalTopics = new HashMap<>();
+            try {
+                Map<String, StreamsGroupDescription> descriptionMap = 
adminClient.describeStreamsGroups(groupIds).all().get();
+                for (StreamsGroupDescription description : 
descriptionMap.values()) {
+
+                    List<String> sourceTopics = 
description.subtopologies().stream()
+                        .flatMap(subtopology -> 
subtopology.sourceTopics().stream()).toList();
+
+                    List<String> internalTopics = 
description.subtopologies().stream()
+                        .flatMap(subtopology -> Stream.concat(
+                            
subtopology.repartitionSourceTopics().keySet().stream(),
+                            
subtopology.stateChangelogTopics().keySet().stream()))
+                        .filter(topic -> !sourceTopics.contains(topic))
+                        .collect(Collectors.toList());
+                    internalTopics.removeIf(topic -> {
+                        if (!isInferredInternalTopic(topic, 
description.groupId())) {
+                            printError("The internal topic '" + topic + "' is 
not inferred as internal " +
+                                "and thus will not be deleted with the group 
'" + description.groupId() + "'.", Optional.empty());
+                            return true;
+                        }
+                        return false;
+                    });
+                    if (!internalTopics.isEmpty()) {
+                        groupToInternalTopics.put(description.groupId(), 
internalTopics);
+                    }
+                }
+            } catch (InterruptedException | ExecutionException e) {
+                if (e.getCause() instanceof UnsupportedVersionException) {
+                    printError("Retrieving internal topics is not supported by 
the broker version. " +
+                        "Use 'kafka-topics.sh' to list and delete the group's 
internal topics.", Optional.of(e.getCause()));
+                } else {
+                    printError("Retrieving internal topics failed due to " + 
e.getMessage(), Optional.of(e));
+                }
+            }
+            return groupToInternalTopics;
+        }
+
+        private boolean isInferredInternalTopic(final String topicName, final 
String applicationId) {
+            return topicName.startsWith(applicationId + "-") && 
matchesInternalTopicFormat(topicName);
+        }
+
+        public static boolean matchesInternalTopicFormat(final String 
topicName) {
+            return topicName.endsWith("-changelog") || 
topicName.endsWith("-repartition")
+                || topicName.endsWith("-subscription-registration-topic")
+                || topicName.endsWith("-subscription-response-topic")
+                || 
topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-\\d+-topic")
+                || 
topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-\\d+-topic");
+        }
+
+
+        private void printInternalTopicErrors(Map<String, Throwable> 
internalTopicsDeletionFailures,
+                                              Set<String> deletedGroupIds,
+                                              Set<String> 
groupIdsWithInternalTopics) {
+            if (!deletedGroupIds.isEmpty()) {
+                if (internalTopicsDeletionFailures.isEmpty()) {
+                    List<String> successfulGroups = deletedGroupIds.stream()
+                        .filter(groupIdsWithInternalTopics::contains)
+                        .collect(Collectors.toList());
+                    System.out.println("Deletion of associated internal topics 
of the streams groups ('" +
+                        String.join("', '", successfulGroups) + "') was 
successful.");
+                } else {
+                    System.out.println("Deletion of some associated internal 
topics failed:");
+                    internalTopicsDeletionFailures.forEach((group, error) ->
+                        System.out.println("* Internal topics of the streams 
group '" + group + "' could not be deleted due to: " + error));
+                }
+            }
+        }
+
+        List<GroupListing> listDetailedStreamsGroups() {
+            try {
+                ListGroupsResult result = adminClient.listGroups(new 
ListGroupsOptions()
+                    
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
+                    .withTypes(Set.of(GroupType.STREAMS)));
+                Collection<GroupListing> listings = result.all().get();
+                return listings.stream().toList();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        private Map.Entry<Errors, Map<TopicPartition, Throwable>> 
deleteOffsets(String groupId, List<String> topics) {
+            Map<TopicPartition, Throwable> partitionLevelResult = new 
HashMap<>();
+            Set<String> topicWithPartitions = new HashSet<>();
+            Set<String> topicWithoutPartitions = new HashSet<>();
+
+            for (String topic : topics) {
+                if (topic.contains(":"))
+                    topicWithPartitions.add(topic);
+                else
+                    topicWithoutPartitions.add(topic);
+            }
+
+            List<TopicPartition> knownPartitions = 
topicWithPartitions.stream().flatMap(this::parseTopicsWithPartitions).toList();
+
+            // Get the partitions of topics that the user did not explicitly 
specify the partitions
+            DescribeTopicsResult describeTopicsResult = 
adminClient.describeTopics(
+                topicWithoutPartitions,
+                withTimeoutMs(new DescribeTopicsOptions()));
+
+            Iterator<TopicPartition> unknownPartitions = 
describeTopicsResult.topicNameValues().entrySet().stream().flatMap(e -> {
+                String topic = e.getKey();
+                try {
+                    return 
e.getValue().get().partitions().stream().map(partition ->
+                        new TopicPartition(topic, partition.partition()));
+                } catch (ExecutionException | InterruptedException err) {
+                    partitionLevelResult.put(new TopicPartition(topic, -1), 
err);
+                    return Stream.empty();
+                }
+            }).iterator();
+
+            Set<TopicPartition> partitions = new HashSet<>(knownPartitions);
+
+            unknownPartitions.forEachRemaining(partitions::add);
+
+            return deleteOffsets(groupId, partitions, partitionLevelResult);
+        }
+
+        private Map.Entry<Errors, Map<TopicPartition, Throwable>> 
deleteOffsets(String groupId, Set<TopicPartition> partitions, 
Map<TopicPartition, Throwable> partitionLevelResult) {
+
+            DeleteStreamsGroupOffsetsResult deleteResult = 
adminClient.deleteStreamsGroupOffsets(
+                groupId,
+                partitions,
+                withTimeoutMs(new DeleteStreamsGroupOffsetsOptions())
+            );
+
+            Errors topLevelException = Errors.NONE;
+
+            try {
+                deleteResult.all().get();
+            } catch (ExecutionException | InterruptedException e) {
+                topLevelException = Errors.forException(e.getCause());
+            }
+
+            partitions.forEach(partition -> {
+                try {
+                    deleteResult.partitionResult(partition).get();
+                    partitionLevelResult.put(partition, null);
+                } catch (ExecutionException | InterruptedException e) {
+                    partitionLevelResult.put(partition, e);
+                }
+            });
+
+            return new AbstractMap.SimpleImmutableEntry<>(topLevelException, 
partitionLevelResult);
+        }
+
+        Map.Entry<Errors, Map<TopicPartition, Throwable>> deleteOffsets() {
+            String groupId = opts.options.valueOf(opts.groupOpt);
+            Map.Entry<Errors, Map<TopicPartition, Throwable>> res;
+            if (opts.options.has(opts.allInputTopicsOpt)) {
+                Set<TopicPartition> partitions = 
getCommittedOffsets(groupId).keySet();
+                res = deleteOffsets(groupId, partitions, new HashMap<>());
+            } else if (opts.options.has(opts.inputTopicOpt)) {
+                List<String> topics = 
opts.options.valuesOf(opts.inputTopicOpt);
+                res = deleteOffsets(groupId, topics);
+            } else {
+                CommandLineUtils.printUsageAndExit(opts.parser, "Option 
--delete-offsets requires either --all-topics or --topic to be specified.");

Review Comment:
   Can you make this `--all-input-topics`?



##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -326,10 +469,40 @@ Map<TopicPartition, OffsetsInfo> 
getOffsets(StreamsGroupDescription description)
             return output;
         }
 
+        private Stream<TopicPartition> parseTopicsWithPartitions(String 
topicArg) {
+            ToIntFunction<String> partitionNum = partition -> {
+                try {
+                    return Integer.parseInt(partition);
+                } catch (NumberFormatException e) {
+                    throw new IllegalArgumentException("Invalid partition '" + 
partition + "' specified in topic arg '" + topicArg + "''");

Review Comment:
   yes



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to