lucasbru commented on code in PR #19646:
URL: https://github.com/apache/kafka/pull/19646#discussion_r2108855809


##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -330,13 +403,433 @@ Map<TopicPartition, OffsetsInfo> getOffsets(StreamsGroupDescription description)
 
         Map<TopicPartition, OffsetAndMetadata> getCommittedOffsets(String groupId) {
             try {
-                return adminClient.listConsumerGroupOffsets(
-                    Map.of(groupId, new ListConsumerGroupOffsetsSpec())).partitionsToOffsetAndMetadata(groupId).get();
+                return adminClient.listStreamsGroupOffsets(
+                    Map.of(groupId, new ListStreamsGroupOffsetsSpec())).partitionsToOffsetAndMetadata(groupId).get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> resetOffsets() {
+            Map<String, Map<TopicPartition, OffsetAndMetadata>> result = new HashMap<>();
+            List<String> groupIds = listStreamsGroups();
+            if (!groupIds.isEmpty()) {
+                Map<String, KafkaFuture<StreamsGroupDescription>> streamsGroups = adminClient.describeStreamsGroups(
+                    groupIds
+                ).describedGroups();
+
+                streamsGroups.forEach((groupId, groupDescription) -> {
+                    try {
+                        String state = groupDescription.get().groupState().toString();
+                        switch (state) {
+                            case "Empty":
+                            case "Dead":
+                                result.put(groupId, resetOffsetsForInactiveGroup(groupId));
+                                break;
+                            default:
+                                printError("Assignments can only be reset if the group '" + groupId + "' is inactive, but the current state is " + state + ".", Optional.empty());
+                                result.put(groupId, Collections.emptyMap());
+                        }
+                    } catch (InterruptedException ie) {
+                        throw new RuntimeException(ie);
+                    } catch (ExecutionException ee) {
+                        if (ee.getCause() instanceof GroupIdNotFoundException) {
+                            result.put(groupId, resetOffsetsForInactiveGroup(groupId));
+                        } else {
+                            throw new RuntimeException(ee);
+                        }
+                    }
+                });
+            }
+            return result;
+        }
+
+        private Map<TopicPartition, OffsetAndMetadata> resetOffsetsForInactiveGroup(String groupId) {
+            try {
+                Collection<TopicPartition> partitionsToReset = getPartitionsToReset(groupId);
+                Map<TopicPartition, OffsetAndMetadata> preparedOffsets = prepareOffsetsToReset(groupId, partitionsToReset);
+
+                // Dry-run is the default behavior if --execute is not specified
+                boolean dryRun = opts.options.has(opts.dryRunOpt) || !opts.options.has(opts.executeOpt);
+                if (!dryRun) {
+                    adminClient.alterStreamsGroupOffsets(
+                        groupId,
+                        preparedOffsets
+                    ).all().get();
+                }
+
+                return preparedOffsets;
+            } catch (InterruptedException ie) {
+                throw new RuntimeException(ie);
+            } catch (ExecutionException ee) {
+                Throwable cause = ee.getCause();
+                if (cause instanceof KafkaException) {
+                    throw (KafkaException) cause;
+                } else {
+                    throw new RuntimeException(cause);
+                }
+            }
+        }
+
+        private Collection<TopicPartition> getPartitionsToReset(String groupId) throws ExecutionException, InterruptedException {
+            if (opts.options.has(opts.allTopicsOpt)) {
+                return getCommittedOffsets(groupId).keySet();

Review Comment:
   I think this may not be the correct behavior. We only want to reset the offsets of source topics, never the offsets of repartition topics (which are also committed). For a clean reset, as implemented by the streams resetter, I think we want to delete the internal topics (through the other options). Resetting repartition topics to the beginning seems incorrect, since it means we would reconsume those records in the downstream subtopologies.
   
   @bbejeck Wdyt?
   
   We are already describing the streams group, so I think this shouldn't be too hard to implement - we just have to pass the source topics into this function.
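   
   A rough sketch of what I mean, assuming the described subtopologies expose their source topic names (the `subtopologies()` / `sourceTopics()` accessor names here are my assumption):
   
   ```java
   // Sketch only: restrict the reset to source topics. Repartition topics are
   // intentionally excluded; they should be cleaned up via internal topic deletion.
   private Collection<TopicPartition> getPartitionsToReset(StreamsGroupDescription description) {
       Set<String> sourceTopics = description.subtopologies().stream()
           .flatMap(subtopology -> subtopology.sourceTopics().stream()) // assumed accessor
           .collect(Collectors.toSet());
       return getCommittedOffsets(description.groupId()).keySet().stream()
           .filter(tp -> sourceTopics.contains(tp.topic()))
           .collect(Collectors.toList());
   }
   ```
   
   The helper would then take the description we already fetch in `resetOffsets()` instead of just the group id.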



##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -330,13 +403,433 @@ Map<TopicPartition, OffsetsInfo> getOffsets(StreamsGroupDescription description)
 
         Map<TopicPartition, OffsetAndMetadata> getCommittedOffsets(String groupId) {
             try {
-                return adminClient.listConsumerGroupOffsets(
-                    Map.of(groupId, new ListConsumerGroupOffsetsSpec())).partitionsToOffsetAndMetadata(groupId).get();
+                return adminClient.listStreamsGroupOffsets(
+                    Map.of(groupId, new ListStreamsGroupOffsetsSpec())).partitionsToOffsetAndMetadata(groupId).get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> resetOffsets() {
+            Map<String, Map<TopicPartition, OffsetAndMetadata>> result = new HashMap<>();
+            List<String> groupIds = listStreamsGroups();
+            if (!groupIds.isEmpty()) {
+                Map<String, KafkaFuture<StreamsGroupDescription>> streamsGroups = adminClient.describeStreamsGroups(
+                    groupIds
+                ).describedGroups();
+
+                streamsGroups.forEach((groupId, groupDescription) -> {
+                    try {
+                        String state = groupDescription.get().groupState().toString();
+                        switch (state) {
+                            case "Empty":
+                            case "Dead":
+                                result.put(groupId, resetOffsetsForInactiveGroup(groupId));
+                                break;
+                            default:
+                                printError("Assignments can only be reset if the group '" + groupId + "' is inactive, but the current state is " + state + ".", Optional.empty());
+                                result.put(groupId, Collections.emptyMap());
+                        }
+                    } catch (InterruptedException ie) {
+                        throw new RuntimeException(ie);
+                    } catch (ExecutionException ee) {
+                        if (ee.getCause() instanceof GroupIdNotFoundException) {
+                            result.put(groupId, resetOffsetsForInactiveGroup(groupId));
+                        } else {
+                            throw new RuntimeException(ee);
+                        }
+                    }
+                });
+            }
+            return result;
+        }
+
+        private Map<TopicPartition, OffsetAndMetadata> resetOffsetsForInactiveGroup(String groupId) {
+            try {
+                Collection<TopicPartition> partitionsToReset = getPartitionsToReset(groupId);
+                Map<TopicPartition, OffsetAndMetadata> preparedOffsets = prepareOffsetsToReset(groupId, partitionsToReset);
+
+                // Dry-run is the default behavior if --execute is not specified
+                boolean dryRun = opts.options.has(opts.dryRunOpt) || !opts.options.has(opts.executeOpt);
+                if (!dryRun) {
+                    adminClient.alterStreamsGroupOffsets(
+                        groupId,
+                        preparedOffsets
+                    ).all().get();
+                }
+
+                return preparedOffsets;
+            } catch (InterruptedException ie) {
+                throw new RuntimeException(ie);
+            } catch (ExecutionException ee) {
+                Throwable cause = ee.getCause();
+                if (cause instanceof KafkaException) {
+                    throw (KafkaException) cause;
+                } else {
+                    throw new RuntimeException(cause);
+                }
+            }
+        }
+
+        private Collection<TopicPartition> getPartitionsToReset(String groupId) throws ExecutionException, InterruptedException {
+            if (opts.options.has(opts.allTopicsOpt)) {
+                return getCommittedOffsets(groupId).keySet();
+            } else if (opts.options.has(opts.topicOpt)) {
+                List<String> topics = opts.options.valuesOf(opts.topicOpt);
+                return parseTopicPartitionsToReset(topics);
+            } else {
+                if (!opts.options.has(opts.resetFromFileOpt))
+                    CommandLineUtils.printUsageAndExit(opts.parser, "One of the reset scopes should be defined: --all-topics, --topic.");
+
+                return Collections.emptyList();
+            }
+        }
+
+        private List<TopicPartition> parseTopicPartitionsToReset(List<String> topicArgs) throws ExecutionException, InterruptedException {
+            List<String> topicsWithPartitions = new ArrayList<>();
+            List<String> topics = new ArrayList<>();
+
+            topicArgs.forEach(topicArg -> {
+                if (topicArg.contains(TOPIC_PARTITION_SEPARATOR))
+                    topicsWithPartitions.add(topicArg);
+                else
+                    topics.add(topicArg);
+            });
+
+            List<TopicPartition> specifiedPartitions =
+                topicsWithPartitions.stream().flatMap(this::parseTopicsWithPartitions).collect(Collectors.toList());
+
+            List<TopicPartition> unspecifiedPartitions = new ArrayList<>();
+
+            if (!topics.isEmpty()) {
+                Map<String, TopicDescription> descriptionMap = adminClient.describeTopics(
+                    topics
+                ).allTopicNames().get();
+
+                descriptionMap.forEach((topic, description) ->
+                    description.partitions().forEach(tpInfo -> unspecifiedPartitions.add(new TopicPartition(topic, tpInfo.partition())))
+                );
+            }
+
+            specifiedPartitions.addAll(unspecifiedPartitions);
+
+            return specifiedPartitions;
+        }
+
+        private Stream<TopicPartition> parseTopicsWithPartitions(String topicArg) {
+            ToIntFunction<String> partitionNum = partition -> {
+                try {
+                    return Integer.parseInt(partition);
+                } catch (NumberFormatException e) {
+                    throw new IllegalArgumentException("Invalid partition '" + partition + "' specified in topic arg '" + topicArg + "'");
+                }
+            };
+
+            String[] arr = topicArg.split(":");
+
+            if (arr.length != 2)
+                throw new IllegalArgumentException("Invalid topic arg '" + topicArg + "', expected topic name and partitions");
+
+            String topic = arr[0];
+            String partitions = arr[1];
+
+            return Arrays.stream(partitions.split(","))
+                .map(partition -> new TopicPartition(topic, partitionNum.applyAsInt(partition)));
+        }
+
+        @SuppressWarnings("CyclomaticComplexity")
+        private Map<TopicPartition, OffsetAndMetadata> prepareOffsetsToReset(String groupId, Collection<TopicPartition> partitionsToReset) {
+            if (opts.options.has(opts.resetToOffsetOpt)) {
+                long offset = opts.options.valueOf(opts.resetToOffsetOpt);
+                return checkOffsetsRange(partitionsToReset.stream().collect(Collectors.toMap(Function.identity(), tp -> offset)))
+                    .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
+            } else if (opts.options.has(opts.resetToEarliestOpt)) {
+                Map<TopicPartition, LogOffsetResult> logStartOffsets = getLogStartOffsets(partitionsToReset);
+                return partitionsToReset.stream().collect(Collectors.toMap(Function.identity(), topicPartition -> {
+                    LogOffsetResult logOffsetResult = logStartOffsets.get(topicPartition);
+
+                    if (!(logOffsetResult instanceof LogOffset)) {
+                        CommandLineUtils.printUsageAndExit(opts.parser, "Error getting starting offset of topic partition: " + topicPartition);
+                    }
+
+                    return new OffsetAndMetadata(((LogOffset) logOffsetResult).value);
+                }));
+            } else if (opts.options.has(opts.resetToLatestOpt)) {
+                Map<TopicPartition, LogOffsetResult> logEndOffsets = getLogEndOffsets(partitionsToReset);
+                return partitionsToReset.stream().collect(Collectors.toMap(Function.identity(), topicPartition -> {
+                    LogOffsetResult logOffsetResult = logEndOffsets.get(topicPartition);
+
+                    if (!(logOffsetResult instanceof LogOffset)) {
+                        CommandLineUtils.printUsageAndExit(opts.parser, "Error getting ending offset of topic partition: " + topicPartition);
+                    }
+
+                    return new OffsetAndMetadata(((LogOffset) logOffsetResult).value);
+                }));
+            } else if (opts.options.has(opts.resetShiftByOpt)) {
+                Map<TopicPartition, OffsetAndMetadata> currentCommittedOffsets = getCommittedOffsets(groupId);
+                Map<TopicPartition, Long> requestedOffsets = partitionsToReset.stream().collect(Collectors.toMap(Function.identity(), topicPartition -> {
+                    long shiftBy = opts.options.valueOf(opts.resetShiftByOpt);
+                    OffsetAndMetadata currentOffset = currentCommittedOffsets.get(topicPartition);
+
+                    if (currentOffset == null) {
+                        throw new IllegalArgumentException("Cannot shift offset for partition " + topicPartition + " since there is no current committed offset");
+                    }
+
+                    return currentOffset.offset() + shiftBy;
+                }));
+                return checkOffsetsRange(requestedOffsets).entrySet().stream()
+                    .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
+            } else if (opts.options.has(opts.resetToDatetimeOpt)) {
+                try {
+                    long timestamp = Utils.getDateTime(opts.options.valueOf(opts.resetToDatetimeOpt));
+                    Map<TopicPartition, LogOffsetResult> logTimestampOffsets = getLogTimestampOffsets(partitionsToReset, timestamp);
+                    return partitionsToReset.stream().collect(Collectors.toMap(Function.identity(), topicPartition -> {
+                        LogOffsetResult logTimestampOffset = logTimestampOffsets.get(topicPartition);
+
+                        if (!(logTimestampOffset instanceof LogOffset)) {
+                            CommandLineUtils.printUsageAndExit(opts.parser, "Error getting offset by timestamp of topic partition: " + topicPartition);
+                        }
+
+                        return new OffsetAndMetadata(((LogOffset) logTimestampOffset).value);
+                    }));
+                } catch (ParseException e) {
+                    throw new RuntimeException(e);
+                }
+            } else if (opts.options.has(opts.resetByDurationOpt)) {
+                String duration = opts.options.valueOf(opts.resetByDurationOpt);
+                Duration durationParsed = Duration.parse(duration);
+                Instant now = Instant.now();
+                long timestamp = now.minus(durationParsed).toEpochMilli();
+                Map<TopicPartition, LogOffsetResult> logTimestampOffsets = getLogTimestampOffsets(partitionsToReset, timestamp);
+                return partitionsToReset.stream().collect(Collectors.toMap(Function.identity(), topicPartition -> {
+                    LogOffsetResult logTimestampOffset = logTimestampOffsets.get(topicPartition);
+
+                    if (!(logTimestampOffset instanceof LogOffset)) {
+                        CommandLineUtils.printUsageAndExit(opts.parser, "Error getting offset by timestamp of topic partition: " + topicPartition);
+                    }
+
+                    return new OffsetAndMetadata(((LogOffset) logTimestampOffset).value);
+                }));
+            } else if (resetPlanFromFile().isPresent()) {
+                return resetPlanFromFile().map(resetPlan -> {
+                    Map<TopicPartition, OffsetAndMetadata> resetPlanForGroup = resetPlan.get(groupId);
+
+                    if (resetPlanForGroup == null) {
+                        printError("No reset plan for group " + groupId + " found", Optional.empty());
+                        return Collections.<TopicPartition, OffsetAndMetadata>emptyMap();
+                    }
+
+                    Map<TopicPartition, Long> requestedOffsets = resetPlanForGroup.keySet().stream().collect(Collectors.toMap(
+                        Function.identity(),
+                        topicPartition -> resetPlanForGroup.get(topicPartition).offset()));
+
+                    return checkOffsetsRange(requestedOffsets).entrySet().stream()
+                        .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
+                }).orElseGet(Collections::emptyMap);
+            } else if (opts.options.has(opts.resetToCurrentOpt)) {
+                Map<TopicPartition, OffsetAndMetadata> currentCommittedOffsets = getCommittedOffsets(groupId);
+                Collection<TopicPartition> partitionsToResetWithCommittedOffset = new ArrayList<>();
+                Collection<TopicPartition> partitionsToResetWithoutCommittedOffset = new ArrayList<>();
+
+                for (TopicPartition topicPartition : partitionsToReset) {
+                    if (currentCommittedOffsets.containsKey(topicPartition))
+                        partitionsToResetWithCommittedOffset.add(topicPartition);
+                    else
+                        partitionsToResetWithoutCommittedOffset.add(topicPartition);
+                }
+
+                Map<TopicPartition, OffsetAndMetadata> preparedOffsetsForPartitionsWithCommittedOffset = partitionsToResetWithCommittedOffset.stream()
+                    .collect(Collectors.toMap(Function.identity(), topicPartition -> {
+                        OffsetAndMetadata committedOffset = currentCommittedOffsets.get(topicPartition);
+
+                        if (committedOffset == null) {
+                            throw new IllegalStateException("Expected a valid current offset for topic partition: " + topicPartition);
+                        }
+
+                        return new OffsetAndMetadata(committedOffset.offset());
+                    }));
+
+                Map<TopicPartition, OffsetAndMetadata> preparedOffsetsForPartitionsWithoutCommittedOffset = getLogEndOffsets(partitionsToResetWithoutCommittedOffset)
+                    .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> {
+                        if (!(e.getValue() instanceof LogOffset)) {
+                            CommandLineUtils.printUsageAndExit(opts.parser, "Error getting ending offset of topic partition: " + e.getKey());
+                        }
+
+                        return new OffsetAndMetadata(((LogOffset) e.getValue()).value);
+                    }));
+
+                preparedOffsetsForPartitionsWithCommittedOffset.putAll(preparedOffsetsForPartitionsWithoutCommittedOffset);
+
+                return preparedOffsetsForPartitionsWithCommittedOffset;
+            }
+
+            CommandLineUtils.printUsageAndExit(opts.parser, String.format("Option '%s' requires one of the following scenarios: %s", opts.resetOffsetsOpt, opts.allResetOffsetScenarioOpts));
+            return null;
+        }
+
+        Optional<Map<String, Map<TopicPartition, OffsetAndMetadata>>> resetPlanFromFile() {

Review Comment:
   I assume quite a bit of this code was copied from the consumer group tool. Is there any chance of sharing some code? It may not be feasible, but since most of this code deals only with topic partitions and not with tasks, I wonder if there is an easy way to refactor functions like this into a common utility.
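   
   For example, something along these lines could serve both tools, since the parsing only touches `Admin` and `TopicPartition` (class name and location are just placeholders, not a concrete proposal):
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.ExecutionException;
   
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.common.TopicPartition;
   
   // Hypothetical shared utility for the consumer group and streams group tools.
   public final class GroupPartitionParsingUtils {
       private static final String TOPIC_PARTITION_SEPARATOR = ":";
   
       public static List<TopicPartition> parseTopicPartitionsToReset(Admin adminClient, List<String> topicArgs)
               throws ExecutionException, InterruptedException {
           List<TopicPartition> partitions = new ArrayList<>();
           List<String> bareTopics = new ArrayList<>();
           for (String topicArg : topicArgs) {
               if (topicArg.contains(TOPIC_PARTITION_SEPARATOR)) {
                   // "topic:0,1,2" form: explicit partitions were given.
                   String[] parts = topicArg.split(TOPIC_PARTITION_SEPARATOR);
                   for (String partition : parts[1].split(","))
                       partitions.add(new TopicPartition(parts[0], Integer.parseInt(partition)));
               } else {
                   bareTopics.add(topicArg);
               }
           }
           if (!bareTopics.isEmpty()) {
               // Bare topic names expand to all of their partitions.
               adminClient.describeTopics(bareTopics).allTopicNames().get()
                   .forEach((topic, description) -> description.partitions()
                       .forEach(info -> partitions.add(new TopicPartition(topic, info.partition()))));
           }
           return partitions;
       }
   }
   ```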



##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -330,13 +403,433 @@ Map<TopicPartition, OffsetsInfo> getOffsets(StreamsGroupDescription description)
 
         Map<TopicPartition, OffsetAndMetadata> getCommittedOffsets(String groupId) {
             try {
-                return adminClient.listConsumerGroupOffsets(
-                    Map.of(groupId, new ListConsumerGroupOffsetsSpec())).partitionsToOffsetAndMetadata(groupId).get();
+                return adminClient.listStreamsGroupOffsets(
+                    Map.of(groupId, new ListStreamsGroupOffsetsSpec())).partitionsToOffsetAndMetadata(groupId).get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> resetOffsets() {
+            Map<String, Map<TopicPartition, OffsetAndMetadata>> result = new HashMap<>();
+            List<String> groupIds = listStreamsGroups();
+            if (!groupIds.isEmpty()) {
+                Map<String, KafkaFuture<StreamsGroupDescription>> streamsGroups = adminClient.describeStreamsGroups(
+                    groupIds
+                ).describedGroups();
+
+                streamsGroups.forEach((groupId, groupDescription) -> {
+                    try {
+                        String state = groupDescription.get().groupState().toString();
+                        switch (state) {
+                            case "Empty":
+                            case "Dead":
+                                result.put(groupId, resetOffsetsForInactiveGroup(groupId));
+                                break;
+                            default:
+                                printError("Assignments can only be reset if the group '" + groupId + "' is inactive, but the current state is " + state + ".", Optional.empty());
+                                result.put(groupId, Collections.emptyMap());
+                        }
+                    } catch (InterruptedException ie) {
+                        throw new RuntimeException(ie);
+                    } catch (ExecutionException ee) {
+                        if (ee.getCause() instanceof GroupIdNotFoundException) {
+                            result.put(groupId, resetOffsetsForInactiveGroup(groupId));
+                        } else {
+                            throw new RuntimeException(ee);
+                        }
+                    }
+                });
+            }
+            return result;
+        }
+
+        private Map<TopicPartition, OffsetAndMetadata> resetOffsetsForInactiveGroup(String groupId) {
+            try {
+                Collection<TopicPartition> partitionsToReset = getPartitionsToReset(groupId);
+                Map<TopicPartition, OffsetAndMetadata> preparedOffsets = prepareOffsetsToReset(groupId, partitionsToReset);
+
+                // Dry-run is the default behavior if --execute is not specified
+                boolean dryRun = opts.options.has(opts.dryRunOpt) || !opts.options.has(opts.executeOpt);
+                if (!dryRun) {
+                    adminClient.alterStreamsGroupOffsets(
+                        groupId,
+                        preparedOffsets
+                    ).all().get();
+                }
+
+                return preparedOffsets;
+            } catch (InterruptedException ie) {
+                throw new RuntimeException(ie);
+            } catch (ExecutionException ee) {
+                Throwable cause = ee.getCause();
+                if (cause instanceof KafkaException) {
+                    throw (KafkaException) cause;
+                } else {
+                    throw new RuntimeException(cause);
+                }
+            }
+        }
+
+        private Collection<TopicPartition> getPartitionsToReset(String groupId) throws ExecutionException, InterruptedException {
+            if (opts.options.has(opts.allTopicsOpt)) {
+                return getCommittedOffsets(groupId).keySet();
+            } else if (opts.options.has(opts.topicOpt)) {
+                List<String> topics = opts.options.valuesOf(opts.topicOpt);
+                return parseTopicPartitionsToReset(topics);
+            } else {
+                if (!opts.options.has(opts.resetFromFileOpt))
+                    CommandLineUtils.printUsageAndExit(opts.parser, "One of the reset scopes should be defined: --all-topics, --topic.");
+
+                return Collections.emptyList();
+            }
+        }
+
+        private List<TopicPartition> parseTopicPartitionsToReset(List<String> topicArgs) throws ExecutionException, InterruptedException {
+            List<String> topicsWithPartitions = new ArrayList<>();
+            List<String> topics = new ArrayList<>();
+
+            topicArgs.forEach(topicArg -> {
+                if (topicArg.contains(TOPIC_PARTITION_SEPARATOR))
+                    topicsWithPartitions.add(topicArg);
+                else
+                    topics.add(topicArg);
+            });
+
+            List<TopicPartition> specifiedPartitions =
+                topicsWithPartitions.stream().flatMap(this::parseTopicsWithPartitions).collect(Collectors.toList());
+
+            List<TopicPartition> unspecifiedPartitions = new ArrayList<>();
+
+            if (!topics.isEmpty()) {
+                Map<String, TopicDescription> descriptionMap = adminClient.describeTopics(
+                    topics
+                ).allTopicNames().get();
+
+                descriptionMap.forEach((topic, description) ->
+                    description.partitions().forEach(tpInfo -> unspecifiedPartitions.add(new TopicPartition(topic, tpInfo.partition())))
+                );
+            }
+
+            specifiedPartitions.addAll(unspecifiedPartitions);
+
+            return specifiedPartitions;
+        }
+
+        private Stream<TopicPartition> parseTopicsWithPartitions(String topicArg) {
+            ToIntFunction<String> partitionNum = partition -> {
+                try {
+                    return Integer.parseInt(partition);
+                } catch (NumberFormatException e) {
+                    throw new IllegalArgumentException("Invalid partition '" + partition + "' specified in topic arg '" + topicArg + "'");
+                }
+            };
+
+            String[] arr = topicArg.split(":");
+
+            if (arr.length != 2)
+                throw new IllegalArgumentException("Invalid topic arg '" + topicArg + "', expected topic name and partitions");
+
+            String topic = arr[0];
+            String partitions = arr[1];
+
+            return Arrays.stream(partitions.split(","))
+                .map(partition -> new TopicPartition(topic, partitionNum.applyAsInt(partition)));
+        }
+
+        @SuppressWarnings("CyclomaticComplexity")
+        private Map<TopicPartition, OffsetAndMetadata> prepareOffsetsToReset(String groupId, Collection<TopicPartition> partitionsToReset) {
+            if (opts.options.has(opts.resetToOffsetOpt)) {
+                long offset = opts.options.valueOf(opts.resetToOffsetOpt);
+                return checkOffsetsRange(partitionsToReset.stream().collect(Collectors.toMap(Function.identity(), tp -> offset)))
+                    .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
+            } else if (opts.options.has(opts.resetToEarliestOpt)) {
+                Map<TopicPartition, LogOffsetResult> logStartOffsets = getLogStartOffsets(partitionsToReset);
+                return partitionsToReset.stream().collect(Collectors.toMap(Function.identity(), topicPartition -> {
+                    LogOffsetResult logOffsetResult = logStartOffsets.get(topicPartition);
+
+                    if (!(logOffsetResult instanceof LogOffset)) {
+                        CommandLineUtils.printUsageAndExit(opts.parser, "Error getting starting offset of topic partition: " + topicPartition);
+                    }
+
+                    return new OffsetAndMetadata(((LogOffset) logOffsetResult).value);
+                }));
+            } else if (opts.options.has(opts.resetToLatestOpt)) {
+                Map<TopicPartition, LogOffsetResult> logEndOffsets = getLogEndOffsets(partitionsToReset);
+                return partitionsToReset.stream().collect(Collectors.toMap(Function.identity(), topicPartition -> {
+                    LogOffsetResult logOffsetResult = logEndOffsets.get(topicPartition);
+
+                    if (!(logOffsetResult instanceof LogOffset)) {
+                        CommandLineUtils.printUsageAndExit(opts.parser, "Error getting ending offset of topic partition: " + topicPartition);
+                    }
+
+                    return new OffsetAndMetadata(((LogOffset) logOffsetResult).value);
+                }));
+            } else if (opts.options.has(opts.resetShiftByOpt)) {
+                Map<TopicPartition, OffsetAndMetadata> currentCommittedOffsets = getCommittedOffsets(groupId);
+                Map<TopicPartition, Long> requestedOffsets = partitionsToReset.stream().collect(Collectors.toMap(Function.identity(), topicPartition -> {
+                    long shiftBy = opts.options.valueOf(opts.resetShiftByOpt);
+                    OffsetAndMetadata currentOffset = currentCommittedOffsets.get(topicPartition);
+
+                    if (currentOffset == null) {
+                        throw new IllegalArgumentException("Cannot shift offset for partition " + topicPartition + " since there is no current committed offset");
+                    }
+
+                    return currentOffset.offset() + shiftBy;
+                }));
+                return checkOffsetsRange(requestedOffsets).entrySet().stream()
+                    .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
+            } else if (opts.options.has(opts.resetToDatetimeOpt)) {
+                try {
+                    long timestamp = Utils.getDateTime(opts.options.valueOf(opts.resetToDatetimeOpt));
+                    Map<TopicPartition, LogOffsetResult> logTimestampOffsets = getLogTimestampOffsets(partitionsToReset, timestamp);
+                    return partitionsToReset.stream().collect(Collectors.toMap(Function.identity(), topicPartition -> {
+                        LogOffsetResult logTimestampOffset = logTimestampOffsets.get(topicPartition);
+
+                        if (!(logTimestampOffset instanceof LogOffset)) {
+                            CommandLineUtils.printUsageAndExit(opts.parser, "Error getting offset by timestamp of topic partition: " + topicPartition);
+                        }
+
+                        return new OffsetAndMetadata(((LogOffset) logTimestampOffset).value);
+                    }));
+                } catch (ParseException e) {
+                    throw new RuntimeException(e);
+                }
+            } else if (opts.options.has(opts.resetByDurationOpt)) {
+                String duration = opts.options.valueOf(opts.resetByDurationOpt);
+                Duration durationParsed = Duration.parse(duration);
+                Instant now = Instant.now();
+                long timestamp = now.minus(durationParsed).toEpochMilli();
+                Map<TopicPartition, LogOffsetResult> logTimestampOffsets = getLogTimestampOffsets(partitionsToReset, timestamp);
+                return partitionsToReset.stream().collect(Collectors.toMap(Function.identity(), topicPartition -> {
+                    LogOffsetResult logTimestampOffset = logTimestampOffsets.get(topicPartition);
+
+                    if (!(logTimestampOffset instanceof LogOffset)) {
+                        CommandLineUtils.printUsageAndExit(opts.parser, "Error getting offset by timestamp of topic partition: " + topicPartition);
+                    }
+
+                    return new OffsetAndMetadata(((LogOffset) logTimestampOffset).value);
+                }));
+            } else if (resetPlanFromFile().isPresent()) {
+                return resetPlanFromFile().map(resetPlan -> {
+                    Map<TopicPartition, OffsetAndMetadata> resetPlanForGroup = resetPlan.get(groupId);
+
+                    if (resetPlanForGroup == null) {
+                        printError("No reset plan for group " + groupId + " found", Optional.empty());
+                        return Collections.<TopicPartition, OffsetAndMetadata>emptyMap();
+                    }
+
+                    Map<TopicPartition, Long> requestedOffsets = resetPlanForGroup.keySet().stream().collect(Collectors.toMap(
+                        Function.identity(),
+                        topicPartition -> resetPlanForGroup.get(topicPartition).offset()));
+
+                    return checkOffsetsRange(requestedOffsets).entrySet().stream()
+                        .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
+                }).orElseGet(Collections::emptyMap);
+            } else if (opts.options.has(opts.resetToCurrentOpt)) {
+                Map<TopicPartition, OffsetAndMetadata> currentCommittedOffsets = getCommittedOffsets(groupId);
+                Collection<TopicPartition> partitionsToResetWithCommittedOffset = new ArrayList<>();
+                Collection<TopicPartition> partitionsToResetWithoutCommittedOffset = new ArrayList<>();
+
+                for (TopicPartition topicPartition : partitionsToReset) {
+                    if (currentCommittedOffsets.containsKey(topicPartition))
+                        partitionsToResetWithCommittedOffset.add(topicPartition);
+                    else
+                        partitionsToResetWithoutCommittedOffset.add(topicPartition);
+                }
+
+                Map<TopicPartition, OffsetAndMetadata> preparedOffsetsForPartitionsWithCommittedOffset = partitionsToResetWithCommittedOffset.stream()
+                    .collect(Collectors.toMap(Function.identity(), topicPartition -> {
+                        OffsetAndMetadata committedOffset = currentCommittedOffsets.get(topicPartition);
+
+                        if (committedOffset == null) {
+                            throw new IllegalStateException("Expected a valid current offset for topic partition: " + topicPartition);
+                        }
+
+                        return new OffsetAndMetadata(committedOffset.offset());
+                    }));
+
+                Map<TopicPartition, OffsetAndMetadata> preparedOffsetsForPartitionsWithoutCommittedOffset = getLogEndOffsets(partitionsToResetWithoutCommittedOffset)
+                    .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> {
+                        if (!(e.getValue() instanceof LogOffset)) {
+                            CommandLineUtils.printUsageAndExit(opts.parser, "Error getting ending offset of topic partition: " + e.getKey());
+                        }
+
+                        return new OffsetAndMetadata(((LogOffset) e.getValue()).value);
+                    }));
+
+                preparedOffsetsForPartitionsWithCommittedOffset.putAll(preparedOffsetsForPartitionsWithoutCommittedOffset);
+
+                return preparedOffsetsForPartitionsWithCommittedOffset;
+            }
+
+            CommandLineUtils.printUsageAndExit(opts.parser, String.format("Option '%s' requires one of the following scenarios: %s", opts.resetOffsetsOpt, opts.allResetOffsetScenarioOpts));
+            return null;
+        }
+
+        Optional<Map<String, Map<TopicPartition, OffsetAndMetadata>>> resetPlanFromFile() {
+            if (opts.options.has(opts.resetFromFileOpt)) {
+                try {
+                    String resetPlanPath = opts.options.valueOf(opts.resetFromFileOpt);
+                    String resetPlanCsv = Utils.readFileAsString(resetPlanPath);
+                    Map<String, Map<TopicPartition, OffsetAndMetadata>> resetPlan = parseResetPlan(resetPlanCsv);
+                    return Optional.of(resetPlan);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            } else return Optional.empty();
+        }
+
+        private Map<String, Map<TopicPartition, OffsetAndMetadata>> parseResetPlan(String resetPlanCsv) {
+            ObjectReader csvReader = CsvUtils.readerFor(CsvUtils.CsvRecordNoGroup.class);
+            String[] lines = resetPlanCsv.split("\n");
+            boolean isSingleGroupQuery = opts.options.valuesOf(opts.groupOpt).size() == 1;
+            boolean isOldCsvFormat = false;
+            try {
+                if (lines.length > 0) {
+                    csvReader.readValue(lines[0], CsvUtils.CsvRecordNoGroup.class);
+                    isOldCsvFormat = true;
+                }
+            } catch (IOException e) {
+                e.printStackTrace();
+                // Ignore.

Review Comment:
   Don't we want to fail here somehow?
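   
   Something along these lines, for instance (just a sketch; I'm assuming we can reuse the consumer group tool's `CsvUtils.CsvRecordWithGroup` here): if the first line parses in neither format, fail fast instead of printing the stack trace and moving on.
   
   ```java
   boolean isOldCsvFormat = false;
   try {
       if (lines.length > 0) {
           csvReader.readValue(lines[0], CsvUtils.CsvRecordNoGroup.class);
           isOldCsvFormat = true;
       }
   } catch (IOException e) {
       // Not the old (no-group) format; make sure it is at least the new format,
       // otherwise fail instead of silently ignoring the parse error.
       try {
           CsvUtils.readerFor(CsvUtils.CsvRecordWithGroup.class)
               .readValue(lines[0], CsvUtils.CsvRecordWithGroup.class);
       } catch (IOException e2) {
           throw new RuntimeException("Malformed reset plan CSV line: " + lines[0], e2);
       }
   }
   ```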



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

