mjsax commented on a change in pull request #11952:
URL: https://github.com/apache/kafka/pull/11952#discussion_r835654066
##########
File path:
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
##########
@@ -887,6 +892,98 @@ public void shouldNotEmitLeftJoinResultForAsymmetricAfterWindow() {
}
}
+ /**
+ * NOTE: Header forwarding is undefined behavior, but we still want to understand the
+ * behavior so that we can make decisions about defining it in the future.
+ */
+ @Test
+ public void shouldForwardCurrentHeaders() {
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ final KStream<Integer, String> stream1;
+ final KStream<Integer, String> stream2;
+ final KStream<Integer, String> joined;
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+ stream1 = builder.stream(topic1, consumed);
+ stream2 = builder.stream(topic2, consumed);
+
+ joined = stream1.outerJoin(
+ stream2,
+ MockValueJoiner.TOSTRING_JOINER,
+ JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(10L)),
+ StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+ );
+ joined.process(supplier);
+
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
+ final TestInputTopic<Integer, String> inputTopic1 =
+ driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+ final TestInputTopic<Integer, String> inputTopic2 =
+ driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+ final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
+
+ inputTopic1.pipeInput(new TestRecord<>(
+ 0,
+ "A0",
+ new RecordHeaders(new Header[]{new RecordHeader("h", new byte[]{0x1})}),
+ 0L
+ ));
+ inputTopic2.pipeInput(new TestRecord<>(
+ 1,
+ "a0",
+ new RecordHeaders(new Header[]{new RecordHeader("h", new byte[]{0x2})}),
+ 0L
+ ));
+ // bump stream-time to trigger outer-join results
+ inputTopic2.pipeInput(new TestRecord<>(
+ 3,
+ "dummy",
+ new RecordHeaders(new Header[]{new RecordHeader("h", new byte[]{0x3})}),
+ (long) 211
+ ));
+
+ // Again, header forwarding is undefined, but the current observed behavior is that
+ // the headers pass through the forwarding record.
+ processor.checkAndClearProcessedRecords(
+ new Record<>(
+ 1,
+ "null+a0",
+ 0L,
+ new RecordHeaders(new Header[]{new RecordHeader("h", new byte[]{0x3})})
Review comment:
Semantically really bad to forward `0x3`, but well, it is what it is.
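
For readers following along, here is a rough sketch of why `0x3` shows up on the `null+a0` result. It is a toy model in plain Java, not Kafka Streams code: the `Rec` class, the `run` method, and the "emit once the current timestamp exceeds the buffered timestamp plus window and grace" rule are all assumptions made for illustration. The only point it tries to capture is the behavior the test observes: key, value, and timestamp come from the buffered record, while the header comes from whichever record is being processed when the result is emitted.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy model only: none of these types are Kafka Streams classes.
final class HeaderForwardingSketch {

    // Hypothetical stand-in for a record carrying a single header byte.
    static final class Rec {
        final int key;
        final String value;
        final long timestamp;
        final byte header;

        Rec(int key, String value, long timestamp, byte header) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
            this.header = header;
        }
    }

    // Buffers every input; when a later input advances time past
    // (buffered timestamp + closeAfterMs), the buffered record is emitted.
    // Assumption for illustration: the emitted record takes its header from
    // the record currently being processed, not from the buffered one.
    static List<Rec> run(List<Rec> inputs, long closeAfterMs) {
        List<Rec> buffered = new ArrayList<>();
        List<Rec> emitted = new ArrayList<>();
        for (Rec current : inputs) {
            Iterator<Rec> it = buffered.iterator();
            while (it.hasNext()) {
                Rec old = it.next();
                if (current.timestamp > old.timestamp + closeAfterMs) {
                    emitted.add(new Rec(old.key, old.value, old.timestamp, current.header));
                    it.remove();
                }
            }
            buffered.add(current);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Rec> inputs = new ArrayList<>();
        inputs.add(new Rec(0, "A0", 0L, (byte) 0x1));
        inputs.add(new Rec(1, "a0", 0L, (byte) 0x2));
        inputs.add(new Rec(3, "dummy", 211L, (byte) 0x3)); // only advances time
        for (Rec r : run(inputs, 110L)) { // 100 ms window + 10 ms grace
            System.out.println(r.key + " " + r.value + " @" + r.timestamp + " header=" + r.header);
        }
    }
}
```

In this model the record with key 1 and header `0x2` is emitted with the header of the unrelated `dummy` record, which is the semantic problem pointed out above.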