This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 1.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.0 by this push: new 5d17ae1 KAFKA-6398: Return value getter based on KTable materialization status 5d17ae1 is described below commit 5d17ae182806dd2d15c532c31f7127673eeaa9a2 Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Thu Jan 18 13:12:44 2018 -0800 KAFKA-6398: Return value getter based on KTable materialization status This is a bug fix composed of two parts: 1. The major part is that, for all operators that generate a KTable, we should construct the value getter based on whether the KTable itself is materialized. 1.a If yes, the value getter queries the materialized store directly. 1.b If not, the value getter delegates to its parent's value getter (recursively) and applies the operator's computation to the returned value. 2. The minor part is that, in KStreamImpl, when joining with a table, we should connect to the table's `valueGetterSupplier().storeNames()`, not to `internalStoreName()`, since the latter always assumes that the KTable is materialized, which is not always the case. Author: Guozhang Wang <wangg...@gmail.com> Reviewers: Damian Guy <damian....@gmail.com>, Bill Bejeck <b...@confluent.io>, Matthias J.
Sax <matth...@confluent.io> Closes #4421 from guozhangwang/K6398-KTableValueGetter (cherry picked from commit 75e37d7e2063b726aa0185f4b695f32cde69d95c) Signed-off-by: Guozhang Wang <wangg...@gmail.com> --- .../streams/kstream/internals/KStreamImpl.java | 2 +- .../streams/kstream/internals/KTableAggregate.java | 37 ++------- .../streams/kstream/internals/KTableFilter.java | 55 +++++++------- .../streams/kstream/internals/KTableImpl.java | 8 +- .../internals/KTableKTableAbstractJoin.java | 13 ++-- ...ableKTableAbstractJoinValueGetterSupplier.java} | 10 +-- .../kstream/internals/KTableKTableJoin.java | 6 +- .../kstream/internals/KTableKTableJoinMerger.java | 47 +++++++++--- .../kstream/internals/KTableKTableLeftJoin.java | 6 +- .../kstream/internals/KTableKTableOuterJoin.java | 6 +- .../kstream/internals/KTableKTableRightJoin.java | 6 +- .../streams/kstream/internals/KTableMapValues.java | 46 ++++++------ .../KTableMaterializedValueGetterSupplier.java | 54 ++++++++++++++ .../streams/kstream/internals/KTableReduce.java | 34 +-------- .../kstream/internals/KTableRepartitionMap.java | 8 +- .../processor/internals/ProcessorTopology.java | 10 +++ .../apache/kafka/streams/StreamsBuilderTest.java | 87 +++++++++++++++++++++- .../kstream/internals/KTableFilterTest.java | 3 + .../kstream/internals/KTableKTableJoinTest.java | 2 - 19 files changed, 281 insertions(+), 159 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 8e80315..0e39a30 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -769,7 +769,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V final String name = builder.newProcessorName(leftJoin ? 
LEFTJOIN_NAME : JOIN_NAME); builder.internalTopologyBuilder.addProcessor(name, new KStreamKTableJoin<>(((KTableImpl<K, ?, V1>) other).valueGetterSupplier(), joiner, leftJoin), this.name); - builder.internalTopologyBuilder.connectProcessorAndStateStores(name, ((KTableImpl<K, ?, V1>) other).internalStoreName()); + builder.internalTopologyBuilder.connectProcessorAndStateStores(name, ((KTableImpl) other).valueGetterSupplier().storeNames()); builder.internalTopologyBuilder.connectProcessors(this.name, ((KTableImpl<K, ?, V1>) other).name); return new KStreamImpl<>(builder, name, allSourceNodes, false); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java index 973de0f..10765ff 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java @@ -33,7 +33,10 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T private boolean sendOldValues = false; - public KTableAggregate(String storeName, Initializer<T> initializer, Aggregator<? super K, ? super V, T> add, Aggregator<? super K, ? super V, T> remove) { + KTableAggregate(final String storeName, + final Initializer<T> initializer, + final Aggregator<? super K, ? super V, T> add, + final Aggregator<? super K, ? 
super V, T> remove) { this.storeName = storeName; this.initializer = initializer; this.add = add; @@ -51,7 +54,6 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T } private class KTableAggregateProcessor extends AbstractProcessor<K, Change<V>> { - private KeyValueStore<K, T> store; private TupleForwarder<K, T> tupleForwarder; @@ -98,35 +100,6 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T @Override public KTableValueGetterSupplier<K, T> view() { - - return new KTableValueGetterSupplier<K, T>() { - - public KTableValueGetter<K, T> get() { - return new KTableAggregateValueGetter(); - } - - @Override - public String[] storeNames() { - return new String[]{storeName}; - } - }; - } - - private class KTableAggregateValueGetter implements KTableValueGetter<K, T> { - - private KeyValueStore<K, T> store; - - @SuppressWarnings("unchecked") - @Override - public void init(ProcessorContext context) { - store = (KeyValueStore<K, T>) context.getStateStore(storeName); - } - - @Override - public T get(K key) { - return store.get(key); - } - + return new KTableMaterializedValueGetterSupplier<>(storeName); } - } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java index af8c906..ee8982b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java @@ -30,8 +30,10 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> { private final String queryableName; private boolean sendOldValues = false; - public KTableFilter(final KTableImpl<K, ?, V> parent, final Predicate<? super K, ? super V> predicate, - final boolean filterNot, final String queryableName) { + KTableFilter(final KTableImpl<K, ?, V> parent, + final Predicate<? super K, ? 
super V> predicate, + final boolean filterNot, + final String queryableName) { this.parent = parent; this.predicate = predicate; this.filterNot = filterNot; @@ -44,24 +46,6 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> { } @Override - public KTableValueGetterSupplier<K, V> view() { - - final KTableValueGetterSupplier<K, V> parentValueGetterSupplier = parent.valueGetterSupplier(); - - return new KTableValueGetterSupplier<K, V>() { - - public KTableValueGetter<K, V> get() { - return new KTableFilterValueGetter(parentValueGetterSupplier.get()); - } - - @Override - public String[] storeNames() { - return parentValueGetterSupplier.storeNames(); - } - }; - } - - @Override public void enableSendingOldValues() { parent.enableSendingOldValues(); sendOldValues = true; @@ -108,24 +92,45 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> { } - private class KTableFilterValueGetter implements KTableValueGetter<K, V> { + @Override + public KTableValueGetterSupplier<K, V> view() { + // if the KTable is materialized, use the materialized store to return getter value; + // otherwise rely on the parent getter and apply filter on-the-fly + if (queryableName != null) { + return new KTableMaterializedValueGetterSupplier<>(queryableName); + } else { + return new KTableValueGetterSupplier<K, V>() { + final KTableValueGetterSupplier<K, V> parentValueGetterSupplier = parent.valueGetterSupplier(); + + public KTableValueGetter<K, V> get() { + return new KTableFilterValueGetter(parentValueGetterSupplier.get()); + } + + @Override + public String[] storeNames() { + return parentValueGetterSupplier.storeNames(); + } + }; + } + } + private class KTableFilterValueGetter implements KTableValueGetter<K, V> { private final KTableValueGetter<K, V> parentGetter; - public KTableFilterValueGetter(KTableValueGetter<K, V> parentGetter) { + KTableFilterValueGetter(final KTableValueGetter<K, V> parentGetter) { this.parentGetter = parentGetter; } + 
@SuppressWarnings("unchecked") @Override - public void init(ProcessorContext context) { + public void init(final ProcessorContext context) { parentGetter.init(context); } @Override - public V get(K key) { + public V get(final K key) { return computeValue(key, parentGetter.get(key)); } - } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 3bc6f4b..e465302 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -137,10 +137,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, return this.queryableStoreName; } - String internalStoreName() { - return this.queryableStoreName; - } - @SuppressWarnings("deprecation") private KTable<K, V> doFilter(final Predicate<? super K, ? super V> predicate, final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier, @@ -760,9 +756,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, } final KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>( - new KTableImpl<K, V, R>(builder, joinThisName, joinThis, sourceNodes, this.internalStoreName(), false), + new KTableImpl<K, V, R>(builder, joinThisName, joinThis, sourceNodes, this.queryableStoreName, false), new KTableImpl<K, V1, R>(builder, joinOtherName, joinOther, ((KTableImpl<K, ?, ?>) other).sourceNodes, - ((KTableImpl<K, ?, ?>) other).internalStoreName(), false), + ((KTableImpl<K, ?, ?>) other).queryableStoreName, false), internalQueryableName ); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java index 7fa39d9..bdc1dca 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java @@ -20,13 +20,13 @@ import org.apache.kafka.streams.kstream.ValueJoiner; abstract class KTableKTableAbstractJoin<K, R, V1, V2> implements KTableProcessorSupplier<K, V1, R> { - protected final KTableImpl<K, ?, V1> table1; - protected final KTableImpl<K, ?, V2> table2; - protected final KTableValueGetterSupplier<K, V1> valueGetterSupplier1; - protected final KTableValueGetterSupplier<K, V2> valueGetterSupplier2; - protected final ValueJoiner<? super V1, ? super V2, ? extends R> joiner; + private final KTableImpl<K, ?, V1> table1; + private final KTableImpl<K, ?, V2> table2; + final KTableValueGetterSupplier<K, V1> valueGetterSupplier1; + final KTableValueGetterSupplier<K, V2> valueGetterSupplier2; + final ValueJoiner<? super V1, ? super V2, ? extends R> joiner; - protected boolean sendOldValues = false; + boolean sendOldValues = false; KTableKTableAbstractJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, @@ -44,5 +44,4 @@ abstract class KTableKTableAbstractJoin<K, R, V1, V2> implements KTableProcessor table2.enableSendingOldValues(); sendOldValues = true; } - } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKTableKTableJoinValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoinValueGetterSupplier.java similarity index 78% rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKTableKTableJoinValueGetterSupplier.java rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoinValueGetterSupplier.java index 74c0632..be5a202 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKTableKTableJoinValueGetterSupplier.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoinValueGetterSupplier.java @@ -18,12 +18,12 @@ package org.apache.kafka.streams.kstream.internals; import java.util.ArrayList; -public abstract class AbstractKTableKTableJoinValueGetterSupplier<K, R, V1, V2> implements KTableValueGetterSupplier<K, R> { - final protected KTableValueGetterSupplier<K, V1> valueGetterSupplier1; - final protected KTableValueGetterSupplier<K, V2> valueGetterSupplier2; +public abstract class KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> implements KTableValueGetterSupplier<K, R> { + final KTableValueGetterSupplier<K, V1> valueGetterSupplier1; + final KTableValueGetterSupplier<K, V2> valueGetterSupplier2; - public AbstractKTableKTableJoinValueGetterSupplier(final KTableValueGetterSupplier<K, V1> valueGetterSupplier1, - final KTableValueGetterSupplier<K, V2> valueGetterSupplier2) { + KTableKTableAbstractJoinValueGetterSupplier(final KTableValueGetterSupplier<K, V1> valueGetterSupplier1, + final KTableValueGetterSupplier<K, V2> valueGetterSupplier2) { this.valueGetterSupplier1 = valueGetterSupplier1; this.valueGetterSupplier2 = valueGetterSupplier2; } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java index 1b26a5b..c424f4f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java @@ -42,12 +42,12 @@ class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, @Override public KTableValueGetterSupplier<K, R> view() { - return new KTableKTableJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2); + return new KTableKTableAbstractJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2); } - private class 
KTableKTableJoinValueGetterSupplier extends AbstractKTableKTableJoinValueGetterSupplier<K, R, V1, V2> { + private class KTableKTableAbstractJoinValueGetterSupplier extends org.apache.kafka.streams.kstream.internals.KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> { - public KTableKTableJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) { + public KTableKTableAbstractJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) { super(valueGetterSupplier1, valueGetterSupplier2); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java index 82d9c26..d27b8bd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java @@ -28,9 +28,9 @@ class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> { private final String queryableName; private boolean sendOldValues = false; - public KTableKTableJoinMerger(final KTableImpl<K, ?, V> parent1, - final KTableImpl<K, ?, V> parent2, - final String queryableName) { + KTableKTableJoinMerger(final KTableImpl<K, ?, V> parent1, + final KTableImpl<K, ?, V> parent2, + final String queryableName) { this.parent1 = parent1; this.parent2 = parent2; this.queryableName = queryableName; @@ -38,12 +38,42 @@ class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> { @Override public Processor<K, Change<V>> get() { - return new KTableKTableJoinMergeProcessor<>(); + return new KTableKTableJoinMergeProcessor(); } @Override public KTableValueGetterSupplier<K, V> view() { - return parent1.valueGetterSupplier(); + // if the result KTable is materialized, use the 
materialized store to return getter value; + // otherwise rely on the parent getter and apply join on-the-fly + if (queryableName != null) { + return new KTableMaterializedValueGetterSupplier<>(queryableName); + } else { + return new KTableValueGetterSupplier<K, V>() { + + public KTableValueGetter<K, V> get() { + return parent1.valueGetterSupplier().get(); + } + + @Override + public String[] storeNames() { + // we need to allow the downstream processor to be able to access both ends of the joining table's value getters + final String[] storeNames1 = parent1.valueGetterSupplier().storeNames(); + final String[] storeNames2 = parent2.valueGetterSupplier().storeNames(); + + final String[] stores = new String[storeNames1.length + storeNames2.length]; + int i = 0; + for (final String storeName : storeNames1) { + stores[i] = storeName; + i++; + } + for (final String storeName : storeNames2) { + stores[i] = storeName; + i++; + } + return stores; + } + }; + } } @Override @@ -53,14 +83,13 @@ class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> { sendOldValues = true; } - private class KTableKTableJoinMergeProcessor<K, V> - extends AbstractProcessor<K, Change<V>> { + private class KTableKTableJoinMergeProcessor extends AbstractProcessor<K, Change<V>> { private KeyValueStore<K, V> store; private TupleForwarder<K, V> tupleForwarder; @SuppressWarnings("unchecked") @Override - public void init(ProcessorContext context) { + public void init(final ProcessorContext context) { super.init(context); if (queryableName != null) { store = (KeyValueStore<K, V>) context.getStateStore(queryableName); @@ -72,7 +101,6 @@ class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> { @Override public void process(K key, Change<V> value) { - if (queryableName != null) { store.put(key, value.newValue); tupleForwarder.maybeForward(key, value.newValue, value.oldValue); @@ -81,5 +109,4 @@ class KTableKTableJoinMerger<K, V> implements 
KTableProcessorSupplier<K, V, V> { } } } - } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java index c308a0d..33aef02 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java @@ -34,12 +34,12 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, @Override public KTableValueGetterSupplier<K, R> view() { - return new KTableKTableLeftJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2); + return new KTableKTableLeftAbstractJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2); } - private class KTableKTableLeftJoinValueGetterSupplier extends AbstractKTableKTableJoinValueGetterSupplier<K, R, V1, V2> { + private class KTableKTableLeftAbstractJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> { - public KTableKTableLeftJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) { + public KTableKTableLeftAbstractJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) { super(valueGetterSupplier1, valueGetterSupplier2); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java index 9cee4f3..d2e1d79 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java @@ -34,12 +34,12 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, 
@Override public KTableValueGetterSupplier<K, R> view() { - return new KTableKTableOuterJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2); + return new KTableKTableOuterAbstractJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2); } - private class KTableKTableOuterJoinValueGetterSupplier extends AbstractKTableKTableJoinValueGetterSupplier<K, R, V1, V2> { + private class KTableKTableOuterAbstractJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> { - public KTableKTableOuterJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) { + public KTableKTableOuterAbstractJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) { super(valueGetterSupplier1, valueGetterSupplier2); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java index b43efaa..f4c840b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java @@ -35,12 +35,12 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, @Override public KTableValueGetterSupplier<K, R> view() { - return new KTableKTableRightJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2); + return new KTableKTableRightAbstractJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2); } - private class KTableKTableRightJoinValueGetterSupplier extends AbstractKTableKTableJoinValueGetterSupplier<K, R, V1, V2> { + private class KTableKTableRightAbstractJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> { - public 
KTableKTableRightJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) { + public KTableKTableRightAbstractJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) { super(valueGetterSupplier1, valueGetterSupplier2); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java index 41dd7cd..9dfbd1f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java @@ -30,8 +30,9 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> { private final String queryableName; private boolean sendOldValues = false; - public KTableMapValues(final KTableImpl<K, ?, V> parent, final ValueMapper<? super V, ? extends V1> mapper, - final String queryableName) { + KTableMapValues(final KTableImpl<K, ?, V> parent, + final ValueMapper<? super V, ? 
extends V1> mapper, + final String queryableName) { this.parent = parent; this.mapper = mapper; this.queryableName = queryableName; @@ -44,19 +45,24 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> { @Override public KTableValueGetterSupplier<K, V1> view() { - final KTableValueGetterSupplier<K, V> parentValueGetterSupplier = parent.valueGetterSupplier(); - - return new KTableValueGetterSupplier<K, V1>() { - - public KTableValueGetter<K, V1> get() { - return new KTableMapValuesValueGetter(parentValueGetterSupplier.get()); - } - - @Override - public String[] storeNames() { - return parentValueGetterSupplier.storeNames(); - } - }; + // if the KTable is materialized, use the materialized store to return getter value; + // otherwise rely on the parent getter and apply map-values on-the-fly + if (queryableName != null) { + return new KTableMaterializedValueGetterSupplier<>(queryableName); + } else { + return new KTableValueGetterSupplier<K, V1>() { + final KTableValueGetterSupplier<K, V> parentValueGetterSupplier = parent.valueGetterSupplier(); + + public KTableValueGetter<K, V1> get() { + return new KTableMapValuesValueGetter(parentValueGetterSupplier.get()); + } + + @Override + public String[] storeNames() { + return parentValueGetterSupplier.storeNames(); + } + }; + } } @Override @@ -75,13 +81,12 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> { } private class KTableMapValuesProcessor extends AbstractProcessor<K, Change<V>> { - private KeyValueStore<K, V1> store; private TupleForwarder<K, V1> tupleForwarder; @SuppressWarnings("unchecked") @Override - public void init(ProcessorContext context) { + public void init(final ProcessorContext context) { super.init(context); if (queryableName != null) { store = (KeyValueStore<K, V1>) context.getStateStore(queryableName); @@ -107,20 +112,19 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> { private final KTableValueGetter<K, V> 
parentGetter; - public KTableMapValuesValueGetter(KTableValueGetter<K, V> parentGetter) { + KTableMapValuesValueGetter(KTableValueGetter<K, V> parentGetter) { this.parentGetter = parentGetter; } @Override - public void init(ProcessorContext context) { + public void init(final ProcessorContext context) { parentGetter.init(context); } @Override - public V1 get(K key) { + public V1 get(final K key) { return computeValue(parentGetter.get(key)); } - } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java new file mode 100644 index 0000000..4ceccce --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.state.KeyValueStore; + +public class KTableMaterializedValueGetterSupplier<K, V> implements KTableValueGetterSupplier<K, V> { + + private final String storeName; + + KTableMaterializedValueGetterSupplier(final String storeName) { + this.storeName = storeName; + } + + public KTableValueGetter<K, V> get() { + return new KTableMaterializedValueGetter(); + } + + @Override + public String[] storeNames() { + return new String[]{storeName}; + } + + private class KTableMaterializedValueGetter implements KTableValueGetter<K, V> { + private KeyValueStore<K, V> store; + + @SuppressWarnings("unchecked") + @Override + public void init(final ProcessorContext context) { + store = (KeyValueStore<K, V>) context.getStateStore(storeName); + } + + @Override + public V get(final K key) { + return store.get(key); + } + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java index 744cc9c..b595bc3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java @@ -31,7 +31,7 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> { private boolean sendOldValues = false; - public KTableReduce(String storeName, Reducer<V> addReducer, Reducer<V> removeReducer) { + KTableReduce(final String storeName, final Reducer<V> addReducer, final Reducer<V> removeReducer) { this.storeName = storeName; this.addReducer = addReducer; this.removeReducer = removeReducer; @@ -54,7 +54,7 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> { @SuppressWarnings("unchecked") @Override - public void init(ProcessorContext context) { + public void init(final 
ProcessorContext context) { super.init(context); store = (KeyValueStore<K, V>) context.getStateStore(storeName); tupleForwarder = new TupleForwarder<K, V>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues); @@ -94,34 +94,6 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> { @Override public KTableValueGetterSupplier<K, V> view() { - - return new KTableValueGetterSupplier<K, V>() { - - public KTableValueGetter<K, V> get() { - return new KTableAggregateValueGetter(); - } - - @Override - public String[] storeNames() { - return new String[]{storeName}; - } - }; - } - - private class KTableAggregateValueGetter implements KTableValueGetter<K, V> { - - private KeyValueStore<K, V> store; - - @SuppressWarnings("unchecked") - @Override - public void init(ProcessorContext context) { - store = (KeyValueStore<K, V>) context.getStateStore(storeName); - } - - @Override - public V get(K key) { - return store.get(key); - } - + return new KTableMaterializedValueGetterSupplier<K, V>(storeName); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java index 09714a7..5aa3f2f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java @@ -33,7 +33,7 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli private final KTableImpl<K, ?, V> parent; private final KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> mapper; - public KTableRepartitionMap(KTableImpl<K, ?, V> parent, KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> mapper) { + KTableRepartitionMap(final KTableImpl<K, ?, V> parent, final KeyValueMapper<? super K, ? 
super V, KeyValue<K1, V1>> mapper) { this.parent = parent; this.mapper = mapper; } @@ -101,17 +101,17 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli private final KTableValueGetter<K, V> parentGetter; - public KTableMapValueGetter(KTableValueGetter<K, V> parentGetter) { + KTableMapValueGetter(final KTableValueGetter<K, V> parentGetter) { this.parentGetter = parentGetter; } @Override - public void init(ProcessorContext context) { + public void init(final ProcessorContext context) { parentGetter.init(context); } @Override - public KeyValue<K1, V1> get(K key) { + public KeyValue<K1, V1> get(final K key) { return mapper.apply(key, parentGetter.get(key)); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java index 8a29786..8fbeb25 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java @@ -132,4 +132,14 @@ public class ProcessorTopology { return sb.toString(); } + // for testing only + public Set<String> processorConnectedStateStores(final String processorName) { + for (final ProcessorNode<?, ?> node : processorNodes) { + if (node.name().equals(processorName)) { + return node.stateStores; + } + } + + return Collections.emptySet(); + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java index 13b5b45..4a496b8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.internals.KStreamImpl; 
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; +import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockMapper; @@ -45,6 +46,7 @@ import java.util.Map; import java.util.Set; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -69,6 +71,27 @@ public class StreamsBuilderTest { builder.<Bytes, String>stream("stream-topic").join(filteredKTable, MockValueJoiner.TOSTRING_JOINER); driver.setUp(builder, TestUtils.tempDirectory()); + + ProcessorTopology topology = builder.internalTopologyBuilder.build(); + + assertThat(topology.stateStores().size(), equalTo(1)); + assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), equalTo(Collections.singleton(topology.stateStores().get(0).name()))); + assertThat(topology.processorConnectedStateStores("KTABLE-FILTER-0000000003").isEmpty(), is(true)); + } + + @Test + public void shouldAllowJoinMaterializedFilteredKTable() { + final KTable<Bytes, String> filteredKTable = builder.<Bytes, String>table("table-topic") + .filter(MockPredicate.<Bytes, String>allGoodPredicate(), Materialized.<Bytes, String, KeyValueStore<Bytes, byte[]>>as("store")); + builder.<Bytes, String>stream("stream-topic").join(filteredKTable, MockValueJoiner.TOSTRING_JOINER); + + driver.setUp(builder, TestUtils.tempDirectory()); + + ProcessorTopology topology = builder.internalTopologyBuilder.build(); + + assertThat(topology.stateStores().size(), equalTo(2)); + assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), equalTo(Collections.singleton("store"))); + assertThat(topology.processorConnectedStateStores("KTABLE-FILTER-0000000003"), 
equalTo(Collections.singleton("store"))); } @Test @@ -77,14 +100,72 @@ public class StreamsBuilderTest { builder.<Bytes, String>stream("stream-topic").join(mappedKTable, MockValueJoiner.TOSTRING_JOINER); driver.setUp(builder, TestUtils.tempDirectory()); + + ProcessorTopology topology = builder.internalTopologyBuilder.build(); + + assertThat(topology.stateStores().size(), equalTo(1)); + assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), equalTo(Collections.singleton(topology.stateStores().get(0).name()))); + assertThat(topology.processorConnectedStateStores("KTABLE-MAPVALUES-0000000003").isEmpty(), is(true)); + } + + @Test + public void shouldAllowJoinMaterializedMapValuedKTable() { + final KTable<Bytes, String> mappedKTable = builder.<Bytes, String>table("table-topic") + .mapValues(MockMapper.<String>noOpValueMapper(), Materialized.<Bytes, String, KeyValueStore<Bytes, byte[]>>as("store")); + builder.<Bytes, String>stream("stream-topic").join(mappedKTable, MockValueJoiner.TOSTRING_JOINER); + + driver.setUp(builder, TestUtils.tempDirectory()); + + ProcessorTopology topology = builder.internalTopologyBuilder.build(); + + assertThat(topology.stateStores().size(), equalTo(2)); + assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), equalTo(Collections.singleton("store"))); + assertThat(topology.processorConnectedStateStores("KTABLE-MAPVALUES-0000000003"), equalTo(Collections.singleton("store"))); + } + + @Test + public void shouldAllowJoinUnmaterializedJoinedKTable() { + final KTable<Bytes, String> table1 = builder.table("table-topic1"); + final KTable<Bytes, String> table2 = builder.table("table-topic2"); + builder.<Bytes, String>stream("stream-topic").join(table1.join(table2, MockValueJoiner.TOSTRING_JOINER), MockValueJoiner.TOSTRING_JOINER); + + driver.setUp(builder, TestUtils.tempDirectory()); + + ProcessorTopology topology = builder.internalTopologyBuilder.build(); + + assertThat(topology.stateStores().size(), 
equalTo(2)); + assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000010"), equalTo(Utils.mkSet(topology.stateStores().get(0).name(), topology.stateStores().get(1).name()))); + assertThat(topology.processorConnectedStateStores("KTABLE-MERGE-0000000007").isEmpty(), is(true)); + + } + + @Test + public void shouldAllowJoinMaterializedJoinedKTable() { + final KTable<Bytes, String> table1 = builder.table("table-topic1"); + final KTable<Bytes, String> table2 = builder.table("table-topic2"); + builder.<Bytes, String>stream("stream-topic").join(table1.join(table2, MockValueJoiner.TOSTRING_JOINER, Materialized.<Bytes, String, KeyValueStore<Bytes, byte[]>>as("store")), MockValueJoiner.TOSTRING_JOINER); + + driver.setUp(builder, TestUtils.tempDirectory()); + + ProcessorTopology topology = builder.internalTopologyBuilder.build(); + + assertThat(topology.stateStores().size(), equalTo(3)); + assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000010"), equalTo(Collections.singleton("store"))); + assertThat(topology.processorConnectedStateStores("KTABLE-MERGE-0000000007"), equalTo(Collections.singleton("store"))); } @Test public void shouldAllowJoinMaterializedSourceKTable() { - final KTable<Bytes, String> table = builder.<Bytes, String>table("table-topic"); + final KTable<Bytes, String> table = builder.table("table-topic"); builder.<Bytes, String>stream("stream-topic").join(table, MockValueJoiner.TOSTRING_JOINER); driver.setUp(builder, TestUtils.tempDirectory()); + + ProcessorTopology topology = builder.internalTopologyBuilder.build(); + + assertThat(topology.stateStores().size(), equalTo(1)); + assertThat(topology.processorConnectedStateStores("KTABLE-SOURCE-0000000002"), equalTo(Collections.singleton(topology.stateStores().get(0).name()))); + assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000004"), equalTo(Collections.singleton(topology.stateStores().get(0).name()))); } @Test @@ -168,7 +249,7 @@ public class 
StreamsBuilderTest { driver.process(topic, 1L, "value1"); driver.process(topic, 2L, "value2"); driver.flushState(); - final KeyValueStore<Long, String> store = (KeyValueStore) driver.allStateStores().get("store"); + final KeyValueStore<Long, String> store = (KeyValueStore<Long, String>) driver.allStateStores().get("store"); assertThat(store.get(1L), equalTo("value1")); assertThat(store.get(2L), equalTo("value2")); assertThat(results.get(1L), equalTo("value1")); @@ -186,7 +267,7 @@ public class StreamsBuilderTest { driver.process(topic, 1L, "value1"); driver.process(topic, 2L, "value2"); driver.flushState(); - final KeyValueStore<Long, String> store = (KeyValueStore) driver.allStateStores().get("store"); + final KeyValueStore<Long, String> store = (KeyValueStore<Long, String>) driver.allStateStores().get("store"); assertThat(store.get(1L), equalTo("value1")); assertThat(store.get(2L), equalTo("value2")); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java index d70d8b7..01236ad 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java @@ -177,6 +177,7 @@ public class KTableFilterTest { driver.process(topic1, "A", 2); driver.process(topic1, "B", 2); + driver.flushState(); assertEquals(2, (int) getter2.get("A")); assertEquals(2, (int) getter2.get("B")); @@ -187,6 +188,7 @@ public class KTableFilterTest { assertEquals(1, (int) getter3.get("C")); driver.process(topic1, "A", 3); + driver.flushState(); assertNull(getter2.get("A")); assertEquals(2, (int) getter2.get("B")); @@ -198,6 +200,7 @@ public class KTableFilterTest { driver.process(topic1, "A", null); driver.process(topic1, "B", null); + driver.flushState(); assertNull(getter2.get("A")); assertNull(getter2.get("B")); diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java index aeb2418..09d4aa0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java @@ -46,8 +46,6 @@ public class KTableKTableJoinTest { final private String topic1 = "topic1"; final private String topic2 = "topic2"; - final private String storeName1 = "store-name-1"; - final private String storeName2 = "store-name-2"; final private Serde<Integer> intSerde = Serdes.Integer(); final private Serde<String> stringSerde = Serdes.String(); -- To stop receiving notification emails like this one, please contact commits@kafka.apache.org.