frankvicky commented on code in PR #21582:
URL: https://github.com/apache/kafka/pull/21582#discussion_r2873780736
##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java:
##########
@@ -112,62 +145,106 @@ public void testKTable() {
final KTable<String, String> table4 = builder.table(topic2, consumed);
table4.toStream().process(supplier);
- try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
+ final Headers headersA = makeHeaders("key", "A");
+ final Headers headersB = makeHeaders("key", "B");
+ final Headers headersC = makeHeaders("key", "C");
+ final Headers headersD = makeHeaders("key", "D");
+
+ try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), properties)) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(topic1, new StringSerializer(), new
StringSerializer());
- inputTopic.pipeInput("A", "01", 5L);
- inputTopic.pipeInput("B", "02", 100L);
- inputTopic.pipeInput("C", "03", 0L);
- inputTopic.pipeInput("D", "04", 0L);
- inputTopic.pipeInput("A", "05", 10L);
- inputTopic.pipeInput("A", "06", 8L);
+
+ inputTopic.pipeInput(new TestRecord<>("A", "01", headersA, 5L));
+ inputTopic.pipeInput(new TestRecord<>("B", "02", headersB, 100L));
+ inputTopic.pipeInput(new TestRecord<>("C", "03", headersC, 0L));
+ inputTopic.pipeInput(new TestRecord<>("D", "04", headersD, 0L));
+ inputTopic.pipeInput(new TestRecord<>("A", "05", headersA, 10L));
+ inputTopic.pipeInput(new TestRecord<>("A", "06", headersA, 8L));
}
final List<MockApiProcessor<String, Object, Void, Void>> processors =
supplier.capturedProcessors(4);
- assertEquals(asList(
- new KeyValueTimestamp<>("A", "01", 5),
- new KeyValueTimestamp<>("B", "02", 100),
- new KeyValueTimestamp<>("C", "03", 0),
- new KeyValueTimestamp<>("D", "04", 0),
- new KeyValueTimestamp<>("A", "05", 10),
- new KeyValueTimestamp<>("A", "06", 8)),
- processors.get(0).processed());
- assertEquals(asList(
- new KeyValueTimestamp<>("A", 1, 5),
- new KeyValueTimestamp<>("B", 2, 100),
- new KeyValueTimestamp<>("C", 3, 0),
- new KeyValueTimestamp<>("D", 4, 0),
- new KeyValueTimestamp<>("A", 5, 10),
- new KeyValueTimestamp<>("A", 6, 8)),
- processors.get(1).processed());
- assertEquals(asList(
- new KeyValueTimestamp<>("A", null, 5),
- new KeyValueTimestamp<>("B", 2, 100),
- new KeyValueTimestamp<>("C", null, 0),
- new KeyValueTimestamp<>("D", 4, 0),
- new KeyValueTimestamp<>("A", null, 10),
- new KeyValueTimestamp<>("A", 6, 8)),
- processors.get(2).processed());
- assertEquals(asList(
- new KeyValueTimestamp<>("A", "01", 5),
- new KeyValueTimestamp<>("B", "02", 100),
- new KeyValueTimestamp<>("C", "03", 0),
- new KeyValueTimestamp<>("D", "04", 0),
- new KeyValueTimestamp<>("A", "05", 10),
- new KeyValueTimestamp<>("A", "06", 8)),
- processors.get(3).processed());
+
+ if (storeFormat.equals("default")) {
Review Comment:
We should define named constants for the `"headers"` and `"default"` string literals instead of repeating them throughout the tests.
##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java:
##########
@@ -208,90 +286,132 @@ public void testAggOfVersionedStore() {
try (
final TopologyTestDriver driver = new TopologyTestDriver(
- builder.build(), CONFIG, Instant.ofEpochMilli(0L))) {
+ builder.build(), props, Instant.ofEpochMilli(0L))) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(topic1, new StringSerializer(), new
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
- inputTopic.pipeInput("A", "1", 10L);
- inputTopic.pipeInput("A", (String) null, 15L);
- inputTopic.pipeInput("A", "1", 12L); // out-of-order record will
be ignored
- inputTopic.pipeInput("B", "2", 20L);
- inputTopic.pipeInput("null", "3", 25L);
- inputTopic.pipeInput("B", "4", 23L);
- inputTopic.pipeInput("NULL", "5", 24L);
- inputTopic.pipeInput("B", "7", 22L); // out-of-order record will
be ignored
-
- assertEquals(
- asList(
- new KeyValueTimestamp<>("1", "0+1", 10),
- new KeyValueTimestamp<>("1", "0+1-1", 15),
- new KeyValueTimestamp<>("2", "0+2", 20),
- new KeyValueTimestamp<>("2", "0+2-2", 23),
- new KeyValueTimestamp<>("4", "0+4", 23)),
- supplier.theCapturedProcessor().processed());
+ final Headers headersA = makeHeaders("key", "A");
+ final Headers headersB = makeHeaders("key", "B");
+ final Headers headersNull = makeHeaders("key", "null");
+ final Headers headersNULL = makeHeaders("key", "NULL");
+
+ inputTopic.pipeInput(new TestRecord<>("A", "1", headersA, 10L));
+ inputTopic.pipeInput(new TestRecord<>("A", null, headersA, 15L));
+ inputTopic.pipeInput(new TestRecord<>("A", "1", headersA, 12L));
// out-of-order record will be ignored
+ inputTopic.pipeInput(new TestRecord<>("B", "2", headersB, 20L));
+ inputTopic.pipeInput(new TestRecord<>("null", "3", headersNull,
25L));
+ inputTopic.pipeInput(new TestRecord<>("B", "4", headersB, 23L));
+ inputTopic.pipeInput(new TestRecord<>("NULL", "5", headersNULL,
24L));
+ inputTopic.pipeInput(new TestRecord<>("B", "7", headersB, 22L));
// out-of-order record will be ignored
+
+ if (storeFormat.equals("default")) {
+ assertEquals(
+ asList(
+ new KeyValueTimestamp<>("1", "0+1", 10),
+ new KeyValueTimestamp<>("1", "0+1-1", 15),
+ new KeyValueTimestamp<>("2", "0+2", 20),
+ new KeyValueTimestamp<>("2", "0+2-2", 23),
+ new KeyValueTimestamp<>("4", "0+4", 23)),
+ supplier.theCapturedProcessor().processed());
+ } else if (storeFormat.equals("headers")) {
+ assertEquals(
+ asList(
+ new KeyValueTimestampHeaders<>("1", "0+1", 10,
headersA),
+ new KeyValueTimestampHeaders<>("1", "0+1-1", 15,
headersA),
+ new KeyValueTimestampHeaders<>("2", "0+2", 20,
headersB),
+ new KeyValueTimestampHeaders<>("2", "0+2-2", 23,
headersB),
+ new KeyValueTimestampHeaders<>("4", "0+4", 23,
headersB)),
+ supplier.theCapturedProcessor().processedWithHeaders());
+ }
}
}
- private static void testCountHelper(final StreamsBuilder builder,
- final String input,
- final MockApiProcessorSupplier<String,
Object, Void, Void> supplier) {
+ private void testCountHelper(final String storeFormat,
+ final Properties props,
+ final boolean useMaterialized) {
+ final MockApiProcessorSupplier<String, Object, Void, Void> supplier =
new MockApiProcessorSupplier<>();
+ final StreamsBuilder builder = new StreamsBuilder();
+ final String input = "count-test-input";
+
+ if (useMaterialized) {
+ builder
+ .table(input, consumed)
+ .groupBy(MockMapper.selectValueKeyValueMapper(),
stringSerialized)
+ .count(Materialized.as("count"))
+ .toStream()
+ .process(supplier);
+ } else {
+ builder
+ .table(input, consumed)
+ .groupBy(MockMapper.selectValueKeyValueMapper(),
stringSerialized)
+ .count()
+ .toStream()
+ .process(supplier);
+ }
try (
final TopologyTestDriver driver = new TopologyTestDriver(
- builder.build(), CONFIG, Instant.ofEpochMilli(0L))) {
+ builder.build(), props, Instant.ofEpochMilli(0L))) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(input, new StringSerializer(), new
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
- inputTopic.pipeInput("A", "green", 10L);
- inputTopic.pipeInput("B", "green", 9L);
- inputTopic.pipeInput("A", "blue", 12L);
- inputTopic.pipeInput("C", "yellow", 15L);
- inputTopic.pipeInput("D", "green", 11L);
-
- assertEquals(
- asList(
- new KeyValueTimestamp<>("green", 1L, 10),
- new KeyValueTimestamp<>("green", 2L, 10),
- new KeyValueTimestamp<>("green", 1L, 12),
- new KeyValueTimestamp<>("blue", 1L, 12),
- new KeyValueTimestamp<>("yellow", 1L, 15),
- new KeyValueTimestamp<>("green", 2L, 12)),
- supplier.theCapturedProcessor().processed());
+ final Headers headersA = makeHeaders("key", "A");
+ final Headers headersB = makeHeaders("key", "B");
+ final Headers headersC = makeHeaders("key", "C");
+ final Headers headersD = makeHeaders("key", "D");
+
+ inputTopic.pipeInput(new TestRecord<>("A", "green", headersA,
10L));
+ inputTopic.pipeInput(new TestRecord<>("B", "green", headersB, 9L));
+ inputTopic.pipeInput(new TestRecord<>("A", "blue", headersA, 12L));
+ inputTopic.pipeInput(new TestRecord<>("C", "yellow", headersC,
15L));
+ inputTopic.pipeInput(new TestRecord<>("D", "green", headersD,
11L));
+
+ if (storeFormat.equals("default")) {
+ assertEquals(
+ asList(
+ new KeyValueTimestamp<>("green", 1L, 10),
+ new KeyValueTimestamp<>("green", 2L, 10),
+ new KeyValueTimestamp<>("green", 1L, 12),
+ new KeyValueTimestamp<>("blue", 1L, 12),
+ new KeyValueTimestamp<>("yellow", 1L, 15),
+ new KeyValueTimestamp<>("green", 2L, 12)),
+ supplier.theCapturedProcessor().processed());
+ } else if (storeFormat.equals("headers")) {
+ assertEquals(
+ asList(
+ new KeyValueTimestampHeaders<>("green", 1L, 10,
headersA),
+ new KeyValueTimestampHeaders<>("green", 2L, 10,
headersB),
+ new KeyValueTimestampHeaders<>("green", 1L, 12,
headersA),
+ new KeyValueTimestampHeaders<>("blue", 1L, 12,
headersA),
+ new KeyValueTimestampHeaders<>("yellow", 1L, 15,
headersC),
+ new KeyValueTimestampHeaders<>("green", 2L, 12,
headersD)),
+ supplier.theCapturedProcessor().processedWithHeaders());
+ }
}
}
- @Test
- public void testCount() {
- final StreamsBuilder builder = new StreamsBuilder();
- final String input = "count-test-input";
-
- builder
- .table(input, consumed)
- .groupBy(MockMapper.selectValueKeyValueMapper(), stringSerialized)
- .count(Materialized.as("count"))
- .toStream()
- .process(supplier);
-
- testCountHelper(builder, input, supplier);
+ @ParameterizedTest
+ @MethodSource("storeFormats")
+ public void testCount(final String storeFormat) {
+ final Properties props = getProps(storeFormat);
+ testCountHelper(storeFormat, props, true);
}
- @Test
- public void testCountWithInternalStore() {
- final StreamsBuilder builder = new StreamsBuilder();
- final String input = "count-test-input";
-
- builder
- .table(input, consumed)
- .groupBy(MockMapper.selectValueKeyValueMapper(), stringSerialized)
- .count()
- .toStream()
- .process(supplier);
-
- testCountHelper(builder, input, supplier);
+ @ParameterizedTest
+ @MethodSource("storeFormats")
+ public void testCountWithInternalStore(final String storeFormat) {
+ final Properties props = getProps(storeFormat);
+ testCountHelper(storeFormat, props, false);
}
- @Test
- public void testCountOfVersionedStore() {
+ /** Source table: VersionedKeyValueStore (explicitly materialized)
+ * Aggregate store: Created by .count() - will respect
DSL_STORE_FORMAT_CONFIG
+ */
Review Comment:
Do we need to keep this javadoc?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]