lucasbru commented on code in PR #19646:
URL: https://github.com/apache/kafka/pull/19646#discussion_r2126352604


##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java:
##########
@@ -76,8 +121,14 @@ public StreamsGroupCommandOptions(String[] args) {
             .withRequiredArg()
             .describedAs("streams group")
             .ofType(String.class);
+        topicOpt = parser.accepts("topic", TOPIC_DOC)

Review Comment:
   According to the KIP, this should be --input-topics, and allow multiple topics to be passed.
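
   A minimal sketch of what a multi-valued option could look like with jopt-simple (the parser library the command options class already uses). The --input-topics name follows the review comment; the comma-separated syntax, the INPUT_TOPICS_DOC constant, and the standalone class are illustrative assumptions, not the PR's actual code.

```java
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;

import java.util.List;

public class InputTopicsOptionSketch {
    // Hypothetical doc string; the real constant name in the PR may differ.
    private static final String INPUT_TOPICS_DOC = "Comma-separated list of input topics to act on.";

    public static void main(String[] args) {
        OptionParser parser = new OptionParser();
        OptionSpec<String> inputTopicsOpt = parser.accepts("input-topics", INPUT_TOPICS_DOC)
            .withRequiredArg()
            .describedAs("topic1,topic2,...")
            .ofType(String.class)
            .withValuesSeparatedBy(',');   // one flag can carry several topics

        OptionSet options = parser.parse("--input-topics", "topicA,topicB");
        List<String> topics = options.valuesOf(inputTopicsOpt);
        System.out.println(topics);        // prints [topicA, topicB]
    }
}
```

   jopt-simple also collects repeated occurrences of the same flag, so `--input-topics a --input-topics b` would yield both values from `valuesOf()` even without the comma separator.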



##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -330,13 +405,549 @@ Map<TopicPartition, OffsetsInfo> getOffsets(StreamsGroupDescription description)
 
         Map<TopicPartition, OffsetAndMetadata> getCommittedOffsets(String groupId) {
             try {
-                return adminClient.listConsumerGroupOffsets(
-                    Map.of(groupId, new ListConsumerGroupOffsetsSpec())).partitionsToOffsetAndMetadata(groupId).get();
+                var sourceTopics = adminClient.describeStreamsGroups(List.of(groupId))
+                    .all().get().get(groupId)
+                    .subtopologies().stream()
+                    .flatMap(subtopology -> subtopology.sourceTopics().stream())
+                    .collect(Collectors.toSet());
+
+                var allTopicPartitions = adminClient.listStreamsGroupOffsets(Map.of(groupId, new ListStreamsGroupOffsetsSpec()))
+                    .partitionsToOffsetAndMetadata(groupId).get();
+
+                allTopicPartitions.keySet().removeIf(tp -> !sourceTopics.contains(tp.topic()));
+                return allTopicPartitions;
+            } catch (InterruptedException | ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        private List<TopicPartition> filterExistingGroupTopics(String groupId, List<TopicPartition> topicPartitions) {
+            try {
+                var allTopicPartitions = adminClient.listStreamsGroupOffsets(Map.of(groupId, new ListStreamsGroupOffsetsSpec()))
+                    .partitionsToOffsetAndMetadata(groupId).get();
+                boolean allPresent = topicPartitions.stream().allMatch(allTopicPartitions::containsKey);
+                if (!allPresent) {
+                    printError("One or more topics are not part of the group '" + groupId + "'.", Optional.empty());
+                    return Collections.emptyList();
+                }
+                return topicPartitions;
             } catch (InterruptedException | ExecutionException e) {
                 throw new RuntimeException(e);
             }
         }
 
+
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> resetOffsets() {
+            // Dry-run is the default behavior if --execute is not specified
+            boolean dryRun = opts.options.has(opts.dryRunOpt) || !opts.options.has(opts.executeOpt);
+
+            Map<String, Map<TopicPartition, OffsetAndMetadata>> result = new HashMap<>();
+            List<String> groupIds = opts.options.has(opts.allGroupsOpt)
+                ? listStreamsGroups()
+                : opts.options.valuesOf(opts.groupOpt);
+            if (!groupIds.isEmpty()) {
+                Map<String, KafkaFuture<StreamsGroupDescription>> streamsGroups = adminClient.describeStreamsGroups(
+                    groupIds
+                ).describedGroups();
+
+                streamsGroups.forEach((groupId, groupDescription) -> {
+                    try {
+                        String state = groupDescription.get().groupState().toString();
+                        switch (state) {
+                            case "Empty":
+                            case "Dead":
+                                // reset offsets in source topics
+                                result.put(groupId, resetOffsetsForInactiveGroup(groupId, dryRun));
+                                // delete internal topics
+                                if (!dryRun) {
+                                    List<String> internalTopics = retrieveInternalTopics(List.of(groupId)).get(groupId);
+                                    if (internalTopics != null && !internalTopics.isEmpty()) {
+                                        try {
+                                            adminClient.deleteTopics(internalTopics).all().get();

Review Comment:
   Why do we delete internal topics here?



##########
tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java:
##########
@@ -179,22 +187,31 @@ public void testDescribeStreamsGroupsGetOffsets() throws Exception {
         when(startOffset.all()).thenReturn(KafkaFuture.completedFuture(startOffsetResultMap));
         when(endOffset.all()).thenReturn(KafkaFuture.completedFuture(endOffsetResultMap));
 
-        when(adminClient.listOffsets(ArgumentMatchers.anyMap())).thenReturn(startOffset, endOffset);
+        when(ADMIN_CLIENT.listOffsets(ArgumentMatchers.anyMap())).thenReturn(startOffset, endOffset);
 
-        ListConsumerGroupOffsetsResult result = mock(ListConsumerGroupOffsetsResult.class);
+        ListStreamsGroupOffsetsResult result = mock(ListStreamsGroupOffsetsResult.class);
         Map<TopicPartition, OffsetAndMetadata> committedOffsetsMap = new HashMap<>();
         committedOffsetsMap.put(new TopicPartition("topic1", 0), new OffsetAndMetadata(12, Optional.of(0), ""));
 
-        when(adminClient.listConsumerGroupOffsets(ArgumentMatchers.anyMap())).thenReturn(result);
+        when(ADMIN_CLIENT.listStreamsGroupOffsets(ArgumentMatchers.anyMap())).thenReturn(result);
         when(result.partitionsToOffsetAndMetadata(ArgumentMatchers.anyString())).thenReturn(KafkaFuture.completedFuture(committedOffsetsMap));
 
+        // Java

Review Comment:
   why this comment?



##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java:
##########
@@ -76,8 +121,14 @@ public StreamsGroupCommandOptions(String[] args) {
             .withRequiredArg()
             .describedAs("streams group")
             .ofType(String.class);
+        topicOpt = parser.accepts("topic", TOPIC_DOC)
+            .withRequiredArg()
+            .describedAs("topic")
+            .ofType(String.class);
+        allTopicsOpt = parser.accepts("all-topics", ALL_TOPICS_DOC);

Review Comment:
   According to the KIP, this should be --all-input-topics
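
   For comparison, a rough sketch of how the renamed flags might be resolved into the set of topics to act on. The --input-topics / --all-input-topics names follow the review comments; the resolveTopics helper, the example topic names, and the standalone class are hypothetical, not the PR's actual implementation.

```java
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;

import java.util.Set;

public class InputTopicsResolutionSketch {

    // Hypothetical helper: pick the topics to act on from the parsed options.
    static Set<String> resolveTopics(OptionSet options,
                                     OptionSpec<String> inputTopicsOpt,
                                     OptionSpec<Void> allInputTopicsOpt,
                                     Set<String> sourceTopicsOfGroup) {
        if (options.has(allInputTopicsOpt)) {
            // --all-input-topics: every source topic of the group
            return sourceTopicsOfGroup;
        }
        // --input-topics: only the topics the user listed
        return Set.copyOf(options.valuesOf(inputTopicsOpt));
    }

    public static void main(String[] args) {
        OptionParser parser = new OptionParser();
        OptionSpec<String> inputTopicsOpt = parser.accepts("input-topics", "Input topics to act on")
            .withRequiredArg()
            .ofType(String.class)
            .withValuesSeparatedBy(',');
        OptionSpec<Void> allInputTopicsOpt = parser.accepts("all-input-topics", "All input topics of the group");

        OptionSet options = parser.parse("--input-topics", "orders,payments");
        System.out.println(resolveTopics(options, inputTopicsOpt, allInputTopicsOpt,
            Set.of("orders", "payments", "clicks")));   // prints only orders and payments
    }
}
```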



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
