This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new ab156fd KAFKA-6036: Follow-up; cleanup sendOldValues logic ForwardingCacheFlushListener (#6017) ab156fd is described below commit ab156fded1548b6cf06aff56bb5193c37a8f7e51 Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Sun Dec 9 15:33:17 2018 -0800 KAFKA-6036: Follow-up; cleanup sendOldValues logic ForwardingCacheFlushListener (#6017) This is a follow-up PR from the previous PR #5779, where KTabeSource always get old values from the store even if sendOldValues. It gets me to make a pass over all the KTable/KStreamXXX processor to push the sendOldValues at the callers in order to avoid unnecessary store reads. More details: ForwardingCacheFlushListener and TupleForwarder both need sendOldValues as parameters. a. For ForwardingCacheFlushListener it is not needed at all, since its callers XXXCachedStore already use the sendOldValues values passed from TupleForwarder to avoid getting old values from underlying stores. b. For TupleForwarder, it actually only need to pass the boolean flag to the cached store; and then it does not need to keep it as its own variable since the cached store already respects the boolean to pass null or the actual value.. The only other minor bug I found from the pass in on KTableJoinMerge, where we always pass old values and ignores sendOldValues. Reviewers: Matthias J. Sax <mj...@apache.org> --- .../kstream/internals/ForwardingCacheFlushListener.java | 10 ++-------- .../kafka/streams/kstream/internals/KStreamAggregate.java | 6 +++--- .../apache/kafka/streams/kstream/internals/KStreamReduce.java | 4 ++-- .../kstream/internals/KStreamSessionWindowAggregate.java | 6 ++---- .../streams/kstream/internals/KStreamWindowAggregate.java | 4 ++-- .../kafka/streams/kstream/internals/KTableAggregate.java | 4 ++-- .../apache/kafka/streams/kstream/internals/KTableFilter.java | 2 +- .../streams/kstream/internals/KTableKTableJoinMerger.java | 10 +++++++--- .../kafka/streams/kstream/internals/KTableMapValues.java | 2 +- .../apache/kafka/streams/kstream/internals/KTableReduce.java | 4 ++-- .../apache/kafka/streams/kstream/internals/KTableSource.java | 2 +- .../kafka/streams/kstream/internals/KTableTransformValues.java | 2 +- .../apache/kafka/streams/kstream/internals/TupleForwarder.java | 8 +------- 13 files changed, 27 insertions(+), 37 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java index f30ab79..4065ced 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java @@ -22,13 +22,11 @@ import org.apache.kafka.streams.processor.internals.ProcessorNode; class ForwardingCacheFlushListener<K, V> implements CacheFlushListener<K, V> { private final InternalProcessorContext context; - private final boolean sendOldValues; private final ProcessorNode myNode; - ForwardingCacheFlushListener(final ProcessorContext context, final boolean sendOldValues) { + ForwardingCacheFlushListener(final ProcessorContext context) { this.context = (InternalProcessorContext) context; myNode = this.context.currentNode(); - this.sendOldValues = sendOldValues; } @Override @@ -36,11 +34,7 @@ class ForwardingCacheFlushListener<K, V> implements CacheFlushListener<K, V> { final ProcessorNode prev = context.currentNode(); context.setCurrentNode(myNode); try { - if (sendOldValues) { - context.forward(key, new Change<>(newValue, oldValue)); - } else { - context.forward(key, new Change<>(newValue, null)); - } + context.forward(key, new Change<>(newValue, oldValue)); } finally { context.setCurrentNode(prev); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java index 1b3a8f4..648c50b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java @@ -54,8 +54,8 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, private class KStreamAggregateProcessor extends AbstractProcessor<K, V> { private KeyValueStore<K, T> store; - private TupleForwarder<K, T> tupleForwarder; private StreamsMetricsImpl metrics; + private TupleForwarder<K, T> tupleForwarder; @SuppressWarnings("unchecked") @Override @@ -63,7 +63,7 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, super.init(context); metrics = (StreamsMetricsImpl) context.metrics(); store = (KeyValueStore<K, T>) context.getStateStore(storeName); - tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues); + tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context), sendOldValues); } @@ -92,7 +92,7 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, // update the store with the new value store.put(key, newAgg); - tupleForwarder.maybeForward(key, newAgg, oldAgg); + tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java index 9f404ea..09e4fab 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java @@ -61,7 +61,7 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V, metrics = (StreamsMetricsImpl) context.metrics(); store = (KeyValueStore<K, V>) context.getStateStore(storeName); - tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues); + tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context), sendOldValues); } @@ -89,7 +89,7 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V, // update the store with the new value store.put(key, newAgg); - tupleForwarder.maybeForward(key, newAgg, oldAgg); + tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java index b89399b..13f4a6e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java @@ -92,7 +92,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce lateRecordDropSensor = Sensors.lateRecordDropSensor(internalProcessorContext); store = (SessionStore<K, Agg>) context.getStateStore(storeName); - tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues); + tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context), sendOldValues); } @Override @@ -135,7 +135,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce if (!mergedWindow.equals(newSessionWindow)) { for (final KeyValue<Windowed<K>, Agg> session : merged) { store.remove(session.key); - tupleForwarder.maybeForward(session.key, null, session.value); + tupleForwarder.maybeForward(session.key, null, sendOldValues ? session.value : null); } } @@ -151,10 +151,8 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce lateRecordDropSensor.record(); } } - } - private SessionWindow mergeSessionWindow(final SessionWindow one, final SessionWindow two) { final long start = one.start() < two.start() ? one.start() : two.start(); final long end = one.end() > two.end() ? one.end() : two.end(); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java index f292515..0edbe4e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java @@ -87,7 +87,7 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr lateRecordDropSensor = Sensors.lateRecordDropSensor(internalProcessorContext); windowStore = (WindowStore<K, Agg>) context.getStateStore(storeName); - tupleForwarder = new TupleForwarder<>(windowStore, context, new ForwardingCacheFlushListener<Windowed<K>, V>(context, sendOldValues), sendOldValues); + tupleForwarder = new TupleForwarder<>(windowStore, context, new ForwardingCacheFlushListener<Windowed<K>, V>(context), sendOldValues); } @Override @@ -122,7 +122,7 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr // update the store with the new value windowStore.put(key, newAgg, windowStart); - tupleForwarder.maybeForward(new Windowed<>(key, entry.getValue()), newAgg, oldAgg); + tupleForwarder.maybeForward(new Windowed<>(key, entry.getValue()), newAgg, sendOldValues ? oldAgg : null); } else { log.debug( "Skipping record for expired window. key=[{}] topic=[{}] partition=[{}] offset=[{}] timestamp=[{}] window=[{},{}) expiration=[{}]", diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java index b60f9ab..b04a729 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java @@ -62,7 +62,7 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T public void init(final ProcessorContext context) { super.init(context); store = (KeyValueStore<K, T>) context.getStateStore(storeName); - tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues); + tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context), sendOldValues); } /** @@ -95,7 +95,7 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T // update the store with the new value store.put(key, newAgg); - tupleForwarder.maybeForward(key, newAgg, oldAgg); + tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java index 75fba99..d1e524c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java @@ -70,7 +70,7 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> { super.init(context); if (queryableName != null) { store = (KeyValueStore<K, V>) context.getStateStore(queryableName); - tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues); + tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context), sendOldValues); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java index 2ed70bd..78c1dc6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java @@ -89,7 +89,7 @@ class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> { if (queryableName != null) { store = (KeyValueStore<K, V>) context.getStateStore(queryableName); tupleForwarder = new TupleForwarder<>(store, context, - new ForwardingCacheFlushListener<K, V>(context, sendOldValues), + new ForwardingCacheFlushListener<K, V>(context), sendOldValues); } } @@ -98,9 +98,13 @@ class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> { public void process(final K key, final Change<V> value) { if (queryableName != null) { store.put(key, value.newValue); - tupleForwarder.maybeForward(key, value.newValue, value.oldValue); + tupleForwarder.maybeForward(key, value.newValue, sendOldValues ? value.oldValue : null); } else { - context().forward(key, value); + if (sendOldValues) { + context().forward(key, value); + } else { + context().forward(key, new Change<>(value.newValue, null)); + } } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java index c2c84d5..2317947 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java @@ -90,7 +90,7 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> { super.init(context); if (queryableName != null) { store = (KeyValueStore<K, V1>) context.getStateStore(queryableName); - tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V1>(context, sendOldValues), sendOldValues); + tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V1>(context), sendOldValues); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java index 069b360..38c5a11 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java @@ -57,7 +57,7 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> { public void init(final ProcessorContext context) { super.init(context); store = (KeyValueStore<K, V>) context.getStateStore(storeName); - tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues); + tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context), sendOldValues); } /** @@ -89,7 +89,7 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> { // update the store with the new value store.put(key, newAgg); - tupleForwarder.maybeForward(key, newAgg, oldAgg); + tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : null); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java index 274d96e..6fc57bc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java @@ -77,7 +77,7 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> { metrics = (StreamsMetricsImpl) context.metrics(); if (queryableName != null) { store = (KeyValueStore<K, V>) context.getStateStore(queryableName); - tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues); + tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context), sendOldValues); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java index b3e84d7..88cea4f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java @@ -91,7 +91,7 @@ class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V valueTransformer.init(new ForwardingDisabledProcessorContext(context)); if (queryableName != null) { - final ForwardingCacheFlushListener<K, V1> flushListener = new ForwardingCacheFlushListener<>(context, sendOldValues); + final ForwardingCacheFlushListener<K, V1> flushListener = new ForwardingCacheFlushListener<>(context); store = (KeyValueStore<K, V1>) context.getStateStore(queryableName); tupleForwarder = new TupleForwarder<>(store, context, flushListener, sendOldValues); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java index ff3ef44..aec0d16 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java @@ -32,7 +32,6 @@ import org.apache.kafka.streams.state.internals.WrappedStateStore; class TupleForwarder<K, V> { private final CachedStateStore cachedStateStore; private final ProcessorContext context; - private final boolean sendOldValues; @SuppressWarnings("unchecked") TupleForwarder(final StateStore store, @@ -41,7 +40,6 @@ class TupleForwarder<K, V> { final boolean sendOldValues) { this.cachedStateStore = cachedStateStore(store); this.context = context; - this.sendOldValues = sendOldValues; if (this.cachedStateStore != null) { cachedStateStore.setFlushListener(flushListener, sendOldValues); } @@ -61,11 +59,7 @@ class TupleForwarder<K, V> { final V newValue, final V oldValue) { if (cachedStateStore == null) { - if (sendOldValues) { - context.forward(key, new Change<>(newValue, oldValue)); - } else { - context.forward(key, new Change<>(newValue, null)); - } + context.forward(key, new Change<>(newValue, oldValue)); } } }