vvcephei commented on a change in pull request #11952:
URL: https://github.com/apache/kafka/pull/11952#discussion_r836629808



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
##########
@@ -887,6 +892,98 @@ public void 
shouldNotEmitLeftJoinResultForAsymmetricAfterWindow() {
         }
     }
 
+    /**
+     * NOTE: Header forwarding is undefined behavior, but we still want to 
understand the
+     * behavior so that we can make decisions about defining it in the future.
+     */
+    @Test
+    public void shouldForwardCurrentHeaders() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = 
new MockApiProcessorSupplier<>();
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+
+        joined = stream1.outerJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), 
ofMillis(10L)),
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), 
Serdes.String())
+        );
+        joined.process(supplier);
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), PROPS)) {
+            final TestInputTopic<Integer, String> inputTopic1 =
+                driver.createInputTopic(topic1, new IntegerSerializer(), new 
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic<Integer, String> inputTopic2 =
+                driver.createInputTopic(topic2, new IntegerSerializer(), new 
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final MockApiProcessor<Integer, String, Void, Void> processor = 
supplier.theCapturedProcessor();
+
+            inputTopic1.pipeInput(new TestRecord<>(
+                0,
+                "A0",
+                new RecordHeaders(new Header[]{new RecordHeader("h", new 
byte[]{0x1})}),
+                0L
+            ));
+            inputTopic2.pipeInput(new TestRecord<>(
+                1,
+                "a0",
+                new RecordHeaders(new Header[]{new RecordHeader("h", new 
byte[]{0x2})}),
+                0L
+            ));
+            // bump stream-time to trigger outer-join results
+            inputTopic2.pipeInput(new TestRecord<>(
+                3,
+                "dummy",
+                new RecordHeaders(new Header[]{new RecordHeader("h", new 
byte[]{0x3})}),
+                (long) 211
+            ));
+
+            // Again, header forwarding is undefined, but the current observed 
behavior is that
+            // the headers pass through the forwarding record.
+            processor.checkAndClearProcessedRecords(
+                new Record<>(
+                    1,
+                    "null+a0",
+                    0L,
+                    new RecordHeaders(new Header[]{new RecordHeader("h", new 
byte[]{0x3})})

Review comment:
       Yeah, I was a bit bummed to see this, but at least now we know! 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to