This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ab156fd  KAFKA-6036: Follow-up; cleanup sendOldValues logic 
ForwardingCacheFlushListener  (#6017)
ab156fd is described below

commit ab156fded1548b6cf06aff56bb5193c37a8f7e51
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Sun Dec 9 15:33:17 2018 -0800

    KAFKA-6036: Follow-up; cleanup sendOldValues logic 
ForwardingCacheFlushListener  (#6017)
    
    This is a follow-up PR from the previous PR #5779, where KTabeSource always 
get old values from the store even if sendOldValues. It gets me to make a pass 
over all the KTable/KStreamXXX processor to push the sendOldValues at the 
callers in order to avoid unnecessary store reads.
    
    More details: ForwardingCacheFlushListener and TupleForwarder both need 
sendOldValues as parameters.
    a. For ForwardingCacheFlushListener it is not needed at all, since its 
callers XXXCachedStore already use the sendOldValues values passed from 
TupleForwarder to avoid getting old values from underlying stores.
    b. For TupleForwarder, it actually only need to pass the boolean flag to 
the cached store; and then it does not need to keep it as its own variable 
since the cached store already respects the boolean to pass null or the actual 
value..
    
    The only other minor bug I found from the pass in on KTableJoinMerge, where 
we always pass old values and ignores sendOldValues.
    
    Reviewers: Matthias J. Sax <mj...@apache.org>
---
 .../kstream/internals/ForwardingCacheFlushListener.java        | 10 ++--------
 .../kafka/streams/kstream/internals/KStreamAggregate.java      |  6 +++---
 .../apache/kafka/streams/kstream/internals/KStreamReduce.java  |  4 ++--
 .../kstream/internals/KStreamSessionWindowAggregate.java       |  6 ++----
 .../streams/kstream/internals/KStreamWindowAggregate.java      |  4 ++--
 .../kafka/streams/kstream/internals/KTableAggregate.java       |  4 ++--
 .../apache/kafka/streams/kstream/internals/KTableFilter.java   |  2 +-
 .../streams/kstream/internals/KTableKTableJoinMerger.java      | 10 +++++++---
 .../kafka/streams/kstream/internals/KTableMapValues.java       |  2 +-
 .../apache/kafka/streams/kstream/internals/KTableReduce.java   |  4 ++--
 .../apache/kafka/streams/kstream/internals/KTableSource.java   |  2 +-
 .../kafka/streams/kstream/internals/KTableTransformValues.java |  2 +-
 .../apache/kafka/streams/kstream/internals/TupleForwarder.java |  8 +-------
 13 files changed, 27 insertions(+), 37 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java
index f30ab79..4065ced 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java
@@ -22,13 +22,11 @@ import 
org.apache.kafka.streams.processor.internals.ProcessorNode;
 
 class ForwardingCacheFlushListener<K, V> implements CacheFlushListener<K, V> {
     private final InternalProcessorContext context;
-    private final boolean sendOldValues;
     private final ProcessorNode myNode;
 
-    ForwardingCacheFlushListener(final ProcessorContext context, final boolean 
sendOldValues) {
+    ForwardingCacheFlushListener(final ProcessorContext context) {
         this.context = (InternalProcessorContext) context;
         myNode = this.context.currentNode();
-        this.sendOldValues = sendOldValues;
     }
 
     @Override
@@ -36,11 +34,7 @@ class ForwardingCacheFlushListener<K, V> implements 
CacheFlushListener<K, V> {
         final ProcessorNode prev = context.currentNode();
         context.setCurrentNode(myNode);
         try {
-            if (sendOldValues) {
-                context.forward(key, new Change<>(newValue, oldValue));
-            } else {
-                context.forward(key, new Change<>(newValue, null));
-            }
+            context.forward(key, new Change<>(newValue, oldValue));
         } finally {
             context.setCurrentNode(prev);
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 1b3a8f4..648c50b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -54,8 +54,8 @@ public class KStreamAggregate<K, V, T> implements 
KStreamAggProcessorSupplier<K,
     private class KStreamAggregateProcessor extends AbstractProcessor<K, V> {
 
         private KeyValueStore<K, T> store;
-        private TupleForwarder<K, T> tupleForwarder;
         private StreamsMetricsImpl metrics;
+        private TupleForwarder<K, T> tupleForwarder;
 
         @SuppressWarnings("unchecked")
         @Override
@@ -63,7 +63,7 @@ public class KStreamAggregate<K, V, T> implements 
KStreamAggProcessorSupplier<K,
             super.init(context);
             metrics = (StreamsMetricsImpl) context.metrics();
             store = (KeyValueStore<K, T>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(store, context, new 
ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
+            tupleForwarder = new TupleForwarder<>(store, context, new 
ForwardingCacheFlushListener<K, V>(context), sendOldValues);
         }
 
 
@@ -92,7 +92,7 @@ public class KStreamAggregate<K, V, T> implements 
KStreamAggProcessorSupplier<K,
 
             // update the store with the new value
             store.put(key, newAgg);
-            tupleForwarder.maybeForward(key, newAgg, oldAgg);
+            tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : 
null);
         }
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index 9f404ea..09e4fab 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -61,7 +61,7 @@ public class KStreamReduce<K, V> implements 
KStreamAggProcessorSupplier<K, K, V,
             metrics = (StreamsMetricsImpl) context.metrics();
 
             store = (KeyValueStore<K, V>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(store, context, new 
ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
+            tupleForwarder = new TupleForwarder<>(store, context, new 
ForwardingCacheFlushListener<K, V>(context), sendOldValues);
         }
 
 
@@ -89,7 +89,7 @@ public class KStreamReduce<K, V> implements 
KStreamAggProcessorSupplier<K, K, V,
 
             // update the store with the new value
             store.put(key, newAgg);
-            tupleForwarder.maybeForward(key, newAgg, oldAgg);
+            tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : 
null);
         }
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index b89399b..13f4a6e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -92,7 +92,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> 
implements KStreamAggProce
             lateRecordDropSensor = 
Sensors.lateRecordDropSensor(internalProcessorContext);
 
             store = (SessionStore<K, Agg>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(store, context, new 
ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
+            tupleForwarder = new TupleForwarder<>(store, context, new 
ForwardingCacheFlushListener<K, V>(context), sendOldValues);
         }
 
         @Override
@@ -135,7 +135,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> 
implements KStreamAggProce
                 if (!mergedWindow.equals(newSessionWindow)) {
                     for (final KeyValue<Windowed<K>, Agg> session : merged) {
                         store.remove(session.key);
-                        tupleForwarder.maybeForward(session.key, null, 
session.value);
+                        tupleForwarder.maybeForward(session.key, null, 
sendOldValues ? session.value : null);
                     }
                 }
 
@@ -151,10 +151,8 @@ public class KStreamSessionWindowAggregate<K, V, Agg> 
implements KStreamAggProce
                 lateRecordDropSensor.record();
             }
         }
-
     }
 
-
     private SessionWindow mergeSessionWindow(final SessionWindow one, final 
SessionWindow two) {
         final long start = one.start() < two.start() ? one.start() : 
two.start();
         final long end = one.end() > two.end() ? one.end() : two.end();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index f292515..0edbe4e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -87,7 +87,7 @@ public class KStreamWindowAggregate<K, V, Agg, W extends 
Window> implements KStr
             lateRecordDropSensor = 
Sensors.lateRecordDropSensor(internalProcessorContext);
 
             windowStore = (WindowStore<K, Agg>) 
context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(windowStore, context, new 
ForwardingCacheFlushListener<Windowed<K>, V>(context, sendOldValues), 
sendOldValues);
+            tupleForwarder = new TupleForwarder<>(windowStore, context, new 
ForwardingCacheFlushListener<Windowed<K>, V>(context), sendOldValues);
         }
 
         @Override
@@ -122,7 +122,7 @@ public class KStreamWindowAggregate<K, V, Agg, W extends 
Window> implements KStr
 
                     // update the store with the new value
                     windowStore.put(key, newAgg, windowStart);
-                    tupleForwarder.maybeForward(new Windowed<>(key, 
entry.getValue()), newAgg, oldAgg);
+                    tupleForwarder.maybeForward(new Windowed<>(key, 
entry.getValue()), newAgg, sendOldValues ? oldAgg : null);
                 } else {
                     log.debug(
                         "Skipping record for expired window. key=[{}] 
topic=[{}] partition=[{}] offset=[{}] timestamp=[{}] window=[{},{}) 
expiration=[{}]",
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index b60f9ab..b04a729 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -62,7 +62,7 @@ public class KTableAggregate<K, V, T> implements 
KTableProcessorSupplier<K, V, T
         public void init(final ProcessorContext context) {
             super.init(context);
             store = (KeyValueStore<K, T>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(store, context, new 
ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
+            tupleForwarder = new TupleForwarder<>(store, context, new 
ForwardingCacheFlushListener<K, V>(context), sendOldValues);
         }
 
         /**
@@ -95,7 +95,7 @@ public class KTableAggregate<K, V, T> implements 
KTableProcessorSupplier<K, V, T
 
             // update the store with the new value
             store.put(key, newAgg);
-            tupleForwarder.maybeForward(key, newAgg, oldAgg);
+            tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : 
null);
         }
 
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index 75fba99..d1e524c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -70,7 +70,7 @@ class KTableFilter<K, V> implements 
KTableProcessorSupplier<K, V, V> {
             super.init(context);
             if (queryableName != null) {
                 store = (KeyValueStore<K, V>) 
context.getStateStore(queryableName);
-                tupleForwarder = new TupleForwarder<>(store, context, new 
ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
+                tupleForwarder = new TupleForwarder<>(store, context, new 
ForwardingCacheFlushListener<K, V>(context), sendOldValues);
             }
         }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
index 2ed70bd..78c1dc6 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
@@ -89,7 +89,7 @@ class KTableKTableJoinMerger<K, V> implements 
KTableProcessorSupplier<K, V, V> {
             if (queryableName != null) {
                 store = (KeyValueStore<K, V>) 
context.getStateStore(queryableName);
                 tupleForwarder = new TupleForwarder<>(store, context,
-                    new ForwardingCacheFlushListener<K, V>(context, 
sendOldValues),
+                    new ForwardingCacheFlushListener<K, V>(context),
                     sendOldValues);
             }
         }
@@ -98,9 +98,13 @@ class KTableKTableJoinMerger<K, V> implements 
KTableProcessorSupplier<K, V, V> {
         public void process(final K key, final Change<V> value) {
             if (queryableName != null) {
                 store.put(key, value.newValue);
-                tupleForwarder.maybeForward(key, value.newValue, 
value.oldValue);
+                tupleForwarder.maybeForward(key, value.newValue, sendOldValues 
? value.oldValue : null);
             } else {
-                context().forward(key, value);
+                if (sendOldValues) {
+                    context().forward(key, value);
+                } else {
+                    context().forward(key, new Change<>(value.newValue, null));
+                }
             }
         }
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index c2c84d5..2317947 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -90,7 +90,7 @@ class KTableMapValues<K, V, V1> implements 
KTableProcessorSupplier<K, V, V1> {
             super.init(context);
             if (queryableName != null) {
                 store = (KeyValueStore<K, V1>) 
context.getStateStore(queryableName);
-                tupleForwarder = new TupleForwarder<>(store, context, new 
ForwardingCacheFlushListener<K, V1>(context, sendOldValues), sendOldValues);
+                tupleForwarder = new TupleForwarder<>(store, context, new 
ForwardingCacheFlushListener<K, V1>(context), sendOldValues);
             }
         }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index 069b360..38c5a11 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -57,7 +57,7 @@ public class KTableReduce<K, V> implements 
KTableProcessorSupplier<K, V, V> {
         public void init(final ProcessorContext context) {
             super.init(context);
             store = (KeyValueStore<K, V>) context.getStateStore(storeName);
-            tupleForwarder = new TupleForwarder<>(store, context, new 
ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
+            tupleForwarder = new TupleForwarder<>(store, context, new 
ForwardingCacheFlushListener<K, V>(context), sendOldValues);
         }
 
         /**
@@ -89,7 +89,7 @@ public class KTableReduce<K, V> implements 
KTableProcessorSupplier<K, V, V> {
 
             // update the store with the new value
             store.put(key, newAgg);
-            tupleForwarder.maybeForward(key, newAgg, oldAgg);
+            tupleForwarder.maybeForward(key, newAgg, sendOldValues ? oldAgg : 
null);
         }
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index 274d96e..6fc57bc 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -77,7 +77,7 @@ public class KTableSource<K, V> implements 
ProcessorSupplier<K, V> {
             metrics = (StreamsMetricsImpl) context.metrics();
             if (queryableName != null) {
                 store = (KeyValueStore<K, V>) 
context.getStateStore(queryableName);
-                tupleForwarder = new TupleForwarder<>(store, context, new 
ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
+                tupleForwarder = new TupleForwarder<>(store, context, new 
ForwardingCacheFlushListener<K, V>(context), sendOldValues);
             }
         }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
index b3e84d7..88cea4f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
@@ -91,7 +91,7 @@ class KTableTransformValues<K, V, V1> implements 
KTableProcessorSupplier<K, V, V
             valueTransformer.init(new 
ForwardingDisabledProcessorContext(context));
 
             if (queryableName != null) {
-                final ForwardingCacheFlushListener<K, V1> flushListener = new 
ForwardingCacheFlushListener<>(context, sendOldValues);
+                final ForwardingCacheFlushListener<K, V1> flushListener = new 
ForwardingCacheFlushListener<>(context);
                 store = (KeyValueStore<K, V1>) 
context.getStateStore(queryableName);
                 tupleForwarder = new TupleForwarder<>(store, context, 
flushListener, sendOldValues);
             }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
index ff3ef44..aec0d16 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java
@@ -32,7 +32,6 @@ import 
org.apache.kafka.streams.state.internals.WrappedStateStore;
 class TupleForwarder<K, V> {
     private final CachedStateStore cachedStateStore;
     private final ProcessorContext context;
-    private final boolean sendOldValues;
 
     @SuppressWarnings("unchecked")
     TupleForwarder(final StateStore store,
@@ -41,7 +40,6 @@ class TupleForwarder<K, V> {
                    final boolean sendOldValues) {
         this.cachedStateStore = cachedStateStore(store);
         this.context = context;
-        this.sendOldValues = sendOldValues;
         if (this.cachedStateStore != null) {
             cachedStateStore.setFlushListener(flushListener, sendOldValues);
         }
@@ -61,11 +59,7 @@ class TupleForwarder<K, V> {
                              final V newValue,
                              final V oldValue) {
         if (cachedStateStore == null) {
-            if (sendOldValues) {
-                context.forward(key, new Change<>(newValue, oldValue));
-            } else {
-                context.forward(key, new Change<>(newValue, null));
-            }
+            context.forward(key, new Change<>(newValue, oldValue));
         }
     }
 }

Reply via email to