m1a2st commented on code in PR #20301: URL: https://github.com/apache/kafka/pull/20301#discussion_r2279130802
########## tools/src/test/java/org/apache/kafka/tools/EndToEndLatencyTest.java: ########## @@ -29,50 +33,248 @@ import java.nio.charset.StandardCharsets; import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.STRICT_STUBS) public class EndToEndLatencyTest { + private static final byte[] RECORD_VALUE = "record-sent".getBytes(StandardCharsets.UTF_8); + private static final byte[] RECORD_VALUE_DIFFERENT = "record-received".getBytes(StandardCharsets.UTF_8); + private static final byte[] RECORD_KEY = "key-sent".getBytes(StandardCharsets.UTF_8); + private static final byte[] RECORD_KEY_DIFFERENT = "key-received".getBytes(StandardCharsets.UTF_8); + private static final String HEADER_KEY = "header-key-sent"; + private static final String HEADER_KEY_DIFFERENT = "header-key-received"; + private static final byte[] HEADER_VALUE = "header-value-sent".getBytes(StandardCharsets.UTF_8); + private static final byte[] HEADER_VALUE_DIFFERENT = "header-value-received".getBytes(StandardCharsets.UTF_8); + + // legacy format test arguments + private static final String[] LEGACY_INVALID_ARGS_UNEXPECTED = { + "localhost:9092", "test", "10000", "1", "200", "propsfile.properties", "random" + }; + + private static class ArgsBuilder { + private final Map<String, String> params = new LinkedHashMap<>(); + + private ArgsBuilder() { + params.put("--bootstrap-server", "localhost:9092"); + params.put("--topic", "test-topic"); + params.put("--num-records", "100"); + params.put("--producer-acks", "1"); + params.put("--record-size", "200"); + } + + public static ArgsBuilder defaults() { + return new ArgsBuilder(); + } + + public ArgsBuilder with(String param, String value) { + params.put(param, value); + return this; + } + + public String[] build() { + return params.entrySet().stream() + .flatMap(entry -> Stream.of(entry.getKey(), entry.getValue())) + .toArray(String[]::new); + } + + public ArgsBuilder withNegative(String param) { + return with(param, "-1"); + } + + public ArgsBuilder withZero(String param) { + return with(param, "0"); + } + } + @Mock KafkaConsumer<byte[], byte[]> consumer; @Mock ConsumerRecords<byte[], byte[]> records; @Test - public void shouldFailWhenSuppliedUnexpectedArgs() { - String[] args = new String[] {"localhost:9092", "test", "10000", "1", "200", "propsfile.properties", "random"}; - assertThrows(TerseException.class, () -> EndToEndLatency.execute(args)); + public void testInvalidArgs() { + testUnexpectedArgsWithLegacyFormat(); + testInvalidProducerAcks(); + testInvalidNumRecords(); + testInvalidRecordSize(); + testInvalidRecordKey(); + testInvalidNumHeaders(); + testInvalidRecordHeaderKey(); + testInvalidRecordHeaderValue(); + } + + private void testUnexpectedArgsWithLegacyFormat() { + assertThrows(TerseException.class, () -> EndToEndLatency.execute(LEGACY_INVALID_ARGS_UNEXPECTED)); Review Comment: Could you also assert this error message? ########## tools/src/test/java/org/apache/kafka/tools/EndToEndLatencyTest.java: ########## @@ -29,50 +33,248 @@ import java.nio.charset.StandardCharsets; import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.STRICT_STUBS) public class EndToEndLatencyTest { + private static final byte[] RECORD_VALUE = "record-sent".getBytes(StandardCharsets.UTF_8); + private static final byte[] RECORD_VALUE_DIFFERENT = "record-received".getBytes(StandardCharsets.UTF_8); + private static final byte[] RECORD_KEY = "key-sent".getBytes(StandardCharsets.UTF_8); + private static final byte[] RECORD_KEY_DIFFERENT = "key-received".getBytes(StandardCharsets.UTF_8); + private static final String HEADER_KEY = "header-key-sent"; + private static final String HEADER_KEY_DIFFERENT = "header-key-received"; + private static final byte[] HEADER_VALUE = "header-value-sent".getBytes(StandardCharsets.UTF_8); + private static final byte[] HEADER_VALUE_DIFFERENT = "header-value-received".getBytes(StandardCharsets.UTF_8); + + // legacy format test arguments + private static final String[] LEGACY_INVALID_ARGS_UNEXPECTED = { + "localhost:9092", "test", "10000", "1", "200", "propsfile.properties", "random" + }; + + private static class ArgsBuilder { + private final Map<String, String> params = new LinkedHashMap<>(); + + private ArgsBuilder() { + params.put("--bootstrap-server", "localhost:9092"); + params.put("--topic", "test-topic"); + params.put("--num-records", "100"); + params.put("--producer-acks", "1"); + params.put("--record-size", "200"); + } + + public static ArgsBuilder defaults() { + return new ArgsBuilder(); + } + + public ArgsBuilder with(String param, String value) { + params.put(param, value); + return this; + } + + public String[] build() { + return params.entrySet().stream() + .flatMap(entry -> Stream.of(entry.getKey(), entry.getValue())) + .toArray(String[]::new); + } + + public ArgsBuilder withNegative(String param) { + return with(param, "-1"); + } + + public ArgsBuilder withZero(String param) { + return with(param, "0"); + } + } + @Mock KafkaConsumer<byte[], byte[]> consumer; @Mock ConsumerRecords<byte[], byte[]> records; @Test - public void shouldFailWhenSuppliedUnexpectedArgs() { - String[] args = new String[] {"localhost:9092", "test", "10000", "1", "200", "propsfile.properties", "random"}; - assertThrows(TerseException.class, () -> EndToEndLatency.execute(args)); + public void testInvalidArgs() { + testUnexpectedArgsWithLegacyFormat(); + testInvalidProducerAcks(); + testInvalidNumRecords(); + testInvalidRecordSize(); + testInvalidRecordKey(); + testInvalidNumHeaders(); + testInvalidRecordHeaderKey(); + testInvalidRecordHeaderValue(); + } + + private void testUnexpectedArgsWithLegacyFormat() { + assertThrows(TerseException.class, () -> EndToEndLatency.execute(LEGACY_INVALID_ARGS_UNEXPECTED)); + } + + private void testInvalidNumRecords() { + String expectedMsg = "Value for --num-records must be a positive integer."; + assertInitializeInvalidOptionsExitCodeAndMsg( + ArgsBuilder.defaults().withNegative("--num-records").build(), expectedMsg); + } + + private void testInvalidRecordSize() { + String expectedMsg = "Value for --record-size must be a non-negative integer."; + assertInitializeInvalidOptionsExitCodeAndMsg( + ArgsBuilder.defaults().withNegative("--record-size").build(), expectedMsg); + } + + private void testInvalidRecordKey() { + String expectedMsg = "Value for --record-key-size must be a non-negative integer."; + assertInitializeInvalidOptionsExitCodeAndMsg( + ArgsBuilder.defaults().withNegative("--record-key-size").build(), expectedMsg); + } + + private void testInvalidNumHeaders() { + String expectedMsg = "Value for --num-headers must be a non-negative integer."; + assertInitializeInvalidOptionsExitCodeAndMsg( + ArgsBuilder.defaults().withNegative("--num-headers").build(), expectedMsg); + } + + private void testInvalidRecordHeaderKey() { + String expectedMsg = "Value for --record-header-key-size must be a non-negative integer."; + assertInitializeInvalidOptionsExitCodeAndMsg( + ArgsBuilder.defaults().withNegative("--record-header-key-size").build(), expectedMsg); + } + + private void testInvalidRecordHeaderValue() { + String expectedMsg = "Value for --record-header-size must be a non-negative integer."; + assertInitializeInvalidOptionsExitCodeAndMsg( + ArgsBuilder.defaults().withNegative("--record-header-size").build(), expectedMsg); + } + + private void testInvalidProducerAcks() { + String expectedMsg = "Invalid value for --producer-acks. Latency testing requires synchronous acknowledgement. Please use '1' or 'all'."; + assertInitializeInvalidOptionsExitCodeAndMsg( + ArgsBuilder.defaults().withZero("--producer-acks").build(), expectedMsg); + } + + private void assertInitializeInvalidOptionsExitCodeAndMsg(String[] args, String expectedMsg) { + Exit.setExitProcedure((exitCode, message) -> { + assertEquals(1, exitCode); + assertTrue(message.contains(expectedMsg)); + throw new RuntimeException(); + }); + try { + assertThrows(RuntimeException.class, () -> EndToEndLatency.execute(args)); + } finally { + Exit.resetExitProcedure(); + } } @Test - public void shouldFailWhenProducerAcksAreNotSynchronised() { - String[] args = new String[] {"localhost:9092", "test", "10000", "0", "200"}; - assertThrows(IllegalArgumentException.class, () -> EndToEndLatency.execute(args)); + public void testConvertLegacyArgs() throws Exception { Review Comment: Could you add `@SuppressWarnings("removal")` to suppress warning -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org