lucasbru commented on code in PR #19646:
URL: https://github.com/apache/kafka/pull/19646#discussion_r2124050789
##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -330,13 +403,433 @@ Map<TopicPartition, OffsetsInfo> getOffsets(StreamsGroupDescription description)
         Map<TopicPartition, OffsetAndMetadata> getCommittedOffsets(String groupId) {
             try {
-                return adminClient.listConsumerGroupOffsets(
-                    Map.of(groupId, new ListConsumerGroupOffsetsSpec())).partitionsToOffsetAndMetadata(groupId).get();
+                return adminClient.listStreamsGroupOffsets(
+                    Map.of(groupId, new ListStreamsGroupOffsetsSpec())).partitionsToOffsetAndMetadata(groupId).get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> resetOffsets() {
+            Map<String, Map<TopicPartition, OffsetAndMetadata>> result = new HashMap<>();
+            List<String> groupIds = listStreamsGroups();
+            if (!groupIds.isEmpty()) {
+                Map<String, KafkaFuture<StreamsGroupDescription>> streamsGroups = adminClient.describeStreamsGroups(
+                    groupIds
+                ).describedGroups();
+
+                streamsGroups.forEach((groupId, groupDescription) -> {
+                    try {
+                        String state = groupDescription.get().groupState().toString();
+                        switch (state) {
+                            case "Empty":
+                            case "Dead":
+                                result.put(groupId, resetOffsetsForInactiveGroup(groupId));
+                                break;
+                            default:
+                                printError("Assignments can only be reset if the group '" + groupId + "' is inactive, but the current state is " + state + ".", Optional.empty());
+                                result.put(groupId, Collections.emptyMap());
+                        }
+                    } catch (InterruptedException ie) {
+                        throw new RuntimeException(ie);
+                    } catch (ExecutionException ee) {
+                        if (ee.getCause() instanceof GroupIdNotFoundException) {
+                            result.put(groupId, resetOffsetsForInactiveGroup(groupId));
+                        } else {
+                            throw new RuntimeException(ee);
+                        }
+                    }
+                });
+            }
+            return result;
+        }
+
+        private Map<TopicPartition, OffsetAndMetadata> resetOffsetsForInactiveGroup(String groupId) {
+            try {
+                Collection<TopicPartition> partitionsToReset = getPartitionsToReset(groupId);
+                Map<TopicPartition, OffsetAndMetadata> preparedOffsets = prepareOffsetsToReset(groupId, partitionsToReset);
+
+                // Dry-run is the default behavior if --execute is not specified
+                boolean dryRun = opts.options.has(opts.dryRunOpt) || !opts.options.has(opts.executeOpt);
+                if (!dryRun) {
+                    adminClient.alterStreamsGroupOffsets(
+                        groupId,
+                        preparedOffsets
+                    ).all().get();
+                }
+
+                return preparedOffsets;
+            } catch (InterruptedException ie) {
+                throw new RuntimeException(ie);
+            } catch (ExecutionException ee) {
+                Throwable cause = ee.getCause();
+                if (cause instanceof KafkaException) {
+                    throw (KafkaException) cause;
+                } else {
+                    throw new RuntimeException(cause);
+                }
+            }
+        }
+
+        private Collection<TopicPartition> getPartitionsToReset(String groupId) throws ExecutionException, InterruptedException {
+            if (opts.options.has(opts.allTopicsOpt)) {
+                return getCommittedOffsets(groupId).keySet();
+            } else if (opts.options.has(opts.topicOpt)) {
+                List<String> topics = opts.options.valuesOf(opts.topicOpt);
+                return parseTopicPartitionsToReset(topics);
+            } else {
+                if (!opts.options.has(opts.resetFromFileOpt))
+                    CommandLineUtils.printUsageAndExit(opts.parser, "One of the reset scopes should be defined: --all-topics, --topic.");
+
+                return Collections.emptyList();
+            }
+        }
+
+        private List<TopicPartition> parseTopicPartitionsToReset(List<String> topicArgs) throws ExecutionException, InterruptedException {
+            List<String> topicsWithPartitions = new ArrayList<>();
+            List<String> topics = new ArrayList<>();
+
+            topicArgs.forEach(topicArg -> {
+                if (topicArg.contains(TOPIC_PARTITION_SEPARATOR))
+                    topicsWithPartitions.add(topicArg);
+                else
+                    topics.add(topicArg);
+            });
+
+            List<TopicPartition> specifiedPartitions =
+                topicsWithPartitions.stream().flatMap(this::parseTopicsWithPartitions).collect(Collectors.toList());
+
+            List<TopicPartition> unspecifiedPartitions = new ArrayList<>();
+
+            if (!topics.isEmpty()) {
+                Map<String, TopicDescription> descriptionMap = adminClient.describeTopics(
+                    topics
+                ).allTopicNames().get();
+
+                descriptionMap.forEach((topic, description) ->
+                    description.partitions().forEach(tpInfo -> unspecifiedPartitions.add(new TopicPartition(topic, tpInfo.partition())))
+                );
+            }
+
+            specifiedPartitions.addAll(unspecifiedPartitions);
+
+            return specifiedPartitions;
+        }
+
+        private Stream<TopicPartition> parseTopicsWithPartitions(String topicArg) {
+            ToIntFunction<String> partitionNum = partition -> {
+                try {
+                    return Integer.parseInt(partition);
+                } catch (NumberFormatException e) {
+                    throw new IllegalArgumentException("Invalid partition '" + partition + "' specified in topic arg '" + topicArg + "''");
+                }
+            };
+
+            String[] arr = topicArg.split(":");
+
+            if (arr.length != 2)
+                throw new IllegalArgumentException("Invalid topic arg '" + topicArg + "', expected topic name and partitions");
+
+            String topic = arr[0];
+            String partitions = arr[1];
+
+            return Arrays.stream(partitions.split(",")).
+                map(partition -> new TopicPartition(topic, partitionNum.applyAsInt(partition)));
+        }
+
+        @SuppressWarnings("CyclomaticComplexity")
+        private Map<TopicPartition, OffsetAndMetadata> prepareOffsetsToReset(String groupId, Collection<TopicPartition> partitionsToReset) {
+            if (opts.options.has(opts.resetToOffsetOpt)) {
+                long offset = opts.options.valueOf(opts.resetToOffsetOpt);
+                return checkOffsetsRange(partitionsToReset.stream().collect(Collectors.toMap(Function.identity(), tp -> offset)))
+                    .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
+            } else if (opts.options.has(opts.resetToEarliestOpt)) {
+                Map<TopicPartition, LogOffsetResult> logStartOffsets = getLogStartOffsets(partitionsToReset);
+                return partitionsToReset.stream().collect(Collectors.toMap(Function.identity(), topicPartition -> {
+                    LogOffsetResult logOffsetResult = logStartOffsets.get(topicPartition);
+
+                    if (!(logOffsetResult instanceof LogOffset)) {
+                        CommandLineUtils.printUsageAndExit(opts.parser, "Error getting starting offset of topic partition: " + topicPartition);
+                    }
+
+                    return new OffsetAndMetadata(((LogOffset) logOffsetResult).value);
+                }));
+            } else if (opts.options.has(opts.resetToLatestOpt)) {
+                Map<TopicPartition, LogOffsetResult> logEndOffsets = getLogEndOffsets(partitionsToReset);
+                return partitionsToReset.stream().collect(Collectors.toMap(Function.identity(), topicPartition -> {
+                    LogOffsetResult logOffsetResult = logEndOffsets.get(topicPartition);
+
+                    if (!(logOffsetResult instanceof LogOffset)) {
+                        CommandLineUtils.printUsageAndExit(opts.parser, "Error getting ending offset of topic partition: " + topicPartition);
+                    }
+
+                    return new OffsetAndMetadata(((LogOffset) logOffsetResult).value);
+                }));
+            } else if (opts.options.has(opts.resetShiftByOpt)) {
+                Map<TopicPartition, OffsetAndMetadata> currentCommittedOffsets = getCommittedOffsets(groupId);
+                Map<TopicPartition, Long> requestedOffsets = partitionsToReset.stream().collect(Collectors.toMap(Function.identity(), topicPartition -> {
+                    long shiftBy = opts.options.valueOf(opts.resetShiftByOpt);
+                    OffsetAndMetadata currentOffset = currentCommittedOffsets.get(topicPartition);
+
+                    if (currentOffset == null) {
+                        throw new IllegalArgumentException("Cannot shift offset for partition " + topicPartition + " since there is no current committed offset");
+                    }
+
+                    return currentOffset.offset() + shiftBy;
+                }));
+                return checkOffsetsRange(requestedOffsets).entrySet().stream()
+                    .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
+            } else if (opts.options.has(opts.resetToDatetimeOpt)) {
+                try {
+                    long timestamp = Utils.getDateTime(opts.options.valueOf(opts.resetToDatetimeOpt));
+                    Map<TopicPartition, LogOffsetResult> logTimestampOffsets = getLogTimestampOffsets(partitionsToReset, timestamp);
+                    return partitionsToReset.stream().collect(Collectors.toMap(Function.identity(), topicPartition -> {
+                        LogOffsetResult logTimestampOffset = logTimestampOffsets.get(topicPartition);
+
+                        if (!(logTimestampOffset instanceof LogOffset)) {
+                            CommandLineUtils.printUsageAndExit(opts.parser, "Error getting offset by timestamp of topic partition: " + topicPartition);
+                        }
+
+                        return new OffsetAndMetadata(((LogOffset) logTimestampOffset).value);
+                    }));
+                } catch (ParseException e) {
+                    throw new RuntimeException(e);
+                }
+            } else if (opts.options.has(opts.resetByDurationOpt)) {
+                String duration = opts.options.valueOf(opts.resetByDurationOpt);
+                Duration durationParsed = Duration.parse(duration);
+                Instant now = Instant.now();
+                durationParsed.negated().addTo(now);
+                long timestamp = now.minus(durationParsed).toEpochMilli();
+                Map<TopicPartition, LogOffsetResult> logTimestampOffsets = getLogTimestampOffsets(partitionsToReset, timestamp);
+                return partitionsToReset.stream().collect(Collectors.toMap(Function.identity(), topicPartition -> {
+                    LogOffsetResult logTimestampOffset = logTimestampOffsets.get(topicPartition);
+
+                    if (!(logTimestampOffset instanceof LogOffset)) {
+                        CommandLineUtils.printUsageAndExit(opts.parser, "Error getting offset by timestamp of topic partition: " + topicPartition);
+                    }
+
+                    return new OffsetAndMetadata(((LogOffset) logTimestampOffset).value);
+                }));
+            } else if (resetPlanFromFile().isPresent()) {
+                return resetPlanFromFile().map(resetPlan -> {
+                    Map<TopicPartition, OffsetAndMetadata> resetPlanForGroup = resetPlan.get(groupId);
+
+                    if (resetPlanForGroup == null) {
+                        printError("No reset plan for group " + groupId + " found", Optional.empty());
+                        return Collections.<TopicPartition, OffsetAndMetadata>emptyMap();
+                    }
+
+                    Map<TopicPartition, Long> requestedOffsets = resetPlanForGroup.keySet().stream().collect(Collectors.toMap(
+                        Function.identity(),
+                        topicPartition -> resetPlanForGroup.get(topicPartition).offset()));
+
+                    return checkOffsetsRange(requestedOffsets).entrySet().stream()
+                        .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
+                }).orElseGet(Collections::emptyMap);
+            } else if (opts.options.has(opts.resetToCurrentOpt)) {
+                Map<TopicPartition, OffsetAndMetadata> currentCommittedOffsets = getCommittedOffsets(groupId);
+                Collection<TopicPartition> partitionsToResetWithCommittedOffset = new ArrayList<>();
+                Collection<TopicPartition> partitionsToResetWithoutCommittedOffset = new ArrayList<>();
+
+                for (TopicPartition topicPartition : partitionsToReset) {
+                    if (currentCommittedOffsets.containsKey(topicPartition))
+                        partitionsToResetWithCommittedOffset.add(topicPartition);
+                    else
+                        partitionsToResetWithoutCommittedOffset.add(topicPartition);
+                }
+
+                Map<TopicPartition, OffsetAndMetadata> preparedOffsetsForPartitionsWithCommittedOffset =
+                    partitionsToResetWithCommittedOffset.stream()
+                    .collect(Collectors.toMap(Function.identity(), topicPartition -> {
+                        OffsetAndMetadata committedOffset = currentCommittedOffsets.get(topicPartition);
+
+                        if (committedOffset == null) {
+                            throw new IllegalStateException("Expected a valid current offset for topic partition: " + topicPartition);
+                        }
+
+                        return new OffsetAndMetadata(committedOffset.offset());
+                    }));
+
+                Map<TopicPartition, OffsetAndMetadata> preparedOffsetsForPartitionsWithoutCommittedOffset = getLogEndOffsets(partitionsToResetWithoutCommittedOffset)
+                    .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> {
+                        if (!(e.getValue() instanceof LogOffset)) {
+                            CommandLineUtils.printUsageAndExit(opts.parser, "Error getting ending offset of topic partition: " + e.getKey());
+                        }
+
+                        return new OffsetAndMetadata(((LogOffset) e.getValue()).value);
+                    }));
+
+                preparedOffsetsForPartitionsWithCommittedOffset.putAll(preparedOffsetsForPartitionsWithoutCommittedOffset);
+
+                return preparedOffsetsForPartitionsWithCommittedOffset;
+            }
+
+            CommandLineUtils.printUsageAndExit(opts.parser, String.format("Option '%s' requires one of the following scenarios: %s", opts.resetOffsetsOpt, opts.allResetOffsetScenarioOpts));
+            return null;
+        }
+
+        Optional<Map<String, Map<TopicPartition, OffsetAndMetadata>>> resetPlanFromFile() {

Review Comment:
   Is it really a lot of effort? It's pretty hard reviewing so much code, so we'd definitely save on review time.
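   For context, a minimal sketch of the kind of sharing being suggested, assuming the duplicated part is the reset plumbing that StreamsGroupCommand and ConsumerGroupCommand now both carry. The GroupOffsetsAdapter name and shape below are hypothetical illustration, not part of this PR:

   import java.util.Map;

   import org.apache.kafka.clients.consumer.OffsetAndMetadata;
   import org.apache.kafka.common.TopicPartition;

   // Hypothetical seam: the group-type-agnostic reset logic (scenario parsing,
   // offset lookups, range checks) lives behind one interface, and each command
   // supplies only the admin calls that differ per group type.
   interface GroupOffsetsAdapter {
       // A streams implementation could wrap Admin#listStreamsGroupOffsets here;
       // a consumer implementation, Admin#listConsumerGroupOffsets.
       Map<TopicPartition, OffsetAndMetadata> committedOffsets(String groupId) throws Exception;

       // Likewise Admin#alterStreamsGroupOffsets vs. Admin#alterConsumerGroupOffsets.
       void alterOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets) throws Exception;
   }

   With a seam like that, prepareOffsetsToReset and the related helpers would only need to exist, and be reviewed, once.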