chia7712 commented on code in PR #20301: URL: https://github.com/apache/kafka/pull/20301#discussion_r2291935770
########## tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java: ########## @@ -122,27 +149,44 @@ static void execute(String... args) throws Exception { latencies[i] = elapsed / 1000 / 1000; } - printResults(numMessages, totalTime, latencies); + printResults(numRecords, totalTime, latencies); consumer.commitSync(); } } // Visible for testing - static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] message, ConsumerRecords<byte[], byte[]> records) { + static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] sentRecordValue, ConsumerRecords<byte[], byte[]> records, byte[] sentRecordKey, Iterable<Header> sentHeaders) { if (records.isEmpty()) { consumer.commitSync(); - throw new RuntimeException("poll() timed out before finding a result (timeout:[" + POLL_TIMEOUT_MS + "])"); + throw new RuntimeException("poll() timed out before finding a result (timeout:[" + POLL_TIMEOUT_MS + "ms])"); } - //Check result matches the original record - String sent = new String(message, StandardCharsets.UTF_8); - String read = new String(records.iterator().next().value(), StandardCharsets.UTF_8); + ConsumerRecord<byte[], byte[]> record = records.iterator().next(); + String sent = new String(sentRecordValue, StandardCharsets.UTF_8); + String read = new String(record.value(), StandardCharsets.UTF_8); if (!read.equals(sent)) { consumer.commitSync(); - throw new RuntimeException("The message read [" + read + "] did not match the message sent [" + sent + "]"); + throw new RuntimeException("The message value read [" + read + "] did not match the message value sent [" + sent + "]"); + } + + if (sentRecordKey != null) { + if (record.key() == null) { + throw new RuntimeException("Expected message key but received null"); Review Comment: Should we commit the offsets too? 
########## tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java: ########## @@ -221,4 +303,156 @@ private static KafkaProducer<byte[], byte[]> createKafkaProducer(Optional<String producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); return new KafkaProducer<>(producerProps); } + + // Visible for testing + static String[] convertLegacyArgsIfNeeded(String[] args) throws Exception { + if (args.length == 0) { + return args; + } + + boolean hasNamedArgs = Arrays.stream(args).anyMatch(arg -> arg.startsWith("--")); + if (hasNamedArgs) { + return args; + } + + if (args.length != 5 && args.length != 6) { + throw new TerseException("Invalid number of arguments. Expected 5 or 6 positional arguments, but got " + args.length + ". " + + "Usage: bootstrap-server topic num-records producer-acks record-size [optional] command-config"); + } + + return convertLegacyArgs(args); + } + + private static String[] convertLegacyArgs(String[] legacyArgs) { + List<String> newArgs = new ArrayList<>(); + + // broker_list -> --bootstrap-server + newArgs.add("--bootstrap-server"); + newArgs.add(legacyArgs[0]); + + // topic -> --topic + newArgs.add("--topic"); + newArgs.add(legacyArgs[1]); + + // num_messages -> --num-records + newArgs.add("--num-records"); + newArgs.add(legacyArgs[2]); + + // producer_acks -> --producer-acks + newArgs.add("--producer-acks"); + newArgs.add(legacyArgs[3]); + + // message_size_bytes -> --record-size + newArgs.add("--record-size"); + newArgs.add(legacyArgs[4]); + + // properties_file -> --command-config + if (legacyArgs.length == 6 && !legacyArgs[5].trim().isEmpty()) { + newArgs.add("--command-config"); + newArgs.add(legacyArgs[5]); + } + System.out.println("WARNING: Positional argument usage is deprecated and will be removed in Apache Kafka 5.0. 
" + + "Please use named arguments instead: --bootstrap-server, --topic, --num-records, --producer-acks, --record-size, --command-config"); + return newArgs.toArray(new String[0]); + } + + public static final class EndToEndLatencyCommandOptions extends CommandDefaultOptions { + final OptionSpec<String> bootstrapServerOpt; + final OptionSpec<String> topicOpt; + final OptionSpec<Integer> numRecordsOpt; + final OptionSpec<String> acksOpt; + final OptionSpec<Integer> recordSizeOpt; + final OptionSpec<String> commandConfigOpt; + final OptionSpec<Integer> recordKeyOpt; + final OptionSpec<Integer> recordHeaderValueSizeOpt; + final OptionSpec<Integer> recordHeaderKeySizeOpt; + final OptionSpec<Integer> numHeadersOpt; + + public EndToEndLatencyCommandOptions(String[] args) { + super(args); + + bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: The Kafka broker list string in the form HOST1:PORT1,HOST2:PORT2.") + .withRequiredArg() + .describedAs("bootstrap-server") + .ofType(String.class); + topicOpt = parser.accepts("topic", "REQUIRED: The topic to use for the test.") + .withRequiredArg() + .describedAs("topic-name") + .ofType(String.class); + numRecordsOpt = parser.accepts("num-records", "REQUIRED: The number of messages to send.") + .withRequiredArg() + .describedAs("count") + .ofType(Integer.class); + acksOpt = parser.accepts("producer-acks", "REQUIRED: Producer acknowledgements. Must be '1' or 'all'.") + .withRequiredArg() + .describedAs("producer-acks") + .ofType(String.class); + recordSizeOpt = parser.accepts("record-size", "REQUIRED: The size of each message payload in bytes.") + .withRequiredArg() + .describedAs("bytes") + .ofType(Integer.class); + recordKeyOpt = parser.accepts("record-key-size", "Optional: The size of the message key in bytes. 
If not set, messages are sent without a key.") + .withOptionalArg() + .describedAs("bytes") + .ofType(Integer.class); + recordHeaderKeySizeOpt = parser.accepts("record-header-key-size", "Optional: The size of the message header key in bytes. Used together with record-header-size.") + .withOptionalArg() + .describedAs("bytes") + .ofType(Integer.class); + recordHeaderValueSizeOpt = parser.accepts("record-header-size", "Optional: The size of message header value in bytes.") + .withOptionalArg() + .describedAs("bytes") + .ofType(Integer.class); + numHeadersOpt = parser.accepts("num-headers", "Optional: The number of headers to include in each message.") + .withOptionalArg() + .describedAs("count") + .ofType(Integer.class) + .defaultsTo(1); + commandConfigOpt = parser.accepts("command-config", "Optional: A property file for Kafka producer/consumer/admin client configuration.") + .withOptionalArg() + .describedAs("config-file") + .ofType(String.class); + + try { + options = parser.parse(args); + } catch (OptionException e) { + CommandLineUtils.printUsageAndExit(parser, e.getMessage()); + } + checkArgs(); + } + + void checkArgs() { + CommandLineUtils.maybePrintHelpOrVersion(this, "This tool measures end-to-end latency in Kafka by sending messages and timing their reception."); + + // check required arguments + CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, topicOpt, numRecordsOpt, acksOpt, recordSizeOpt); + + // validate 'producer-acks' + String acksValue = options.valueOf(acksOpt); + if (!List.of("1", "all").contains(acksValue)) { Review Comment: +1 to keep this behavior ########## tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java: ########## @@ -77,21 +87,35 @@ static int mainNoExit(String... args) { } // Visible for testing - static void execute(String... 
args) throws Exception { - if (args.length != 5 && args.length != 6) { - throw new TerseException("USAGE: java " + EndToEndLatency.class.getName() - + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file"); - } + static void execute(String[] args) throws Exception { + String[] processedArgs = convertLegacyArgsIfNeeded(args); + EndToEndLatencyCommandOptions opts = new EndToEndLatencyCommandOptions(processedArgs); - String brokers = args[0]; - String topic = args[1]; - int numMessages = Integer.parseInt(args[2]); - String acks = args[3]; - int messageSizeBytes = Integer.parseInt(args[4]); - Optional<String> propertiesFile = (args.length > 5 && !Utils.isBlank(args[5])) ? Optional.of(args[5]) : Optional.empty(); + // required + String brokers = opts.options.valueOf(opts.bootstrapServerOpt); + String topic = opts.options.valueOf(opts.topicOpt); + int numRecords = opts.options.valueOf(opts.numRecordsOpt); + String acks = opts.options.valueOf(opts.acksOpt); + int recordValueSize = opts.options.valueOf(opts.recordSizeOpt); - if (!List.of("1", "all").contains(acks)) { - throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all"); + // optional + Optional<String> propertiesFile = opts.options.has(opts.commandConfigOpt) ? + Optional.of(opts.options.valueOf(opts.commandConfigOpt)) : Optional.empty(); + int recordKeySize = 0; Review Comment: Why not add a default value to `recordKeyOpt` instead? ########## tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java: ########## @@ -77,21 +87,35 @@ static int mainNoExit(String... args) { } // Visible for testing - static void execute(String... 
args) throws Exception { - if (args.length != 5 && args.length != 6) { - throw new TerseException("USAGE: java " + EndToEndLatency.class.getName() - + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file"); - } + static void execute(String[] args) throws Exception { + String[] processedArgs = convertLegacyArgsIfNeeded(args); + EndToEndLatencyCommandOptions opts = new EndToEndLatencyCommandOptions(processedArgs); - String brokers = args[0]; - String topic = args[1]; - int numMessages = Integer.parseInt(args[2]); - String acks = args[3]; - int messageSizeBytes = Integer.parseInt(args[4]); - Optional<String> propertiesFile = (args.length > 5 && !Utils.isBlank(args[5])) ? Optional.of(args[5]) : Optional.empty(); + // required + String brokers = opts.options.valueOf(opts.bootstrapServerOpt); + String topic = opts.options.valueOf(opts.topicOpt); + int numRecords = opts.options.valueOf(opts.numRecordsOpt); + String acks = opts.options.valueOf(opts.acksOpt); + int recordValueSize = opts.options.valueOf(opts.recordSizeOpt); - if (!List.of("1", "all").contains(acks)) { - throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all"); + // optional + Optional<String> propertiesFile = opts.options.has(opts.commandConfigOpt) ? + Optional.of(opts.options.valueOf(opts.commandConfigOpt)) : Optional.empty(); + int recordKeySize = 0; + if (opts.options.has(opts.recordKeyOpt)) { + recordKeySize = opts.options.valueOf(opts.recordKeyOpt); + } + int numHeaders = 0; + if (opts.options.has(opts.numHeadersOpt)) { + numHeaders = opts.options.valueOf(opts.numHeadersOpt); + } + int headerKeySize = 0; Review Comment: ditto ########## tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java: ########## @@ -77,21 +87,35 @@ static int mainNoExit(String... args) { } // Visible for testing - static void execute(String... 
args) throws Exception { - if (args.length != 5 && args.length != 6) { - throw new TerseException("USAGE: java " + EndToEndLatency.class.getName() - + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file"); - } + static void execute(String[] args) throws Exception { + String[] processedArgs = convertLegacyArgsIfNeeded(args); + EndToEndLatencyCommandOptions opts = new EndToEndLatencyCommandOptions(processedArgs); - String brokers = args[0]; - String topic = args[1]; - int numMessages = Integer.parseInt(args[2]); - String acks = args[3]; - int messageSizeBytes = Integer.parseInt(args[4]); - Optional<String> propertiesFile = (args.length > 5 && !Utils.isBlank(args[5])) ? Optional.of(args[5]) : Optional.empty(); + // required + String brokers = opts.options.valueOf(opts.bootstrapServerOpt); + String topic = opts.options.valueOf(opts.topicOpt); + int numRecords = opts.options.valueOf(opts.numRecordsOpt); + String acks = opts.options.valueOf(opts.acksOpt); + int recordValueSize = opts.options.valueOf(opts.recordSizeOpt); - if (!List.of("1", "all").contains(acks)) { - throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all"); + // optional + Optional<String> propertiesFile = opts.options.has(opts.commandConfigOpt) ? 
+ Optional.of(opts.options.valueOf(opts.commandConfigOpt)) : Optional.empty(); + int recordKeySize = 0; + if (opts.options.has(opts.recordKeyOpt)) { + recordKeySize = opts.options.valueOf(opts.recordKeyOpt); + } + int numHeaders = 0; Review Comment: ditto ########## tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java: ########## @@ -221,4 +303,165 @@ private static KafkaProducer<byte[], byte[]> createKafkaProducer(Optional<String producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); return new KafkaProducer<>(producerProps); } + + /** + * Converts legacy positional arguments to named arguments for backward compatibility. + * + * @param args the command line arguments to convert + * @return converted named arguments + * @throws Exception if the legacy arguments are invalid + * @deprecated Positional argument usage is deprecated and will be removed in Apache Kafka 5.0. + * Use named arguments instead: --bootstrap-server, --topic, --num-records, --producer-acks, --record-size, --command-config + */ + @Deprecated(since = "4.2", forRemoval = true) + static String[] convertLegacyArgsIfNeeded(String[] args) throws Exception { + if (args.length == 0) { + return args; + } + + boolean hasNamedArgs = Arrays.stream(args).anyMatch(arg -> arg.startsWith("--")); Review Comment: `-` is a legal character in topic names, so it could cause an issue if users create a topic like `topic--aaa`. Perhaps we should check the required `opts` instead? 
########## tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java: ########## @@ -151,6 +195,44 @@ static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] message, Con } } + private static void validateHeaders(KafkaConsumer<byte[], byte[]> consumer, Iterable<Header> sentHeaders, ConsumerRecord<byte[], byte[]> record) { + if (sentHeaders != null && sentHeaders.iterator().hasNext()) { + if (!record.headers().iterator().hasNext()) { + consumer.commitSync(); + throw new RuntimeException("Expected message headers but received none"); + } + + Iterator<Header> sentIterator = sentHeaders.iterator(); + Iterator<Header> receivedIterator = record.headers().iterator(); + + while (sentIterator.hasNext() && receivedIterator.hasNext()) { + Header sentHeader = sentIterator.next(); + Header receivedHeader = receivedIterator.next(); + if (!receivedHeader.key().equals(sentHeader.key()) || !Arrays.equals(receivedHeader.value(), sentHeader.value())) { + consumer.commitSync(); + throw new RuntimeException("The message header read [" + receivedHeader.key() + ":" + Arrays.toString(receivedHeader.value()) + + "] did not match the message header sent [" + sentHeader.key() + ":" + Arrays.toString(sentHeader.value()) + "]"); + } + } + + if (sentIterator.hasNext() || receivedIterator.hasNext()) { + consumer.commitSync(); + throw new RuntimeException("Header count mismatch between sent and received messages"); + } + } + } + + private static List<Header> generateHeadersWithSeparateSizes(Random random, int numHeaders, int keySize, int valueSize) { + List<Header> headers = new ArrayList<>(); + + for (int i = 0; i < numHeaders; i++) { + String headerKey = new String(randomBytesOfLen(random, keySize), StandardCharsets.UTF_8); + byte[] headerValue = randomBytesOfLen(random, valueSize); Review Comment: Perhaps we should treat `-1` as `null` for header value, since `null` is legal. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org. For queries about this service, please contact Infrastructure at: us...@infra.apache.org