Rancho-7 commented on code in PR #20301: URL: https://github.com/apache/kafka/pull/20301#discussion_r2296522582
########## tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java: ########## @@ -151,6 +195,44 @@ static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] message, Con } } + private static void validateHeaders(KafkaConsumer<byte[], byte[]> consumer, Iterable<Header> sentHeaders, ConsumerRecord<byte[], byte[]> record) { + if (sentHeaders != null && sentHeaders.iterator().hasNext()) { + if (!record.headers().iterator().hasNext()) { + consumer.commitSync(); + throw new RuntimeException("Expected message headers but received none"); + } + + Iterator<Header> sentIterator = sentHeaders.iterator(); + Iterator<Header> receivedIterator = record.headers().iterator(); + + while (sentIterator.hasNext() && receivedIterator.hasNext()) { + Header sentHeader = sentIterator.next(); + Header receivedHeader = receivedIterator.next(); + if (!receivedHeader.key().equals(sentHeader.key()) || !Arrays.equals(receivedHeader.value(), sentHeader.value())) { + consumer.commitSync(); + throw new RuntimeException("The message header read [" + receivedHeader.key() + ":" + Arrays.toString(receivedHeader.value()) + + "] did not match the message header sent [" + sentHeader.key() + ":" + Arrays.toString(sentHeader.value()) + "]"); + } + } + + if (sentIterator.hasNext() || receivedIterator.hasNext()) { + consumer.commitSync(); + throw new RuntimeException("Header count mismatch between sent and received messages"); + } + } + } + + private static List<Header> generateHeadersWithSeparateSizes(Random random, int numHeaders, int keySize, int valueSize) { + List<Header> headers = new ArrayList<>(); + + for (int i = 0; i < numHeaders; i++) { + String headerKey = new String(randomBytesOfLen(random, keySize), StandardCharsets.UTF_8); + byte[] headerValue = randomBytesOfLen(random, valueSize); Review Comment: Good point. Done. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org