Rancho-7 commented on code in PR #20301:
URL: https://github.com/apache/kafka/pull/20301#discussion_r2298564631


##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -122,33 +137,87 @@ static void execute(String... args) throws Exception {
                 latencies[i] = elapsed / 1000 / 1000;
             }
 
-            printResults(numMessages, totalTime, latencies);
+            printResults(numRecords, totalTime, latencies);
             consumer.commitSync();
         }
     }
 
     // Visible for testing
-    static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] 
message, ConsumerRecords<byte[], byte[]> records) {
+    static void validate(KafkaConsumer<byte[], byte[]> consumer, byte[] 
sentRecordValue, ConsumerRecords<byte[], byte[]> records, byte[] sentRecordKey, 
Iterable<Header> sentHeaders) {
         if (records.isEmpty()) {
-            consumer.commitSync();
-            throw new RuntimeException("poll() timed out before finding a 
result (timeout:[" + POLL_TIMEOUT_MS + "])");
+            commitAndThrow(consumer, "poll() timed out before finding a result 
(timeout:[" + POLL_TIMEOUT_MS + "ms])");
         }
 
-        //Check result matches the original record
-        String sent = new String(message, StandardCharsets.UTF_8);
-        String read = new String(records.iterator().next().value(), 
StandardCharsets.UTF_8);
+        ConsumerRecord<byte[], byte[]> record = records.iterator().next();
+        String sent = new String(sentRecordValue, StandardCharsets.UTF_8);
+        String read = new String(record.value(), StandardCharsets.UTF_8);
 
         if (!read.equals(sent)) {
-            consumer.commitSync();
-            throw new RuntimeException("The message read [" + read + "] did 
not match the message sent [" + sent + "]");
+            commitAndThrow(consumer, "The message value read [" + read + "] 
did not match the message value sent [" + sent + "]");
+        }
+
+        if (sentRecordKey != null) {
+            if (record.key() == null) {
+                commitAndThrow(consumer, "Expected message key but received 
null");
+            }
+            String sentKey = new String(sentRecordKey, StandardCharsets.UTF_8);
+            String readKey = new String(record.key(), StandardCharsets.UTF_8);
+            if (!readKey.equals(sentKey)) {
+                commitAndThrow(consumer, "The message key read [" + readKey + 
"] did not match the message key sent [" + sentKey + "]");
+            }
+        } else if (record.key() != null) {
+            commitAndThrow(consumer, "Expected null message key but received 
[" + new String(record.key(), StandardCharsets.UTF_8) + "]");
         }
 
+        validateHeaders(consumer, sentHeaders, record);
+
         //Check we only got the one message
         if (records.count() != 1) {
             int count = records.count();
-            consumer.commitSync();
-            throw new RuntimeException("Only one result was expected during 
this test. We found [" + count + "]");
+            commitAndThrow(consumer, "Only one result was expected during this 
test. We found [" + count + "]");
+        }
+    }
+
+    private static void commitAndThrow(KafkaConsumer<byte[], byte[]> consumer, 
String message) {
+        consumer.commitSync();
+        throw new RuntimeException(message);
+    }
+
+    private static void validateHeaders(KafkaConsumer<byte[], byte[]> 
consumer, Iterable<Header> sentHeaders, ConsumerRecord<byte[], byte[]> record) {
+        if (sentHeaders != null && sentHeaders.iterator().hasNext()) {
+            if (!record.headers().iterator().hasNext()) {
+                commitAndThrow(consumer, "Expected message headers but 
received none");
+            }
+            
+            Iterator<Header> sentIterator = sentHeaders.iterator();
+            Iterator<Header> receivedIterator = record.headers().iterator();
+            
+            while (sentIterator.hasNext() && receivedIterator.hasNext()) {
+                Header sentHeader = sentIterator.next();
+                Header receivedHeader = receivedIterator.next();
+                if (!receivedHeader.key().equals(sentHeader.key()) || 
!Arrays.equals(receivedHeader.value(), sentHeader.value())) {
+                    String receivedValueStr = receivedHeader.value() == null ? 
"null" : Arrays.toString(receivedHeader.value());
+                    String sentValueStr = sentHeader.value() == null ? "null" 
: Arrays.toString(sentHeader.value());
+                    commitAndThrow(consumer, "The message header read [" + 
receivedHeader.key() + ":" + receivedValueStr +
+                            "] did not match the message header sent [" + 
sentHeader.key() + ":" + sentValueStr + "]");
+                }
+            }
+            
+            if (sentIterator.hasNext() || receivedIterator.hasNext()) {
+                commitAndThrow(consumer, "Header count mismatch between sent 
and received messages");
+            }
+        }
+    }
+
+    private static List<Header> generateHeadersWithSeparateSizes(Random 
random, int numHeaders, int keySize, int valueSize) {
+        List<Header> headers = new ArrayList<>();
+
+        for (int i = 0; i < numHeaders; i++) {
+            String headerKey = new String(randomBytesOfLen(random, keySize), 
StandardCharsets.UTF_8);

Review Comment:
   Not yet. The `randomBytesOfLen` function was already used in the earlier 
version, and since it can be reused here, I kept it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to