Repository: kafka Updated Branches: refs/heads/trunk 346d0ca53 -> d83252eba
KAFKA-5654; add materialized count, reduce, aggregate to KGroupedStream Add overloads of `count`, `reduce`, and `aggregate` that are `Materialized` to `KGroupedStream`. Refactor common parts between `KGroupedStream` and `WindowedKStream` Author: Damian Guy <damian....@gmail.com> Reviewers: Matthias J. Sax <matth...@confluent.io>, Guozhang Wang <wangg...@gmail.com> Closes #3827 from dguy/kafka-5654 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d83252eb Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d83252eb Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d83252eb Branch: refs/heads/trunk Commit: d83252ebaeeca5bf19584908d95b424beb31b12e Parents: 346d0ca Author: Damian Guy <damian....@gmail.com> Authored: Mon Sep 18 11:54:14 2017 +0100 Committer: Damian Guy <damian....@gmail.com> Committed: Mon Sep 18 11:54:14 2017 +0100 ---------------------------------------------------------------------- .../kafka/streams/kstream/KGroupedStream.java | 210 ++++++++++++++++++- .../GroupedStreamAggregateBuilder.java | 76 +++++++ .../kstream/internals/KGroupedStreamImpl.java | 127 +++++++---- .../streams/kstream/internals/KStreamImpl.java | 25 +-- .../kstream/internals/MaterializedInternal.java | 13 +- .../kstream/internals/WindowedKStreamImpl.java | 57 ++--- .../internals/KGroupedStreamImplTest.java | 106 ++++++++++ 7 files changed, 515 insertions(+), 99 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/d83252eb/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java index f12c2b2..08916ef 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; @@ -146,6 +147,38 @@ public interface KGroupedStream<K, V> { KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier); /** + * Count the number of records in this stream by the grouped key. + * Records with {@code null} key or value are ignored. + * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) + * provided by the given {@code storeSupplier}. + * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. + * <p> + * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to + * the same key. + * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of + * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}. + * <p> + * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. + * <pre>{@code + * KafkaStreams streams = ... 
// counting words + * String queryableStoreName = "count-store"; // the queryableStoreName should be the name of the store as defined by the Materialized instance + * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore()); + * String key = "some-word"; + * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) + * }</pre> + * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + * + * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}. + * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that + * represent the latest (rolling) count (i.e., number of records) for each key + */ + KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized); + + /** * Count the number of records in this stream by the grouped key and the defined windows. * Records with {@code null} key or value are ignored. * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f. @@ -395,7 +428,7 @@ public interface KGroupedStream<K, V> { * and "-changelog" is a fixed suffix. * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. * - * @param reducer a {@link Reducer} that computes a new aggregate result + * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}. 
 * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the * latest (rolling) aggregate for each key */ @@ -452,12 +485,14 @@ public interface KGroupedStream<K, V> { * provide {@code queryableStoreName}, and "-changelog" is a fixed suffix. * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. * - * @param reducer a {@link Reducer} that computes a new aggregate result - * @param queryableStoreName the name of the underlying {@link KTable} state store; valid characters are ASCII + * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}. + * @param queryableStoreName the name of the underlying {@link KTable} state store; valid characters are ASCII * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer)} ()}. * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the * latest (rolling) aggregate for each key + * @deprecated use {@link #reduce(Reducer, Materialized)} */ + @Deprecated KTable<K, V> reduce(final Reducer<V> reducer, final String queryableStoreName); @@ -507,15 +542,69 @@ public interface KGroupedStream<K, V> { * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to * query the value of the key on a parallel running instance of your Kafka Streams application. * - * @param reducer a {@link Reducer} that computes a new aggregate result + * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}. * @param storeSupplier user defined state store supplier. Cannot be {@code null}. 
 * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the * latest (rolling) aggregate for each key + * @deprecated use {@link #reduce(Reducer, Materialized)} */ + @Deprecated KTable<K, V> reduce(final Reducer<V> reducer, final StateStoreSupplier<KeyValueStore> storeSupplier); /** + * Combine the value of records in this stream by the grouped key. + * Records with {@code null} key or value are ignored. + * Combining implies that the type of the aggregate result is the same as the type of the input value + * (c.f. {@link #aggregate(Initializer, Aggregator, Materialized)}). + * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) + * provided by the given {@code storeSupplier}. + * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. + * <p> + * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current + * aggregate (first argument) and the record's value (second argument): + * <pre>{@code + * // At the example of a Reducer<Long> + * new Reducer<Long>() { + * @Override + * public Long apply(Long aggValue, Long currValue) { + * return aggValue + currValue; + * } + * }</pre> + * <p> + * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's + * value as-is. + * Thus, {@code reduce(Reducer, StateStoreSupplier)} can be used to compute aggregate functions like sum, min, or + * max. + * <p> + * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to + * the same key. 
+ * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of + * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}. + * <p> + * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}. + * <pre>{@code + * KafkaStreams streams = ... // compute sum + * String queryableStoreName = "storeName" // the queryableStoreName should be the name of the store as defined by the Materialized instance + * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore()); + * String key = "some-key"; + * Long sumForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) + * }</pre> + * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + * + * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}. + * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}. + * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the + * latest (rolling) aggregate for each key + */ + KTable<K, V> reduce(final Reducer<V> reducer, + final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized); + + /** * Combine the number of records in this stream by the grouped key and the defined windows. * Records with {@code null} key or value are ignored. 
* Combining implies that the type of the aggregate result is the same as the type of the input value @@ -678,7 +767,7 @@ public interface KGroupedStream<K, V> { * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to * query the value of the key on a parallel running instance of your Kafka Streams application. * - * @param reducer a {@link Reducer} that computes a new aggregate result + * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}. * @param windows the specification of the aggregation {@link Windows} * @param storeSupplier user defined state store supplier. Cannot be {@code null}. * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent @@ -743,7 +832,7 @@ public interface KGroupedStream<K, V> { * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the * provide {@code queryableStoreName}, and "-changelog" is a fixed suffix. * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. - * @param reducer the instance of {@link Reducer} + * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}. * @param sessionWindows the specification of the aggregation {@link SessionWindows} * @param queryableStoreName the name of the state store created from this operation; valid characters are ASCII * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer, SessionWindows)} ()}. @@ -778,7 +867,7 @@ public interface KGroupedStream<K, V> { * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}. 
- * @param reducer the instance of {@link Reducer} + * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}. * @param sessionWindows the specification of the aggregation {@link SessionWindows} * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent * the latest (rolling) aggregate for each key within a window @@ -841,7 +930,7 @@ public interface KGroupedStream<K, V> { * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the * provide {@code queryableStoreName}, and "-changelog" is a fixed suffix. * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. - * @param reducer the instance of {@link Reducer} + * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}. * @param sessionWindows the specification of the aggregation {@link SessionWindows} * @param storeSupplier user defined state store supplier. Cannot be {@code null}. * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent @@ -905,7 +994,9 @@ public interface KGroupedStream<K, V> { * @param <VR> the value type of the resulting {@link KTable} * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the * latest (rolling) aggregate for each key + * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized)} */ + @Deprecated <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Serde<VR> aggValueSerde, @@ -935,6 +1026,105 @@ public interface KGroupedStream<K, V> { * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}. 
* <p> + * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + * <pre>{@code + * KafkaStreams streams = ... // some aggregation on value type double + * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore()); + * String key = "some-key"; + * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) + * }</pre> + * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + * <p> + * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII + * alphanumerics, '.', '_' and '-'. + * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is + * user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the + * provide {@code queryableStoreName}, and "-changelog" is a fixed suffix. + * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. + * + * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result + * @param aggregator an {@link Aggregator} that computes a new aggregate result + * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}. 
+ * @param <VR> the value type of the resulting {@link KTable} + * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the + * latest (rolling) aggregate for each key + */ + <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, + final Aggregator<? super K, ? super V, VR> aggregator, + final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); + + + /** + * Aggregate the values of records in this stream by the grouped key. + * Records with {@code null} key or value are ignored. + * Aggregating is a generalization of {@link #reduce(Reducer, String) combining via reduce(...)} as it, for example, + * allows the result to have a different type than the input values. + * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) + * that can be queried using the provided {@code queryableStoreName}. + * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. + * <p> + * The specified {@link Initializer} is applied once directly before the first input record is processed to + * provide an initial intermediate aggregation result that is used to process the first record. + * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current + * aggregate (or for the very first record using the intermediate aggregation result provided via the + * {@link Initializer}) and the record's value. + * Thus, {@code aggregate(Initializer, Aggregator)} can be used to compute aggregate functions like + * count (c.f. {@link #count(String)}). + * <p> + * The default value serde from config will be used for serializing the result. + * If a different serde is required then you should use {@link #aggregate(Initializer, Aggregator, Materialized)}. 
+ * <p> + * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to + * the same key. + * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of + * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. + * <p> + * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. + * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is + * user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name + * and "-changelog" is a fixed suffix. + * Note that the internal store name may not be queryable through Interactive Queries. + * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. + * + * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result + * @param aggregator an {@link Aggregator} that computes a new aggregate result + * @param <VR> the value type of the resulting {@link KTable} + * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the + * latest (rolling) aggregate for each key + */ + <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, + final Aggregator<? super K, ? super V, VR> aggregator); + /** + * Aggregate the values of records in this stream by the grouped key. + * Records with {@code null} key or value are ignored. + * Aggregating is a generalization of {@link #reduce(Reducer, String) combining via reduce(...)} as it, for example, + * allows the result to have a different type than the input values. 
+ * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) + * that can be queried using the provided {@code queryableStoreName}. + * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream. + * <p> + * The specified {@link Initializer} is applied once directly before the first input record is processed to + * provide an initial intermediate aggregation result that is used to process the first record. + * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current + * aggregate (or for the very first record using the intermediate aggregation result provided via the + * {@link Initializer}) and the record's value. + * Thus, {@code aggregate(Initializer, Aggregator, Serde, String)} can be used to compute aggregate functions like + * count (c.f. {@link #count(String)}). + * <p> + * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to + * the same key. + * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of + * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and + * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}. + * <p> * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. 
* The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is * user-specified in {@link StreamsConfig} via parameter @@ -950,7 +1140,9 @@ public interface KGroupedStream<K, V> { * @param <VR> the value type of the resulting {@link KTable} * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the * latest (rolling) aggregate for each key + * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized)} */ + @Deprecated <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Serde<VR> aggValueSerde); @@ -999,7 +1191,9 @@ public interface KGroupedStream<K, V> { * @param <VR> the value type of the resulting {@link KTable} * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the * latest (rolling) aggregate for each key + * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized)} */ + @Deprecated <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final StateStoreSupplier<KeyValueStore> storeSupplier); http://git-wip-us.apache.org/repos/asf/kafka/blob/d83252eb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java new file mode 100644 index 0000000..ef0cdfc --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.state.StoreBuilder; + +import java.util.Collections; +import java.util.Set; + +class GroupedStreamAggregateBuilder<K, V> { + private final InternalStreamsBuilder builder; + private final Serde<K> keySerde; + private final Serde<V> valueSerde; + private final boolean repartitionRequired; + private final Set<String> sourceNodes; + private final String name; + + GroupedStreamAggregateBuilder(final InternalStreamsBuilder builder, + final Serde<K> keySerde, + final Serde<V> valueSerde, + final boolean repartitionRequired, + final Set<String> sourceNodes, + final String name) { + + this.builder = builder; + this.keySerde = keySerde; + this.valueSerde = valueSerde; + this.repartitionRequired = repartitionRequired; + this.sourceNodes = sourceNodes; + this.name = name; + } + + <T> KTable<K, T> build(final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier, + final String functionName, + final StoreBuilder storeBuilder, + final boolean isQueryable) { + final String aggFunctionName = builder.newName(functionName); + final String sourceName = repartitionIfRequired(storeBuilder.name()); + 
builder.internalTopologyBuilder.addProcessor(aggFunctionName, aggregateSupplier, sourceName); + builder.internalTopologyBuilder.addStateStore(storeBuilder, aggFunctionName); + + return new KTableImpl<>( + builder, + aggFunctionName, + aggregateSupplier, + sourceName.equals(this.name) ? sourceNodes : Collections.singleton(sourceName), + storeBuilder.name(), + isQueryable); + } + + /** + * @return the new sourceName if repartitioned. Otherwise the name of this stream + */ + private String repartitionIfRequired(final String queryableStoreName) { + if (!repartitionRequired) { + return this.name; + } + return KStreamImpl.createReparitionedSource(builder, keySerde, valueSerde, queryableStoreName, name); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/d83252eb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java index 57114b5..1fab2c5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java @@ -18,10 +18,12 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Merger; import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.SessionWindows; @@ -32,6 +34,7 
@@ import org.apache.kafka.streams.kstream.Windows; import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.WindowStore; import java.util.Collections; @@ -46,6 +49,19 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre private final Serde<K> keySerde; private final Serde<V> valSerde; private final boolean repartitionRequired; + private final Initializer<Long> countInitializer = new Initializer<Long>() { + @Override + public Long apply() { + return 0L; + } + }; + private final Aggregator<K, V, Long> countAggregator = new Aggregator<K, V, Long>() { + @Override + public Long apply(K aggKey, V value, Long aggregate) { + return aggregate + 1; + } + }; + private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder; private boolean isQueryable = true; KGroupedStreamImpl(final InternalStreamsBuilder builder, @@ -55,6 +71,12 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre final Serde<V> valSerde, final boolean repartitionRequired) { super(builder, name, sourceNodes); + this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(builder, + keySerde, + valSerde, + repartitionRequired, + sourceNodes, + name); this.keySerde = keySerde; this.valSerde = valSerde; this.repartitionRequired = repartitionRequired; @@ -91,6 +113,19 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre storeSupplier); } + @Override + public KTable<K, V> reduce(final Reducer<V> reducer, + final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) { + Objects.requireNonNull(reducer, "reducer can't be null"); + Objects.requireNonNull(materialized, "materialized can't be null"); + final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal + = new 
MaterializedInternal<>(materialized); + return doAggregate( + new KStreamReduce<K, V>(materializedInternal.storeName(), reducer), + REDUCE_NAME, + materializedInternal); + } + @Override public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, @@ -131,6 +166,41 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre } @Override + public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, + final Aggregator<? super K, ? super V, VR> aggregator, + final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) { + Objects.requireNonNull(initializer, "initializer can't be null"); + Objects.requireNonNull(aggregator, "aggregator can't be null"); + Objects.requireNonNull(materialized, "materialized can't be null"); + return aggregateMaterialized(initializer, aggregator, materialized); + } + + private <VR> KTable<K, VR> aggregateMaterialized(final Initializer<VR> initializer, + final Aggregator<? super K, ? super V, VR> aggregator, + final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) { + final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal + = new MaterializedInternal<>(materialized); + return doAggregate( + new KStreamAggregate<>(materializedInternal.storeName(), initializer, aggregator), + AGGREGATE_NAME, + materializedInternal); + } + + @Override + public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? 
super V, VR> aggregator) { + Objects.requireNonNull(initializer, "initializer can't be null"); + Objects.requireNonNull(aggregator, "aggregator can't be null"); + final String storeName = builder.newStoreName(AGGREGATE_NAME); + + MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = + new MaterializedInternal<>(Materialized.<K, VR, KeyValueStore<Bytes, byte[]>>as(storeName), false); + return doAggregate(new KStreamAggregate<>(materializedInternal.storeName(), initializer, aggregator), + AGGREGATE_NAME, + materializedInternal); + + } + + @Override public <T> KTable<K, T> aggregate(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, final Serde<T> aggValueSerde) { @@ -198,17 +268,12 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre @Override public KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier) { - return aggregate(new Initializer<Long>() { - @Override - public Long apply() { - return 0L; - } - }, new Aggregator<K, V, Long>() { - @Override - public Long apply(K aggKey, V value, Long aggregate) { - return aggregate + 1; - } - }, storeSupplier); + return aggregate(countInitializer, countAggregator, storeSupplier); + } + + @Override + public KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) { + return aggregate(countInitializer, countAggregator, materialized); } @Override @@ -227,17 +292,8 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows, final StateStoreSupplier<WindowStore> storeSupplier) { return aggregate( - new Initializer<Long>() { - @Override - public Long apply() { - return 0L; - } - }, new Aggregator<K, V, Long>() { - @Override - public Long apply(K aggKey, V value, Long aggregate) { - return aggregate + 1; - } - }, + countInitializer, + countAggregator, windows, 
storeSupplier); } @@ -320,18 +376,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre final StateStoreSupplier<SessionStore> storeSupplier) { Objects.requireNonNull(sessionWindows, "sessionWindows can't be null"); Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); - final Initializer<Long> initializer = new Initializer<Long>() { - @Override - public Long apply() { - return 0L; - } - }; - final Aggregator<K, V, Long> aggregator = new Aggregator<K, V, Long>() { - @Override - public Long apply(final K aggKey, final V value, final Long aggregate) { - return aggregate + 1; - } - }; final Merger<K, Long> sessionMerger = new Merger<K, Long>() { @Override public Long apply(final K aggKey, final Long aggOne, final Long aggTwo) { @@ -339,7 +383,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre } }; - return aggregate(initializer, aggregator, sessionMerger, sessionWindows, Serdes.Long(), storeSupplier); + return aggregate(countInitializer, countAggregator, sessionMerger, sessionWindows, Serdes.Long(), storeSupplier); } @@ -397,6 +441,17 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre return aggregate(initializer, aggregator, sessionMerger, sessionWindows, valSerde, storeSupplier); } + + private <T> KTable<K, T> doAggregate(final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier, + final String functionName, + final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materializedInternal) { + + final StoreBuilder<KeyValueStore<K, T>> storeBuilder = new KeyValueStoreMaterializer<>(materializedInternal) + .materialize(); + return aggregateBuilder.build(aggregateSupplier, functionName, storeBuilder, materializedInternal.isQueryable()); + + } + private <T> KTable<K, T> doAggregate( final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier, final String functionName, @@ -426,6 +481,6 @@ class KGroupedStreamImpl<K, V> extends 
AbstractStream<K> implements KGroupedStre if (!repartitionRequired) { return this.name; } - return KStreamImpl.createReparitionedSource(this, keySerde, valSerde, queryableStoreName); + return KStreamImpl.createReparitionedSource(builder, keySerde, valSerde, queryableStoreName, name); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/d83252eb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 7201a00..6ebbd14 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -605,37 +605,38 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V */ private KStreamImpl<K, V> repartitionForJoin(final Serde<K> keySerde, final Serde<V> valSerde) { - String repartitionedSourceName = createReparitionedSource(this, keySerde, valSerde, null); + String repartitionedSourceName = createReparitionedSource(builder, keySerde, valSerde, null, name); return new KStreamImpl<>(builder, repartitionedSourceName, Collections .singleton(repartitionedSourceName), false); } - static <K1, V1> String createReparitionedSource(final AbstractStream<K1> stream, + static <K1, V1> String createReparitionedSource(final InternalStreamsBuilder builder, final Serde<K1> keySerde, final Serde<V1> valSerde, - final String topicNamePrefix) { + final String topicNamePrefix, + final String name) { Serializer<K1> keySerializer = keySerde != null ? keySerde.serializer() : null; Serializer<V1> valSerializer = valSerde != null ? valSerde.serializer() : null; Deserializer<K1> keyDeserializer = keySerde != null ? 
keySerde.deserializer() : null; Deserializer<V1> valDeserializer = valSerde != null ? valSerde.deserializer() : null; - String baseName = topicNamePrefix != null ? topicNamePrefix : stream.name; + String baseName = topicNamePrefix != null ? topicNamePrefix : name; String repartitionTopic = baseName + REPARTITION_TOPIC_SUFFIX; - String sinkName = stream.builder.newName(SINK_NAME); - String filterName = stream.builder.newName(FILTER_NAME); - String sourceName = stream.builder.newName(SOURCE_NAME); + String sinkName = builder.newName(SINK_NAME); + String filterName = builder.newName(FILTER_NAME); + String sourceName = builder.newName(SOURCE_NAME); - stream.builder.internalTopologyBuilder.addInternalTopic(repartitionTopic); - stream.builder.internalTopologyBuilder.addProcessor(filterName, new KStreamFilter<>(new Predicate<K1, V1>() { + builder.internalTopologyBuilder.addInternalTopic(repartitionTopic); + builder.internalTopologyBuilder.addProcessor(filterName, new KStreamFilter<>(new Predicate<K1, V1>() { @Override public boolean test(final K1 key, final V1 value) { return key != null; } - }, false), stream.name); + }, false), name); - stream.builder.internalTopologyBuilder.addSink(sinkName, repartitionTopic, keySerializer, valSerializer, + builder.internalTopologyBuilder.addSink(sinkName, repartitionTopic, keySerializer, valSerializer, null, filterName); - stream.builder.internalTopologyBuilder.addSource(null, sourceName, new FailOnInvalidTimestamp(), + builder.internalTopologyBuilder.addSource(null, sourceName, new FailOnInvalidTimestamp(), keyDeserializer, valDeserializer, repartitionTopic); return sourceName; http://git-wip-us.apache.org/repos/asf/kafka/blob/d83252eb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java index d7ebc65..0ee610f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java @@ -25,8 +25,15 @@ import java.util.Map; public class MaterializedInternal<K, V, S extends StateStore> extends Materialized<K, V, S> { + private final boolean queryable; + public MaterializedInternal(final Materialized<K, V, S> materialized) { + this(materialized, true); + } + + MaterializedInternal(final Materialized<K, V, S> materialized, final boolean queryable) { super(materialized); + this.queryable = queryable; } public String storeName() { @@ -56,7 +63,11 @@ public class MaterializedInternal<K, V, S extends StateStore> extends Materializ return topicConfig; } - public boolean cachingEnabled() { + boolean cachingEnabled() { return cachingEnabled; } + + public boolean isQueryable() { + return queryable; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/d83252eb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java index 28666b8..b6e38f1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java @@ -30,7 +30,6 @@ import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowStore; -import java.util.Collections; import java.util.Objects; import java.util.Set; @@ -40,9 +39,9 @@ import static 
org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDU public class WindowedKStreamImpl<K, V, W extends Window> extends AbstractStream<K> implements WindowedKStream<K, V> { private final Windows<W> windows; - private final boolean repartitionRequired; private final Serde<K> keySerde; private final Serde<V> valSerde; + private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder; WindowedKStreamImpl(final Windows<W> windows, final InternalStreamsBuilder builder, @@ -55,8 +54,8 @@ public class WindowedKStreamImpl<K, V, W extends Window> extends AbstractStream< Objects.requireNonNull(windows, "windows can't be null"); this.valSerde = valSerde; this.keySerde = keySerde; - this.repartitionRequired = repartitionRequired; this.windows = windows; + this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(builder, keySerde, valSerde, repartitionRequired, sourceNodes, name); } @Override @@ -76,38 +75,34 @@ public class WindowedKStreamImpl<K, V, W extends Window> extends AbstractStream< Serdes.Long()); } + @SuppressWarnings("unchecked") @Override public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? 
super V, VR> aggregator, final Serde<VR> aggValueSerde) { Objects.requireNonNull(initializer, "initializer can't be null"); Objects.requireNonNull(aggregator, "aggregator can't be null"); - final String aggFunctionName = builder.newName(AGGREGATE_NAME); final String storeName = builder.newStoreName(AGGREGATE_NAME); - return doAggregate(aggValueSerde, - aggFunctionName, - storeName, - new KStreamWindowAggregate<>(windows, storeName, initializer, aggregator)); + return (KTable<Windowed<K>, VR>) aggregateBuilder.build(new KStreamWindowAggregate<>(windows, storeName, initializer, aggregator), + AGGREGATE_NAME, + windowStoreBuilder(storeName, aggValueSerde), + false); } + @SuppressWarnings("unchecked") @Override public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer) { Objects.requireNonNull(reducer, "reducer can't be null"); final String storeName = builder.newStoreName(REDUCE_NAME); - return doAggregate(valSerde, - builder.newName(REDUCE_NAME), - storeName, - new KStreamWindowReduce<>(windows, storeName, reducer)); + return (KTable<Windowed<K>, V>) aggregateBuilder.build(new KStreamWindowReduce<K, V, W>(windows, storeName, reducer), + REDUCE_NAME, + windowStoreBuilder(storeName, valSerde), + false); } - @SuppressWarnings("unchecked") - private <VR> KTable<Windowed<K>, VR> doAggregate(final Serde<VR> aggValueSerde, - final String aggFunctionName, - final String storeName, - final KStreamAggProcessorSupplier aggSupplier) { - final String sourceName = repartitionIfRequired(storeName); - final StoreBuilder<WindowStore<K, VR>> storeBuilder = Stores.windowStoreBuilder( + private <VR> StoreBuilder<WindowStore<K, VR>> windowStoreBuilder(final String storeName, final Serde<VR> aggValueSerde) { + return Stores.windowStoreBuilder( Stores.persistentWindowStore( storeName, windows.maintainMs(), @@ -115,29 +110,7 @@ public class WindowedKStreamImpl<K, V, W extends Window> extends AbstractStream< windows.size(), false), keySerde, - aggValueSerde) - .withCachingEnabled(); - - 
builder.internalTopologyBuilder.addProcessor(aggFunctionName, aggSupplier, sourceName); - builder.internalTopologyBuilder.addStateStore(storeBuilder, aggFunctionName); - - return new KTableImpl<>( - builder, - aggFunctionName, - aggSupplier, - sourceName.equals(this.name) ? sourceNodes - : Collections.singleton(sourceName), - storeName, - false); + aggValueSerde).withCachingEnabled(); } - /** - * @return the new sourceName if repartitioned. Otherwise the name of this stream - */ - private String repartitionIfRequired(final String queryableStoreName) { - if (!repartitionRequired) { - return this.name; - } - return KStreamImpl.createReparitionedSource(this, keySerde, valSerde, queryableStoreName); - } } http://git-wip-us.apache.org/repos/asf/kafka/blob/d83252eb/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java index bc65e09..efa027c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; @@ -27,6 +28,7 @@ import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; +import 
org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Merger; import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.Serialized; @@ -486,6 +488,110 @@ public class KGroupedStreamImplTest { groupedStream.count(SessionWindows.with(90), (StateStoreSupplier<SessionStore>) null); } + @SuppressWarnings("unchecked") + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerOnReduceWhenMaterializedIsNull() { + groupedStream.reduce(MockReducer.STRING_ADDER, (Materialized) null); + } + + @SuppressWarnings("unchecked") + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerOnAggregateWhenMaterializedIsNull() { + groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, (Materialized) null); + } + + @SuppressWarnings("unchecked") + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerOnCountWhenMaterializedIsNull() { + groupedStream.count((Materialized) null); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldCountAndMaterializeResults() { + groupedStream.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count") + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.Long())); + + processData(); + + final KeyValueStore<String, Long> count = (KeyValueStore<String, Long>) driver.allStateStores().get("count"); + + assertThat(count.get("1"), equalTo(3L)); + assertThat(count.get("2"), equalTo(1L)); + assertThat(count.get("3"), equalTo(2L)); + } + + + + @SuppressWarnings("unchecked") + @Test + public void shouldReduceAndMaterializeResults() { + groupedStream.reduce(MockReducer.STRING_ADDER, + Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("reduce") + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.String())); + + processData(); + + final KeyValueStore<String, String> reduced = (KeyValueStore<String, String>) driver.allStateStores().get("reduce"); + + 
assertThat(reduced.get("1"), equalTo("A+C+D")); + assertThat(reduced.get("2"), equalTo("B")); + assertThat(reduced.get("3"), equalTo("E+F")); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldAggregateAndMaterializeResults() { + groupedStream.aggregate(MockInitializer.STRING_INIT, + MockAggregator.TOSTRING_ADDER, + Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("aggregate") + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.String())); + + processData(); + + final KeyValueStore<String, String> aggregate = (KeyValueStore<String, String>) driver.allStateStores().get("aggregate"); + + assertThat(aggregate.get("1"), equalTo("0+A+C+D")); + assertThat(aggregate.get("2"), equalTo("0+B")); + assertThat(aggregate.get("3"), equalTo("0+E+F")); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldAggregateWithDefaultSerdes() { + final Map<String, String> results = new HashMap<>(); + groupedStream.aggregate(MockInitializer.STRING_INIT, + MockAggregator.TOSTRING_ADDER) + .toStream() + .foreach(new ForeachAction<String, String>() { + @Override + public void apply(final String key, final String value) { + results.put(key, value); + } + }); + + processData(); + + assertThat(results.get("1"), equalTo("0+A+C+D")); + assertThat(results.get("2"), equalTo("0+B")); + assertThat(results.get("3"), equalTo("0+E+F")); + } + + private void processData() { + driver.setUp(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), 0); + driver.setTime(0); + driver.process(TOPIC, "1", "A"); + driver.process(TOPIC, "2", "B"); + driver.process(TOPIC, "1", "C"); + driver.process(TOPIC, "1", "D"); + driver.process(TOPIC, "3", "E"); + driver.process(TOPIC, "3", "F"); + driver.flushState(); + } + private void doCountWindowed(final List<KeyValue<Windowed<String>, Long>> results) { driver.setUp(builder, TestUtils.tempDirectory(), 0); driver.setTime(0);