mjsax commented on code in PR #11896: URL: https://github.com/apache/kafka/pull/11896#discussion_r845465254
########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java: ########## @@ -80,22 +109,54 @@ public void enableSendingOldValues() { private TimestampedWindowStore<KIn, VAgg> windowStore; private TimestampedTupleForwarder<Windowed<KIn>, VAgg> tupleForwarder; private Sensor droppedRecordsSensor; + private Sensor emittedRecordsSensor; + private Sensor emitFinalLatencySensor; private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; + private long lastEmitCloseTime = ConsumerRecord.NO_TIMESTAMP; + private InternalProcessorContext<Windowed<KIn>, Change<VAgg>> internalProcessorContext; + private final TimeTracker timeTracker = new TimeTracker(); + private final Time time = Time.SYSTEM; @Override public void init(final ProcessorContext<Windowed<KIn>, Change<VAgg>> context) { super.init(context); - final InternalProcessorContext<Windowed<KIn>, Change<VAgg>> internalProcessorContext = - (InternalProcessorContext<Windowed<KIn>, Change<VAgg>>) context; + internalProcessorContext = (InternalProcessorContext<Windowed<KIn>, Change<VAgg>>) context; final StreamsMetricsImpl metrics = internalProcessorContext.metrics(); final String threadId = Thread.currentThread().getName(); droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(), metrics); + emittedRecordsSensor = emittedRecordsSensor(threadId, context.taskId().toString(), metrics); + emitFinalLatencySensor = emitFinalLatencySensor(threadId, context.taskId().toString(), metrics); windowStore = context.getStateStore(storeName); - tupleForwarder = new TimestampedTupleForwarder<>( - windowStore, - context, - new TimestampedCacheFlushListener<>(context), - sendOldValues); + + if (emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE) { + // Don't set flush lister which emit cache results + tupleForwarder = new TimestampedTupleForwarder<>( + windowStore, + context, + sendOldValues); + } else { + tupleForwarder = new TimestampedTupleForwarder<>( + windowStore, + context, + new TimestampedCacheFlushListener<>(context), + sendOldValues); + } + + log.info("EmitStrategy=" + emitStrategy.type()); Review Comment: Might debug level be better? ########## streams/src/main/java/org/apache/kafka/streams/kstream/EmitStrategy.java: ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.streams.kstream.internals.UnlimitedWindow; +import org.apache.kafka.streams.kstream.internals.emitstrategy.WindowCloseStrategy; +import org.apache.kafka.streams.kstream.internals.emitstrategy.WindowUpdateStrategy; + +/** + * This interface controls the strategy that can be used to control how we emit results in a processor. 
+ */
+public interface EmitStrategy {
+
+    enum StrategyType {
+        ON_WINDOW_CLOSE,
+        ON_WINDOW_UPDATE
+    }
+
+    /**
+     * Returns the strategy type
+     * @return Emit strategy type
+     */
+    StrategyType type();
+
+    /**
+     * This strategy indicates that the aggregated result for a window will only be outputted when the
+     * window closes instead of when there's an update to the window.
+     *
+     * <p>This strategy should only be used for window which can close. For example, it doesn't make sense
+     * to be used with {@link UnlimitedWindow}

Review Comment:
   What happens if it's used anyway? (nit: missing `.` at the end of the sentence)

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java:
##########
@@ -80,22 +109,54 @@ public void enableSendingOldValues() {
     private TimestampedWindowStore<KIn, VAgg> windowStore;
     private TimestampedTupleForwarder<Windowed<KIn>, VAgg> tupleForwarder;
     private Sensor droppedRecordsSensor;
+    private Sensor emittedRecordsSensor;
+    private Sensor emitFinalLatencySensor;
     private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+    private long lastEmitCloseTime = ConsumerRecord.NO_TIMESTAMP;
+    private InternalProcessorContext<Windowed<KIn>, Change<VAgg>> internalProcessorContext;
+    private final TimeTracker timeTracker = new TimeTracker();
+    private final Time time = Time.SYSTEM;

     @Override
     public void init(final ProcessorContext<Windowed<KIn>, Change<VAgg>> context) {
         super.init(context);
-        final InternalProcessorContext<Windowed<KIn>, Change<VAgg>> internalProcessorContext =
-            (InternalProcessorContext<Windowed<KIn>, Change<VAgg>>) context;
+        internalProcessorContext = (InternalProcessorContext<Windowed<KIn>, Change<VAgg>>) context;
         final StreamsMetricsImpl metrics = internalProcessorContext.metrics();
         final String threadId = Thread.currentThread().getName();
         droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(), metrics);
+        emittedRecordsSensor = emittedRecordsSensor(threadId, context.taskId().toString(), metrics);

Review Comment:
   While useful, I don't remember that we discussed adding metrics in the KIP. Metrics are public API and should be included in the KIP.
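   For illustration only, the shape such a sensor registration might take (a sketch mirroring how `droppedRecordsSensor()` is built elsewhere -- the `emitted-records` sensor name and recording level here are placeholders, and the actual names/tags would need to be spelled out in the KIP):

   ```java
   // Hypothetical helper; sensor name and recording level are assumptions, not the PR's API.
   public static Sensor emittedRecordsSensor(final String threadId,
                                             final String taskId,
                                             final StreamsMetricsImpl streamsMetrics) {
       final Sensor sensor = streamsMetrics.taskLevelSensor(
           threadId, taskId, "emitted-records", Sensor.RecordingLevel.INFO);
       // rate and total metrics would be registered on the sensor here,
       // following the pattern of the other task-level sensors
       return sensor;
   }
   ```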
########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java: ########## @@ -184,6 +247,75 @@ public void process(final Record<KIn, VIn> record) { droppedRecordsSensor.record(); } } + + maybeMeasureLatency(() -> tryEmitFinalResult(record, closeTime), time, emitFinalLatencySensor); + } + + private void tryEmitFinalResult(final Record<KIn, VIn> record, final long closeTime) { + if (emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) { + return; + } + + final long now = internalProcessorContext.currentSystemTimeMs(); + // Throttle emit frequency + if (now < timeTracker.nextTimeToEmit) { + return; + } + + // Schedule next emit time based on now to avoid the case that if system time jumps a lot, + // this can be triggered everytime + timeTracker.nextTimeToEmit = now; + timeTracker.advanceNextTimeToEmit(); + + // Close time does not progress + if (lastEmitCloseTime != ConsumerRecord.NO_TIMESTAMP && lastEmitCloseTime >= closeTime) { + return; + } + + final long emitWindowStart = closeTime - windows.size(); + if (emitWindowStart < 0) { + // If emitWindowStart is 0, it means first window closes since windowEndTime + // is exclusive + return; + } + + // Because we only get here when emitWindowStart > 0 which means closeTime > windows.size() + // Since we set lastEmitCloseTime to closeTime before storing to processor metadata + // lastEmitCloseTime - windows.size() is always > 0 + // Set lastEmitWindowStart to -1L if not set so that when we fetchAll, we fetch from 0L + final long lastEmitWindowStart = lastEmitCloseTime == ConsumerRecord.NO_TIMESTAMP ? + -1L : lastEmitCloseTime - windows.size(); + + if (lastEmitCloseTime != ConsumerRecord.NO_TIMESTAMP) { + final Map<Long, W> matchedCloseWindows = windows.windowsFor(emitWindowStart); + final Map<Long, W> matchedEmitWindows = windows.windowsFor(lastEmitWindowStart); + + // Don't fetch store if the new emit window close time doesn't progress enough to cover next + // window + if (matchedCloseWindows.equals(matchedEmitWindows)) { + log.debug("no new windows to emit. LastEmitCloseTime={}, newCloseTime={}", Review Comment: Better log this at `trace` level? ########## streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java: ########## @@ -0,0 +1,497 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.integration; + +import java.util.Collection; +import java.util.Optional; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serdes.StringSerde; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.KeyValueTimestamp; +import org.apache.kafka.streams.StreamsConfig.InternalConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.EmitStrategy; +import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.SessionWindowedDeserializer; +import org.apache.kafka.streams.kstream.TimeWindowedDeserializer; +import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.UnlimitedWindows; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.WindowedSerdes; +import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.kstream.internals.TimeWindowedKStreamImpl; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.MockAggregator; +import org.apache.kafka.test.MockInitializer; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; + +import static java.time.Duration.ofMillis; +import static java.time.Instant.ofEpochMilli; +import static java.util.Arrays.asList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkProperties; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThrows; + +@SuppressWarnings({"unchecked"}) +@Category({IntegrationTest.class}) +@RunWith(Parameterized.class) +public class TimeWindowedKStreamIntegrationTest { + private static final int NUM_BROKERS = 1; + + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, + mkProperties( + mkMap(mkEntry("log.retention.hours", "-1"), mkEntry("log.retention.bytes", "-1")) // Don't expire records since we manipulate timestamp + ) + ); + + @BeforeClass + public static void startCluster() throws 
IOException { + CLUSTER.start(); + } + + @AfterClass + public static void closeCluster() { + CLUSTER.stop(); + } + + + private StreamsBuilder builder; + private Properties streamsConfiguration; + private KafkaStreams kafkaStreams; + private String streamOneInput; + private String streamTwoInput; + private String outputTopic; + + @Rule + public TestName testName = new TestName(); + + @Parameter + public StrategyType type; + + @Parameter(1) + public EmitStrategy emitStrategy; + + private boolean emitFinal; + + @Parameterized.Parameters(name = "{0}") + public static Collection<Object[]> getEmitStrategy() { + return asList(new Object[][] { + {StrategyType.ON_WINDOW_UPDATE, EmitStrategy.onWindowUpdate()}, + {StrategyType.ON_WINDOW_CLOSE, EmitStrategy.onWindowClose()} + }); + } + + @Before + public void before() throws InterruptedException { + builder = new StreamsBuilder(); + createTopics(); + streamsConfiguration = new Properties(); + final String safeTestName = safeUniqueTestName(getClass(), testName); + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + streamsConfiguration.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION, 0); // Always process + streamsConfiguration.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, Long.MAX_VALUE); // Don't expire changelog + + emitFinal = emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE; + } + + @After + public void whenShuttingDown() throws IOException { + if (kafkaStreams != null) { + kafkaStreams.close(); + kafkaStreams.cleanUp(); + } + IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); + } + + @Test + public void shouldAggregateWindowedWithNoGrace() throws Exception { + produceMessages( + streamOneInput, + new KeyValueTimestamp<>("A", "1", 0), + new KeyValueTimestamp<>("A", "1", 5), Review Comment: The output might be easier to read/understand if we use unique values, `1` to `7` for the seven input records. 
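   Something like this, as a sketch of that suggestion (only the first two of the seven input records are quoted in this hunk; values `3` to `7` would continue the pattern):

   ```java
   import static java.util.Arrays.asList;

   import java.util.List;
   import org.apache.kafka.streams.KeyValueTimestamp;

   // One distinct value per input record makes it obvious which records
   // ended up in which window once the mock aggregator builds the result.
   final List<KeyValueTimestamp<String, String>> input = asList(
       new KeyValueTimestamp<>("A", "1", 0L),
       new KeyValueTimestamp<>("A", "2", 5L));
   // ...and so on, values "3" to "7" for the remaining five records
   ```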
########## streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java: ########## @@ -42,84 +47,142 @@ import org.junit.Before; import org.junit.Test; -import java.util.Arrays; import java.util.List; import java.util.Properties; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; import static java.time.Duration.ofMillis; import static java.time.Instant.ofEpochMilli; +import static java.util.Arrays.asList; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; +@RunWith(Parameterized.class) public class TimeWindowedKStreamImplTest { private static final String TOPIC = "input"; + private static final Windowed<String> KEY_1_WINDOW_0 = new Windowed<>("1", new TimeWindow(0L, 500L)); + private static final Windowed<String> KEY_1_WINDOW_1 = new Windowed<>("1", new TimeWindow(500L, 1000L)); + private static final Windowed<String> KEY_2_WINDOW_1 = new Windowed<>("2", new TimeWindow(500L, 1000L)); + private static final Windowed<String> KEY_2_WINDOW_2 = new Windowed<>("2", new TimeWindow(1000L, 1500L)); + private final StreamsBuilder builder = new StreamsBuilder(); private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); - private TimeWindowedKStream<String, String> windowedStream; + private TimeWindowedKStreamImpl<String, String, TimeWindow> windowedStream; + + @Parameter + public StrategyType type; + + @Parameter(1) + public EmitStrategy emitStrategy; + + private boolean emitFinal; + @Parameterized.Parameters(name = "{0}") + public static Collection<Object[]> getKeySchema() { + return asList(new Object[][] { + {StrategyType.ON_WINDOW_UPDATE, EmitStrategy.onWindowUpdate()}, + {StrategyType.ON_WINDOW_CLOSE, EmitStrategy.onWindowClose()} + }); + } + + @SuppressWarnings("unchecked") @Before public void before() { + emitFinal = type.equals(StrategyType.ON_WINDOW_CLOSE); + // Set interval to 0 so that it always tries to emit + props.setProperty(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION, "0"); final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String())); - windowedStream = stream. + // TODO: remove this cast https://issues.apache.org/jira/browse/KAFKA-13800 + windowedStream = (TimeWindowedKStreamImpl<String, String, TimeWindow>) (stream. 
groupByKey(Grouped.with(Serdes.String(), Serdes.String())) - .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(500L))); + .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(500L)))); } @Test public void shouldCountWindowed() { final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier = new MockApiProcessorSupplier<>(); windowedStream + .emitStrategy(emitStrategy) .count() .toStream() .process(supplier); try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { processData(driver); } - assertThat( - supplier.theCapturedProcessor().lastValueAndTimestampPerKey() - .get(new Windowed<>("1", new TimeWindow(0L, 500L))), - equalTo(ValueAndTimestamp.make(2L, 15L))); - assertThat( - supplier.theCapturedProcessor().lastValueAndTimestampPerKey() - .get(new Windowed<>("2", new TimeWindow(500L, 1000L))), - equalTo(ValueAndTimestamp.make(2L, 550L))); - assertThat( - supplier.theCapturedProcessor().lastValueAndTimestampPerKey() - .get(new Windowed<>("1", new TimeWindow(500L, 1000L))), - equalTo(ValueAndTimestamp.make(1L, 500L))); + final ArrayList<KeyValueTimestamp<Windowed<String>, Long>> processed = supplier.theCapturedProcessor().processed(); + + if (emitFinal) { + assertEquals( + asList( + new KeyValueTimestamp<>(KEY_1_WINDOW_0, 2L, 15L), + new KeyValueTimestamp<>(KEY_1_WINDOW_1, 1L, 500L), + new KeyValueTimestamp<>(KEY_2_WINDOW_1, 2L, 550L) Review Comment: Why does the order change? ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java: ########## @@ -184,6 +247,75 @@ public void process(final Record<KIn, VIn> record) { droppedRecordsSensor.record(); } } + + maybeMeasureLatency(() -> tryEmitFinalResult(record, closeTime), time, emitFinalLatencySensor); + } + + private void tryEmitFinalResult(final Record<KIn, VIn> record, final long closeTime) { + if (emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) { + return; + } + + final long now = internalProcessorContext.currentSystemTimeMs(); + // Throttle emit frequency + if (now < timeTracker.nextTimeToEmit) { + return; + } + + // Schedule next emit time based on now to avoid the case that if system time jumps a lot, + // this can be triggered everytime + timeTracker.nextTimeToEmit = now; + timeTracker.advanceNextTimeToEmit(); + + // Close time does not progress + if (lastEmitCloseTime != ConsumerRecord.NO_TIMESTAMP && lastEmitCloseTime >= closeTime) { + return; + } + + final long emitWindowStart = closeTime - windows.size(); + if (emitWindowStart < 0) { + // If emitWindowStart is 0, it means first window closes since windowEndTime + // is exclusive + return; + } + + // Because we only get here when emitWindowStart > 0 which means closeTime > windows.size() + // Since we set lastEmitCloseTime to closeTime before storing to processor metadata + // lastEmitCloseTime - windows.size() is always > 0 + // Set lastEmitWindowStart to -1L if not set so that when we fetchAll, we fetch from 0L + final long lastEmitWindowStart = lastEmitCloseTime == ConsumerRecord.NO_TIMESTAMP ? 
+ -1L : lastEmitCloseTime - windows.size(); + + if (lastEmitCloseTime != ConsumerRecord.NO_TIMESTAMP) { + final Map<Long, W> matchedCloseWindows = windows.windowsFor(emitWindowStart); + final Map<Long, W> matchedEmitWindows = windows.windowsFor(lastEmitWindowStart); + + // Don't fetch store if the new emit window close time doesn't progress enough to cover next + // window + if (matchedCloseWindows.equals(matchedEmitWindows)) { Review Comment: As we `.fetchAll(lastEmitWindowStart + 1, emitWindowStart);` and both upper and lower bound are inclusive, would we not need that `emitWindowStart >= lastEmitWindowStart+1` to have at least one window that we could potentially emit? Otherwise, lower bound is larger than upper bound and the time-range is not well defined? ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java: ########## @@ -184,6 +247,75 @@ public void process(final Record<KIn, VIn> record) { droppedRecordsSensor.record(); } } + + maybeMeasureLatency(() -> tryEmitFinalResult(record, closeTime), time, emitFinalLatencySensor); + } + + private void tryEmitFinalResult(final Record<KIn, VIn> record, final long closeTime) { + if (emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) { + return; + } + + final long now = internalProcessorContext.currentSystemTimeMs(); + // Throttle emit frequency + if (now < timeTracker.nextTimeToEmit) { + return; + } + + // Schedule next emit time based on now to avoid the case that if system time jumps a lot, + // this can be triggered everytime + timeTracker.nextTimeToEmit = now; + timeTracker.advanceNextTimeToEmit(); + + // Close time does not progress + if (lastEmitCloseTime != ConsumerRecord.NO_TIMESTAMP && lastEmitCloseTime >= closeTime) { Review Comment: Would `lastEmitCloseTime` not always be `<= closeTime` ? ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java: ########## @@ -202,12 +207,22 @@ return aggregateBuilder.build( new NamedInternal(reduceName), materialize(materializedInternal), - new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)), + new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), emitStrategy, aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)), materializedInternal.queryableStoreName(), materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null, materializedInternal.valueSerde()); } + //@Override Review Comment: Should not be a comment? 
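   Coming back to the `fetchAll` range question above: since both bounds are inclusive, a guard along these lines (a sketch reusing the PR's variable names, not tested) would make the empty-range case explicit before hitting the store:

   ```java
   // Emit range is [lastEmitWindowStart + 1, emitWindowStart], both bounds inclusive.
   // If the lower bound exceeds the upper bound, no window could be emitted yet.
   if (lastEmitWindowStart + 1 > emitWindowStart) {
       return;
   }
   final KeyValueIterator<Windowed<KIn>, ValueAndTimestamp<VAgg>> windowToEmit =
       windowStore.fetchAll(lastEmitWindowStart + 1, emitWindowStart);
   ```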
########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java: ########## @@ -80,22 +109,54 @@ public void enableSendingOldValues() { private TimestampedWindowStore<KIn, VAgg> windowStore; private TimestampedTupleForwarder<Windowed<KIn>, VAgg> tupleForwarder; private Sensor droppedRecordsSensor; + private Sensor emittedRecordsSensor; + private Sensor emitFinalLatencySensor; private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; + private long lastEmitCloseTime = ConsumerRecord.NO_TIMESTAMP; + private InternalProcessorContext<Windowed<KIn>, Change<VAgg>> internalProcessorContext; + private final TimeTracker timeTracker = new TimeTracker(); + private final Time time = Time.SYSTEM; @Override public void init(final ProcessorContext<Windowed<KIn>, Change<VAgg>> context) { super.init(context); - final InternalProcessorContext<Windowed<KIn>, Change<VAgg>> internalProcessorContext = - (InternalProcessorContext<Windowed<KIn>, Change<VAgg>>) context; + internalProcessorContext = (InternalProcessorContext<Windowed<KIn>, Change<VAgg>>) context; final StreamsMetricsImpl metrics = internalProcessorContext.metrics(); final String threadId = Thread.currentThread().getName(); droppedRecordsSensor = droppedRecordsSensor(threadId, context.taskId().toString(), metrics); + emittedRecordsSensor = emittedRecordsSensor(threadId, context.taskId().toString(), metrics); + emitFinalLatencySensor = emitFinalLatencySensor(threadId, context.taskId().toString(), metrics); windowStore = context.getStateStore(storeName); - tupleForwarder = new TimestampedTupleForwarder<>( - windowStore, - context, - new TimestampedCacheFlushListener<>(context), - sendOldValues); + + if (emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE) { + // Don't set flush lister which emit cache results + tupleForwarder = new TimestampedTupleForwarder<>( + windowStore, + context, + sendOldValues); + } else { + tupleForwarder = new TimestampedTupleForwarder<>( + windowStore, + context, + new TimestampedCacheFlushListener<>(context), + sendOldValues); + } + + log.info("EmitStrategy=" + emitStrategy.type()); + // Restore last emit close time for ON_WINDOW_CLOSE strategy + if (emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE) { + final Long lastEmitTime = internalProcessorContext.processorMetadataForKey(storeName); + if (lastEmitTime != null) { + lastEmitCloseTime = lastEmitTime; + } + final long emitInterval = StreamsConfig.InternalConfig.getLong( + context.appConfigs(), + EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION, + 1000L + ); + timeTracker.setEmitInterval(emitInterval); + log.info("EmitInterval=" + emitInterval); Review Comment: maybe better to use debug level? 
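   For example (sketch):

   ```java
   // Parameterized placeholders also avoid the string concatenation
   // when the debug level is disabled.
   log.debug("EmitStrategy={}", emitStrategy.type());
   log.debug("EmitInterval={}", emitInterval);
   ```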
########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java: ########## @@ -184,6 +247,75 @@ public void process(final Record<KIn, VIn> record) { droppedRecordsSensor.record(); } } + + maybeMeasureLatency(() -> tryEmitFinalResult(record, closeTime), time, emitFinalLatencySensor); + } + + private void tryEmitFinalResult(final Record<KIn, VIn> record, final long closeTime) { + if (emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) { + return; + } + + final long now = internalProcessorContext.currentSystemTimeMs(); + // Throttle emit frequency + if (now < timeTracker.nextTimeToEmit) { + return; + } + + // Schedule next emit time based on now to avoid the case that if system time jumps a lot, + // this can be triggered everytime + timeTracker.nextTimeToEmit = now; Review Comment: Did we do the same thing for stream-stream join? I don't think we did? Might be worth to do there, too? (Not part of this PR...) ########## streams/src/main/java/org/apache/kafka/streams/kstream/EmitStrategy.java: ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.streams.kstream.internals.UnlimitedWindow; +import org.apache.kafka.streams.kstream.internals.emitstrategy.WindowCloseStrategy; +import org.apache.kafka.streams.kstream.internals.emitstrategy.WindowUpdateStrategy; + +/** + * This interface controls the strategy that can be used to control how we emit results in a processor. + */ +public interface EmitStrategy { + + enum StrategyType { + ON_WINDOW_CLOSE, + ON_WINDOW_UPDATE + } + + /** + * Returns the strategy type + * @return Emit strategy type + */ + StrategyType type(); + + /** + * This strategy indicates that the aggregated result for a window will only be outputted when the + * window closes instead of when there's an update to the window. Review Comment: Can we add details what "closes" means, ie, _when_ a window is closed? Should we also mention caching? Or would this go into the weeds? 
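   A possible wording, as a sketch (the exact phrasing -- and whether to mention caching at all -- is the open question here):

   ```java
   /**
    * <p>A window is "closed" once the observed stream time has advanced beyond its end time
    * plus the grace period, i.e. when {@code streamTime >= windowEnd + gracePeriod}; only
    * then is the final aggregation result emitted downstream.
    */
   ```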
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java:
##########
@@ -184,6 +247,75 @@ public void process(final Record<KIn, VIn> record) {
                     droppedRecordsSensor.record();
                 }
             }
+
+        maybeMeasureLatency(() -> tryEmitFinalResult(record, closeTime), time, emitFinalLatencySensor);
+    }
+
+    private void tryEmitFinalResult(final Record<KIn, VIn> record, final long closeTime) {
+        if (emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) {
+            return;
+        }
+
+        final long now = internalProcessorContext.currentSystemTimeMs();
+        // Throttle emit frequency
+        if (now < timeTracker.nextTimeToEmit) {
+            return;
+        }
+
+        // Schedule next emit time based on now to avoid the case that if system time jumps a lot,
+        // this can be triggered everytime
+        timeTracker.nextTimeToEmit = now;
+        timeTracker.advanceNextTimeToEmit();
+
+        // Close time does not progress
+        if (lastEmitCloseTime != ConsumerRecord.NO_TIMESTAMP && lastEmitCloseTime >= closeTime) {
+            return;
+        }
+
+        final long emitWindowStart = closeTime - windows.size();
+        if (emitWindowStart < 0) {
+            // If emitWindowStart is 0, it means first window closes since windowEndTime
+            // is exclusive
+            return;
+        }
+
+        // Because we only get here when emitWindowStart > 0 which means closeTime > windows.size()
+        // Since we set lastEmitCloseTime to closeTime before storing to processor metadata
+        // lastEmitCloseTime - windows.size() is always > 0

Review Comment:
   The comment makes sense, but I am not sure what point you are trying to make?

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java:
##########
@@ -184,6 +247,75 @@ public void process(final Record<KIn, VIn> record) {
                     droppedRecordsSensor.record();
                 }
             }
+
+        maybeMeasureLatency(() -> tryEmitFinalResult(record, closeTime), time, emitFinalLatencySensor);
+    }
+
+    private void tryEmitFinalResult(final Record<KIn, VIn> record, final long closeTime) {
+        if (emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) {
+            return;
+        }
+
+        final long now = internalProcessorContext.currentSystemTimeMs();
+        // Throttle emit frequency
+        if (now < timeTracker.nextTimeToEmit) {
+            return;
+        }
+
+        // Schedule next emit time based on now to avoid the case that if system time jumps a lot,
+        // this can be triggered everytime
+        timeTracker.nextTimeToEmit = now;
+        timeTracker.advanceNextTimeToEmit();
+
+        // Close time does not progress
+        if (lastEmitCloseTime != ConsumerRecord.NO_TIMESTAMP && lastEmitCloseTime >= closeTime) {
+            return;
+        }
+
+        final long emitWindowStart = closeTime - windows.size();
+        if (emitWindowStart < 0) {
+            // If emitWindowStart is 0, it means first window closes since windowEndTime
+            // is exclusive
+            return;
+        }
+
+        // Because we only get here when emitWindowStart > 0 which means closeTime > windows.size()
+        // Since we set lastEmitCloseTime to closeTime before storing to processor metadata
+        // lastEmitCloseTime - windows.size() is always > 0
+        // Set lastEmitWindowStart to -1L if not set so that when we fetchAll, we fetch from 0L
+        final long lastEmitWindowStart = lastEmitCloseTime == ConsumerRecord.NO_TIMESTAMP ?
+ -1L : lastEmitCloseTime - windows.size(); + + if (lastEmitCloseTime != ConsumerRecord.NO_TIMESTAMP) { + final Map<Long, W> matchedCloseWindows = windows.windowsFor(emitWindowStart); + final Map<Long, W> matchedEmitWindows = windows.windowsFor(lastEmitWindowStart); + + // Don't fetch store if the new emit window close time doesn't progress enough to cover next + // window + if (matchedCloseWindows.equals(matchedEmitWindows)) { + log.debug("no new windows to emit. LastEmitCloseTime={}, newCloseTime={}", + lastEmitCloseTime, closeTime); + return; + } + } + + final KeyValueIterator<Windowed<KIn>, ValueAndTimestamp<VAgg>> windowToEmit = windowStore + .fetchAll(lastEmitWindowStart + 1, emitWindowStart); + + int emittedCount = 0; + while (windowToEmit.hasNext()) { + emittedCount++; + final KeyValue<Windowed<KIn>, ValueAndTimestamp<VAgg>> kv = windowToEmit.next(); + tupleForwarder.maybeForward( + record.withKey(kv.key) + .withValue(new Change<>(kv.value.value(), null)) + .withTimestamp(kv.value.timestamp()) + .withHeaders(null)); // Don't set header Review Comment: Based on what we do elsewhere, I think we should set the header as `context.header()` -- I understand that it's semantically questionable, but we should be consistent with what we do elsewhere (and maybe clean it up in a consistent manner via a KIP at some point). ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java: ########## @@ -184,6 +247,75 @@ public void process(final Record<KIn, VIn> record) { droppedRecordsSensor.record(); } } + + maybeMeasureLatency(() -> tryEmitFinalResult(record, closeTime), time, emitFinalLatencySensor); + } + + private void tryEmitFinalResult(final Record<KIn, VIn> record, final long closeTime) { + if (emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) { + return; + } + + final long now = internalProcessorContext.currentSystemTimeMs(); + // Throttle emit frequency + if (now < timeTracker.nextTimeToEmit) { + return; + } + + // Schedule next emit time based on now to avoid the case that if system time jumps a lot, + // this can be triggered everytime + timeTracker.nextTimeToEmit = now; + timeTracker.advanceNextTimeToEmit(); + + // Close time does not progress + if (lastEmitCloseTime != ConsumerRecord.NO_TIMESTAMP && lastEmitCloseTime >= closeTime) { + return; + } + + final long emitWindowStart = closeTime - windows.size(); + if (emitWindowStart < 0) { + // If emitWindowStart is 0, it means first window closes since windowEndTime + // is exclusive + return; + } + + // Because we only get here when emitWindowStart > 0 which means closeTime > windows.size() + // Since we set lastEmitCloseTime to closeTime before storing to processor metadata + // lastEmitCloseTime - windows.size() is always > 0 + // Set lastEmitWindowStart to -1L if not set so that when we fetchAll, we fetch from 0L + final long lastEmitWindowStart = lastEmitCloseTime == ConsumerRecord.NO_TIMESTAMP ? + -1L : lastEmitCloseTime - windows.size(); + + if (lastEmitCloseTime != ConsumerRecord.NO_TIMESTAMP) { + final Map<Long, W> matchedCloseWindows = windows.windowsFor(emitWindowStart); + final Map<Long, W> matchedEmitWindows = windows.windowsFor(lastEmitWindowStart); + + // Don't fetch store if the new emit window close time doesn't progress enough to cover next + // window + if (matchedCloseWindows.equals(matchedEmitWindows)) { Review Comment: Is this the same as `emitWindowStart == lastEmitWindowStart`? 
If yes, comparing two longs seems more efficient than comparing two `Map`s.

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java:
##########
@@ -80,22 +109,54 @@ public void enableSendingOldValues() {
     private TimestampedWindowStore<KIn, VAgg> windowStore;
     private TimestampedTupleForwarder<Windowed<KIn>, VAgg> tupleForwarder;
     private Sensor droppedRecordsSensor;
+    private Sensor emittedRecordsSensor;
+    private Sensor emitFinalLatencySensor;
     private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+    private long lastEmitCloseTime = ConsumerRecord.NO_TIMESTAMP;
+    private InternalProcessorContext<Windowed<KIn>, Change<VAgg>> internalProcessorContext;
+    private final TimeTracker timeTracker = new TimeTracker();
+    private final Time time = Time.SYSTEM;

Review Comment:
   We should use `Time.SYSTEM` but pass in a `Time` object from outside -- otherwise, we cannot mock it.

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java:
##########
@@ -184,6 +247,75 @@ public void process(final Record<KIn, VIn> record) {
                     droppedRecordsSensor.record();
                 }
             }
+
+        maybeMeasureLatency(() -> tryEmitFinalResult(record, closeTime), time, emitFinalLatencySensor);
+    }
+
+    private void tryEmitFinalResult(final Record<KIn, VIn> record, final long closeTime) {
+        if (emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) {
+            return;
+        }
+
+        final long now = internalProcessorContext.currentSystemTimeMs();
+        // Throttle emit frequency
+        if (now < timeTracker.nextTimeToEmit) {
+            return;
+        }
+
+        // Schedule next emit time based on now to avoid the case that if system time jumps a lot,
+        // this can be triggered everytime
+        timeTracker.nextTimeToEmit = now;
+        timeTracker.advanceNextTimeToEmit();
+
+        // Close time does not progress
+        if (lastEmitCloseTime != ConsumerRecord.NO_TIMESTAMP && lastEmitCloseTime >= closeTime) {
+            return;
+        }
+
+        final long emitWindowStart = closeTime - windows.size();
+        if (emitWindowStart < 0) {
+            // If emitWindowStart is 0, it means first window closes since windowEndTime
+            // is exclusive
+            return;
+        }
+
+        // Because we only get here when emitWindowStart > 0 which means closeTime > windows.size()

Review Comment:
   nit: `emitWindowStart >= 0`

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java:
##########
@@ -184,6 +247,75 @@ public void process(final Record<KIn, VIn> record) {
                     droppedRecordsSensor.record();
                 }
             }
+
+        maybeMeasureLatency(() -> tryEmitFinalResult(record, closeTime), time, emitFinalLatencySensor);
+    }
+
+    private void tryEmitFinalResult(final Record<KIn, VIn> record, final long closeTime) {
+        if (emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) {
+            return;
+        }
+
+        final long now = internalProcessorContext.currentSystemTimeMs();
+        // Throttle emit frequency
+        if (now < timeTracker.nextTimeToEmit) {
+            return;
+        }
+
+        // Schedule next emit time based on now to avoid the case that if system time jumps a lot,
+        // this can be triggered everytime
+        timeTracker.nextTimeToEmit = now;
+        timeTracker.advanceNextTimeToEmit();
+
+        // Close time does not progress
+        if (lastEmitCloseTime != ConsumerRecord.NO_TIMESTAMP && lastEmitCloseTime >= closeTime) {
+            return;
+        }
+
+        final long emitWindowStart = closeTime - windows.size();

Review Comment:
   It took me some time to figure out what this variable really is. It's the upper bound of the emit range (based on window-start), right? Maybe we can find a better name?
Maybe `emitRangeUpperBoundInclusive` ? Similarly it might be helpful to introduce an `emitRangeLowerBoundInclusive` variable? ########## streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java: ########## @@ -0,0 +1,497 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import java.util.Collection; +import java.util.Optional; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serdes.StringSerde; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.KeyValueTimestamp; +import org.apache.kafka.streams.StreamsConfig.InternalConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.EmitStrategy; +import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.SessionWindowedDeserializer; +import org.apache.kafka.streams.kstream.TimeWindowedDeserializer; +import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.UnlimitedWindows; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.WindowedSerdes; +import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.kstream.internals.TimeWindowedKStreamImpl; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.MockAggregator; +import org.apache.kafka.test.MockInitializer; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import 
org.junit.runners.Parameterized.Parameter; + +import static java.time.Duration.ofMillis; +import static java.time.Instant.ofEpochMilli; +import static java.util.Arrays.asList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkProperties; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThrows; + +@SuppressWarnings({"unchecked"}) +@Category({IntegrationTest.class}) +@RunWith(Parameterized.class) +public class TimeWindowedKStreamIntegrationTest { + private static final int NUM_BROKERS = 1; + + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, Review Comment: Why do we test using `EmbeddedKafkaCluster`? Seems using `TopologyTestDriver` should be sufficient and would be less heavy? ########## streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java: ########## @@ -0,0 +1,497 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.integration; + +import java.util.Collection; +import java.util.Optional; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serdes.StringSerde; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.KeyValueTimestamp; +import org.apache.kafka.streams.StreamsConfig.InternalConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.EmitStrategy; +import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.SessionWindowedDeserializer; +import org.apache.kafka.streams.kstream.TimeWindowedDeserializer; +import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.UnlimitedWindows; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.WindowedSerdes; +import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.kstream.internals.TimeWindowedKStreamImpl; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.MockAggregator; +import org.apache.kafka.test.MockInitializer; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; + +import static java.time.Duration.ofMillis; +import static java.time.Instant.ofEpochMilli; +import static java.util.Arrays.asList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkProperties; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThrows; + +@SuppressWarnings({"unchecked"}) +@Category({IntegrationTest.class}) +@RunWith(Parameterized.class) +public class TimeWindowedKStreamIntegrationTest { + private static final int NUM_BROKERS = 1; + + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, + mkProperties( + mkMap(mkEntry("log.retention.hours", "-1"), mkEntry("log.retention.bytes", "-1")) // Don't expire records since we manipulate timestamp + ) + ); + + @BeforeClass + public static void startCluster() throws 
IOException { + CLUSTER.start(); + } + + @AfterClass + public static void closeCluster() { + CLUSTER.stop(); + } + + + private StreamsBuilder builder; + private Properties streamsConfiguration; + private KafkaStreams kafkaStreams; + private String streamOneInput; + private String streamTwoInput; + private String outputTopic; + + @Rule + public TestName testName = new TestName(); + + @Parameter + public StrategyType type; + + @Parameter(1) + public EmitStrategy emitStrategy; + + private boolean emitFinal; + + @Parameterized.Parameters(name = "{0}") + public static Collection<Object[]> getEmitStrategy() { + return asList(new Object[][] { + {StrategyType.ON_WINDOW_UPDATE, EmitStrategy.onWindowUpdate()}, + {StrategyType.ON_WINDOW_CLOSE, EmitStrategy.onWindowClose()} + }); + } + + @Before + public void before() throws InterruptedException { + builder = new StreamsBuilder(); + createTopics(); + streamsConfiguration = new Properties(); + final String safeTestName = safeUniqueTestName(getClass(), testName); + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + streamsConfiguration.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION, 0); // Always process + streamsConfiguration.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, Long.MAX_VALUE); // Don't expire changelog + + emitFinal = emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE; + } + + @After + public void whenShuttingDown() throws IOException { + if (kafkaStreams != null) { + kafkaStreams.close(); + kafkaStreams.cleanUp(); + } + IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); + } + + @Test + public void shouldAggregateWindowedWithNoGrace() throws Exception { + produceMessages( + streamOneInput, + new KeyValueTimestamp<>("A", "1", 0), + new KeyValueTimestamp<>("A", "1", 5), + new KeyValueTimestamp<>("A", "1", 10), // close [0, 10) + new KeyValueTimestamp<>("B", "2", 6), // late and skip + new KeyValueTimestamp<>("B", "2", 11), // close [0, 10) Review Comment: Why would this close `[0,10)` ? It was already closed? ########## streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java: ########## @@ -0,0 +1,497 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import java.util.Collection; +import java.util.Optional; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serdes.StringSerde; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.KeyValueTimestamp; +import org.apache.kafka.streams.StreamsConfig.InternalConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.EmitStrategy; +import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.SessionWindowedDeserializer; +import org.apache.kafka.streams.kstream.TimeWindowedDeserializer; +import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.UnlimitedWindows; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.WindowedSerdes; +import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.kstream.internals.TimeWindowedKStreamImpl; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.MockAggregator; +import org.apache.kafka.test.MockInitializer; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; + +import static java.time.Duration.ofMillis; +import static java.time.Instant.ofEpochMilli; +import static java.util.Arrays.asList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkProperties; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThrows; + +@SuppressWarnings({"unchecked"}) +@Category({IntegrationTest.class}) 
+@RunWith(Parameterized.class) +public class TimeWindowedKStreamIntegrationTest { + private static final int NUM_BROKERS = 1; + + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, + mkProperties( + mkMap(mkEntry("log.retention.hours", "-1"), mkEntry("log.retention.bytes", "-1")) // Don't expire records since we manipulate timestamp + ) + ); + + @BeforeClass + public static void startCluster() throws IOException { + CLUSTER.start(); + } + + @AfterClass + public static void closeCluster() { + CLUSTER.stop(); + } + + + private StreamsBuilder builder; + private Properties streamsConfiguration; + private KafkaStreams kafkaStreams; + private String streamOneInput; + private String streamTwoInput; + private String outputTopic; + + @Rule + public TestName testName = new TestName(); + + @Parameter + public StrategyType type; + + @Parameter(1) + public EmitStrategy emitStrategy; + + private boolean emitFinal; + + @Parameterized.Parameters(name = "{0}") + public static Collection<Object[]> getEmitStrategy() { + return asList(new Object[][] { + {StrategyType.ON_WINDOW_UPDATE, EmitStrategy.onWindowUpdate()}, + {StrategyType.ON_WINDOW_CLOSE, EmitStrategy.onWindowClose()} + }); + } + + @Before + public void before() throws InterruptedException { + builder = new StreamsBuilder(); + createTopics(); + streamsConfiguration = new Properties(); + final String safeTestName = safeUniqueTestName(getClass(), testName); + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + streamsConfiguration.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION, 0); // Always process + streamsConfiguration.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, Long.MAX_VALUE); // Don't expire changelog + + emitFinal = emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE; + } + + @After + public void whenShuttingDown() throws IOException { + if (kafkaStreams != null) { + kafkaStreams.close(); + kafkaStreams.cleanUp(); + } + IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); + } + + @Test + public void shouldAggregateWindowedWithNoGrace() throws Exception { + produceMessages( + streamOneInput, + new KeyValueTimestamp<>("A", "1", 0), + new KeyValueTimestamp<>("A", "1", 5), + new KeyValueTimestamp<>("A", "1", 10), // close [0, 10) + new KeyValueTimestamp<>("B", "2", 6), // late and skip Review Comment: Why would this record be skipped? You define a hopping window, and while window `[0,10)` is closed, window `[5,15)` is still open and the record should still go into this second window? 
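   To make the two-window point concrete -- assuming the test really does use a hopping window of size 10ms advancing by 5ms (a sketch, not the test's code):

   ```java
   import java.time.Duration;
   import java.util.Set;
   import org.apache.kafka.streams.kstream.TimeWindows;

   final TimeWindows hopping = TimeWindows
       .ofSizeWithNoGrace(Duration.ofMillis(10))
       .advanceBy(Duration.ofMillis(5));
   // A record with timestamp 6 belongs to two windows:
   final Set<Long> starts = hopping.windowsFor(6L).keySet();  // {0, 5}, i.e. [0,10) and [5,15)
   ```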
########## streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java: ########## @@ -158,21 +235,23 @@ public void shouldMaterializeCount() { final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(windowStore.fetch("1", "2", ofEpochMilli(0), ofEpochMilli(1000L))); - assertThat(data, equalTo(Arrays.asList( + assertThat(data, equalTo(asList( KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 2L), KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 1000)), 1L), - KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 2L)))); + KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 1000)), 2L), + KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 1500)), 1L)))); Review Comment: Why do we get one more output record? ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java: ########## @@ -184,6 +247,75 @@ public void process(final Record<KIn, VIn> record) { droppedRecordsSensor.record(); } } + + maybeMeasureLatency(() -> tryEmitFinalResult(record, closeTime), time, emitFinalLatencySensor); + } + + private void tryEmitFinalResult(final Record<KIn, VIn> record, final long closeTime) { + if (emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) { + return; + } + + final long now = internalProcessorContext.currentSystemTimeMs(); + // Throttle emit frequency + if (now < timeTracker.nextTimeToEmit) { + return; + } + + // Schedule next emit time based on now to avoid the case that, if system time jumps a lot, + // this can be triggered every time + timeTracker.nextTimeToEmit = now; + timeTracker.advanceNextTimeToEmit(); + + // Close time does not progress + if (lastEmitCloseTime != ConsumerRecord.NO_TIMESTAMP && lastEmitCloseTime >= closeTime) { + return; + } + + final long emitWindowStart = closeTime - windows.size(); + if (emitWindowStart < 0) { + // If emitWindowStart is 0, it means the first window closes, since windowEndTime + // is exclusive + return; + } + + // Because we only get here when emitWindowStart > 0, which means closeTime > windows.size() + // Since we set lastEmitCloseTime to closeTime before storing to processor metadata + // lastEmitCloseTime - windows.size() is always > 0 + // Set lastEmitWindowStart to -1L if not set, so that when we fetchAll, we fetch from 0L + final long lastEmitWindowStart = lastEmitCloseTime == ConsumerRecord.NO_TIMESTAMP ? + -1L : lastEmitCloseTime - windows.size(); + + if (lastEmitCloseTime != ConsumerRecord.NO_TIMESTAMP) { + final Map<Long, W> matchedCloseWindows = windows.windowsFor(emitWindowStart); + final Map<Long, W> matchedEmitWindows = windows.windowsFor(lastEmitWindowStart); + + // Don't fetch store if the new emit window close time doesn't progress enough to cover the next + // window + if (matchedCloseWindows.equals(matchedEmitWindows)) { + log.debug("no new windows to emit.
LastEmitCloseTime={}, newCloseTime={}", + lastEmitCloseTime, closeTime); + return; + } + } + + final KeyValueIterator<Windowed<KIn>, ValueAndTimestamp<VAgg>> windowToEmit = windowStore + .fetchAll(lastEmitWindowStart + 1, emitWindowStart); + + int emittedCount = 0; + while (windowToEmit.hasNext()) { + emittedCount++; + final KeyValue<Windowed<KIn>, ValueAndTimestamp<VAgg>> kv = windowToEmit.next(); + tupleForwarder.maybeForward( + record.withKey(kv.key) + .withValue(new Change<>(kv.value.value(), null)) + .withTimestamp(kv.value.timestamp()) + .withHeaders(null)); // Don't set headers + } + emittedRecordsSensor.record(emittedCount); + + lastEmitCloseTime = closeTime; + internalProcessorContext.addProcessorMetadataKeyValue(storeName, closeTime); Review Comment: After considering the logic, I am wondering: might it be simpler to store not the latest "closeTime" but the latest window-start time for which we did emit (i.e., `emitRangeLowerBoundInclusive`)? It's of course just shifted by window size and grace period, but it might make the code easier to structure and read/understand. ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java: ########## @@ -232,11 +247,19 @@ ); break; case ROCKS_DB: - supplier = Stores.persistentTimestampedWindowStore( - materialized.storeName(), - Duration.ofMillis(retentionPeriod), - Duration.ofMillis(windows.size()), - false + supplier = emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE ? + RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create( + materialized.storeName(), Review Comment: In what scenario could it happen that we have two stores with the same name? (I believe there is no problem and the code is fine, but maybe I am missing something?) ########## streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java: ########## @@ -0,0 +1,497 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kafka.streams.integration; + +import java.util.Collection; +import java.util.Optional; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serdes.StringSerde; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.KeyValueTimestamp; +import org.apache.kafka.streams.StreamsConfig.InternalConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.EmitStrategy; +import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.SessionWindowedDeserializer; +import org.apache.kafka.streams.kstream.TimeWindowedDeserializer; +import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.UnlimitedWindows; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.WindowedSerdes; +import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.kstream.internals.TimeWindowedKStreamImpl; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.MockAggregator; +import org.apache.kafka.test.MockInitializer; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; + +import static java.time.Duration.ofMillis; +import static java.time.Instant.ofEpochMilli; +import static java.util.Arrays.asList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkProperties; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThrows; + +@SuppressWarnings({"unchecked"}) +@Category({IntegrationTest.class}) +@RunWith(Parameterized.class) +public class TimeWindowedKStreamIntegrationTest { + private static final int NUM_BROKERS = 1; + + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, + mkProperties( + mkMap(mkEntry("log.retention.hours", "-1"), mkEntry("log.retention.bytes", "-1")) // Don't expire records since we manipulate timestamp + ) + ); + + @BeforeClass + public static void startCluster() throws 
IOException { + CLUSTER.start(); + } + + @AfterClass + public static void closeCluster() { + CLUSTER.stop(); + } + + + private StreamsBuilder builder; + private Properties streamsConfiguration; + private KafkaStreams kafkaStreams; + private String streamOneInput; + private String streamTwoInput; + private String outputTopic; + + @Rule + public TestName testName = new TestName(); + + @Parameter + public StrategyType type; + + @Parameter(1) + public EmitStrategy emitStrategy; + + private boolean emitFinal; + + @Parameterized.Parameters(name = "{0}") + public static Collection<Object[]> getEmitStrategy() { + return asList(new Object[][] { + {StrategyType.ON_WINDOW_UPDATE, EmitStrategy.onWindowUpdate()}, + {StrategyType.ON_WINDOW_CLOSE, EmitStrategy.onWindowClose()} + }); + } + + @Before + public void before() throws InterruptedException { + builder = new StreamsBuilder(); + createTopics(); + streamsConfiguration = new Properties(); + final String safeTestName = safeUniqueTestName(getClass(), testName); + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + streamsConfiguration.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION, 0); // Always process + streamsConfiguration.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, Long.MAX_VALUE); // Don't expire changelog + + emitFinal = emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE; + } + + @After + public void whenShuttingDown() throws IOException { + if (kafkaStreams != null) { + kafkaStreams.close(); + kafkaStreams.cleanUp(); + } + IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); + } + + @Test + public void shouldAggregateWindowedWithNoGrace() throws Exception { + produceMessages( + streamOneInput, + new KeyValueTimestamp<>("A", "1", 0), + new KeyValueTimestamp<>("A", "1", 5), + new KeyValueTimestamp<>("A", "1", 10), // close [0, 10) + new KeyValueTimestamp<>("B", "2", 6), // late and skip + new KeyValueTimestamp<>("B", "2", 11), // close [0, 10) + new KeyValueTimestamp<>("B", "2", 15), // close [5, 15) + new KeyValueTimestamp<>("C", "3", 25) // close [10, 20), [15, 25) + ); + + final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, 10L); + // TODO: remove this cast https://issues.apache.org/jira/browse/KAFKA-13800 + final TimeWindowedKStreamImpl<String, String, TimeWindow> windowedStream = (TimeWindowedKStreamImpl<String, String, TimeWindow>) builder + .stream(streamOneInput, Consumed.with(Serdes.String(), Serdes.String())) + .groupByKey() + .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(10L)).advanceBy(ofMillis(5L))); + windowedStream.emitStrategy(emitStrategy) + .aggregate( + MockInitializer.STRING_INIT, + MockAggregator.TOSTRING_ADDER, + Materialized.with(null, new StringSerde()) + ) + .toStream() + .to(outputTopic, Produced.with(windowedSerde, new StringSerde())); + + 
startStreams(); + + final List<KeyValueTimestamp<Windowed<String>, String>> windowedMessages = receiveMessagesWithTimestamp( + new TimeWindowedDeserializer<>(new StringDeserializer(), 10L), + new StringDeserializer(), + 10L, + String.class, + emitFinal ? 6 : 12); + + final List<KeyValueTimestamp<Windowed<String>, String>> expectResult; + if (emitFinal) { + expectResult = asList( + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+1", 5), + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+1+1", 10), + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+2+2", 11), + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10L, 20L)), "0+1", 10), + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)), "0+2+2", 15), + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(15L, 25L)), "0+2", 15) + ); + } else { + expectResult = asList( + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1", 0), + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+1", 5), + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+1", 5), + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+1+1", 10), + new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10L, 20L)), "0+1", 10), + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+2", 6), + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+2+2", 11), + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)), "0+2", 11), + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)), "0+2+2", 15), + new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(15L, 25L)), "0+2", 15), + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(20L, 30L)), "0+3", 25), + new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(25L, 35L)), "0+3", 25) + ); + } + + assertThat(windowedMessages, is(expectResult)); + } + + @Test + public void shouldAggregateWindowedWithGrace() throws Exception { + produceMessages( + streamOneInput, + new KeyValueTimestamp<>("A", "1", 0), + new KeyValueTimestamp<>("A", "1", 5), + new KeyValueTimestamp<>("A", "1", 10), // close [-5, 5) Review Comment: Does `[-5,5)` exist?
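As a cross-check on this question, a small hypothetical sketch (`WindowAlignmentCheck` is not part of the PR; it uses only the public `TimeWindows` API, and grace only affects when a window closes, not which windows exist):

```java
import java.time.Duration;

import org.apache.kafka.streams.kstream.TimeWindows;

// Hypothetical illustration: hopping windows are aligned to the epoch
// (window starts 0, 5, 10, ...), so no window ever has a negative start.
public class WindowAlignmentCheck {
    public static void main(final String[] args) {
        final TimeWindows windows =
            TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10L)).advanceBy(Duration.ofMillis(5L));

        // A record at ts=0 falls into [0,10) only; [-5,5) does not exist.
        windows.windowsFor(0L).forEach((start, window) ->
            System.out.println("[" + start + "," + window.end() + ")"));
        // Prints: [0,10)
    }
}
```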