Rancho-7 commented on code in PR #20301:
URL: https://github.com/apache/kafka/pull/20301#discussion_r2296522582


##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -151,6 +195,44 @@ static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] message, Con
         }
     }
 
+    private static void validateHeaders(KafkaConsumer<byte[], byte[]> consumer, Iterable<Header> sentHeaders, ConsumerRecord<byte[], byte[]> record) {
+        if (sentHeaders != null && sentHeaders.iterator().hasNext()) {
+            if (!record.headers().iterator().hasNext()) {
+                consumer.commitSync();
+                throw new RuntimeException("Expected message headers but received none");
+            }
+            
+            Iterator<Header> sentIterator = sentHeaders.iterator();
+            Iterator<Header> receivedIterator = record.headers().iterator();
+            
+            while (sentIterator.hasNext() && receivedIterator.hasNext()) {
+                Header sentHeader = sentIterator.next();
+                Header receivedHeader = receivedIterator.next();
+                if (!receivedHeader.key().equals(sentHeader.key()) || !Arrays.equals(receivedHeader.value(), sentHeader.value())) {
+                    consumer.commitSync();
+                    throw new RuntimeException("The message header read [" + receivedHeader.key() + ":" + Arrays.toString(receivedHeader.value()) +
+                            "] did not match the message header sent [" + sentHeader.key() + ":" + Arrays.toString(sentHeader.value()) + "]");
+                }
+            }
+            
+            if (sentIterator.hasNext() || receivedIterator.hasNext()) {
+                consumer.commitSync();
+                throw new RuntimeException("Header count mismatch between sent and received messages");
+            }
+        }
+    }
+
+    private static List<Header> generateHeadersWithSeparateSizes(Random random, int numHeaders, int keySize, int valueSize) {
+        List<Header> headers = new ArrayList<>();
+
+        for (int i = 0; i < numHeaders; i++) {
+            String headerKey = new String(randomBytesOfLen(random, keySize), StandardCharsets.UTF_8);
+            byte[] headerValue = randomBytesOfLen(random, valueSize);

Review Comment:
   Good point. Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

