This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new 5d17ae1  KAFKA-6398: Return value getter based on KTable 
materialization status
5d17ae1 is described below

commit 5d17ae182806dd2d15c532c31f7127673eeaa9a2
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Thu Jan 18 13:12:44 2018 -0800

    KAFKA-6398: Return value getter based on KTable materialization status
    
    This is a bug fix that is composed of two parts:
    
    1. The major part is, for all operators that is generating a KTable, we 
should construct its value getter based on whether the KTable itself is 
materialized.
    1.a If yes, then query the materialized store directly for value getter.
    1.b If not, then hand over to its parents value getter (recursively) and 
apply the computation to return.
    
    2. The minor part is, in KStreamImpl, when joining with a table, we should 
connect with table's `valueGetterSupplier().storeNames()`, not the 
`internalStoreName()` as the latter always assume that the KTable is 
materialized, but that is not always true.
    
    Author: Guozhang Wang <wangg...@gmail.com>
    
    Reviewers: Damian Guy <damian....@gmail.com>, Bill Bejeck 
<b...@confluent.io>, Matthias J. Sax <matth...@confluent.io>
    
    Closes #4421 from guozhangwang/K6398-KTableValueGetter
    
    (cherry picked from commit 75e37d7e2063b726aa0185f4b695f32cde69d95c)
    Signed-off-by: Guozhang Wang <wangg...@gmail.com>
---
 .../streams/kstream/internals/KStreamImpl.java     |  2 +-
 .../streams/kstream/internals/KTableAggregate.java | 37 ++-------
 .../streams/kstream/internals/KTableFilter.java    | 55 +++++++-------
 .../streams/kstream/internals/KTableImpl.java      |  8 +-
 .../internals/KTableKTableAbstractJoin.java        | 13 ++--
 ...ableKTableAbstractJoinValueGetterSupplier.java} | 10 +--
 .../kstream/internals/KTableKTableJoin.java        |  6 +-
 .../kstream/internals/KTableKTableJoinMerger.java  | 47 +++++++++---
 .../kstream/internals/KTableKTableLeftJoin.java    |  6 +-
 .../kstream/internals/KTableKTableOuterJoin.java   |  6 +-
 .../kstream/internals/KTableKTableRightJoin.java   |  6 +-
 .../streams/kstream/internals/KTableMapValues.java | 46 ++++++------
 .../KTableMaterializedValueGetterSupplier.java     | 54 ++++++++++++++
 .../streams/kstream/internals/KTableReduce.java    | 34 +--------
 .../kstream/internals/KTableRepartitionMap.java    |  8 +-
 .../processor/internals/ProcessorTopology.java     | 10 +++
 .../apache/kafka/streams/StreamsBuilderTest.java   | 87 +++++++++++++++++++++-
 .../kstream/internals/KTableFilterTest.java        |  3 +
 .../kstream/internals/KTableKTableJoinTest.java    |  2 -
 19 files changed, 281 insertions(+), 159 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 8e80315..0e39a30 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -769,7 +769,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
 
         final String name = builder.newProcessorName(leftJoin ? LEFTJOIN_NAME 
: JOIN_NAME);
         builder.internalTopologyBuilder.addProcessor(name, new 
KStreamKTableJoin<>(((KTableImpl<K, ?, V1>) other).valueGetterSupplier(), 
joiner, leftJoin), this.name);
-        builder.internalTopologyBuilder.connectProcessorAndStateStores(name, 
((KTableImpl<K, ?, V1>) other).internalStoreName());
+        builder.internalTopologyBuilder.connectProcessorAndStateStores(name, 
((KTableImpl) other).valueGetterSupplier().storeNames());
         builder.internalTopologyBuilder.connectProcessors(this.name, 
((KTableImpl<K, ?, V1>) other).name);
 
         return new KStreamImpl<>(builder, name, allSourceNodes, false);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index 973de0f..10765ff 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -33,7 +33,10 @@ public class KTableAggregate<K, V, T> implements 
KTableProcessorSupplier<K, V, T
 
     private boolean sendOldValues = false;
 
-    public KTableAggregate(String storeName, Initializer<T> initializer, 
Aggregator<? super K, ? super V, T> add, Aggregator<? super K, ? super V, T> 
remove) {
+    KTableAggregate(final String storeName,
+                    final Initializer<T> initializer,
+                    final Aggregator<? super K, ? super V, T> add,
+                    final Aggregator<? super K, ? super V, T> remove) {
         this.storeName = storeName;
         this.initializer = initializer;
         this.add = add;
@@ -51,7 +54,6 @@ public class KTableAggregate<K, V, T> implements 
KTableProcessorSupplier<K, V, T
     }
 
     private class KTableAggregateProcessor extends AbstractProcessor<K, 
Change<V>> {
-
         private KeyValueStore<K, T> store;
         private TupleForwarder<K, T> tupleForwarder;
 
@@ -98,35 +100,6 @@ public class KTableAggregate<K, V, T> implements 
KTableProcessorSupplier<K, V, T
 
     @Override
     public KTableValueGetterSupplier<K, T> view() {
-
-        return new KTableValueGetterSupplier<K, T>() {
-
-            public KTableValueGetter<K, T> get() {
-                return new KTableAggregateValueGetter();
-            }
-
-            @Override
-            public String[] storeNames() {
-                return new String[]{storeName};
-            }
-        };
-    }
-
-    private class KTableAggregateValueGetter implements KTableValueGetter<K, 
T> {
-
-        private KeyValueStore<K, T> store;
-
-        @SuppressWarnings("unchecked")
-        @Override
-        public void init(ProcessorContext context) {
-            store = (KeyValueStore<K, T>) context.getStateStore(storeName);
-        }
-
-        @Override
-        public T get(K key) {
-            return store.get(key);
-        }
-
+        return new KTableMaterializedValueGetterSupplier<>(storeName);
     }
-
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index af8c906..ee8982b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -30,8 +30,10 @@ class KTableFilter<K, V> implements 
KTableProcessorSupplier<K, V, V> {
     private final String queryableName;
     private boolean sendOldValues = false;
 
-    public KTableFilter(final KTableImpl<K, ?, V> parent, final Predicate<? 
super K, ? super V> predicate,
-                        final boolean filterNot, final String queryableName) {
+    KTableFilter(final KTableImpl<K, ?, V> parent,
+                 final Predicate<? super K, ? super V> predicate,
+                 final boolean filterNot,
+                 final String queryableName) {
         this.parent = parent;
         this.predicate = predicate;
         this.filterNot = filterNot;
@@ -44,24 +46,6 @@ class KTableFilter<K, V> implements 
KTableProcessorSupplier<K, V, V> {
     }
 
     @Override
-    public KTableValueGetterSupplier<K, V> view() {
-
-        final KTableValueGetterSupplier<K, V> parentValueGetterSupplier = 
parent.valueGetterSupplier();
-
-        return new KTableValueGetterSupplier<K, V>() {
-
-            public KTableValueGetter<K, V> get() {
-                return new 
KTableFilterValueGetter(parentValueGetterSupplier.get());
-            }
-
-            @Override
-            public String[] storeNames() {
-                return parentValueGetterSupplier.storeNames();
-            }
-        };
-    }
-
-    @Override
     public void enableSendingOldValues() {
         parent.enableSendingOldValues();
         sendOldValues = true;
@@ -108,24 +92,45 @@ class KTableFilter<K, V> implements 
KTableProcessorSupplier<K, V, V> {
 
     }
 
-    private class KTableFilterValueGetter implements KTableValueGetter<K, V> {
+    @Override
+    public KTableValueGetterSupplier<K, V> view() {
+        // if the KTable is materialized, use the materialized store to return 
getter value;
+        // otherwise rely on the parent getter and apply filter on-the-fly
+        if (queryableName != null) {
+            return new KTableMaterializedValueGetterSupplier<>(queryableName);
+        } else {
+            return new KTableValueGetterSupplier<K, V>() {
+                final KTableValueGetterSupplier<K, V> 
parentValueGetterSupplier = parent.valueGetterSupplier();
+
+                public KTableValueGetter<K, V> get() {
+                    return new 
KTableFilterValueGetter(parentValueGetterSupplier.get());
+                }
+
+                @Override
+                public String[] storeNames() {
+                    return parentValueGetterSupplier.storeNames();
+                }
+            };
+        }
+    }
 
+    private class KTableFilterValueGetter implements KTableValueGetter<K, V> {
         private final KTableValueGetter<K, V> parentGetter;
 
-        public KTableFilterValueGetter(KTableValueGetter<K, V> parentGetter) {
+        KTableFilterValueGetter(final KTableValueGetter<K, V> parentGetter) {
             this.parentGetter = parentGetter;
         }
 
+        @SuppressWarnings("unchecked")
         @Override
-        public void init(ProcessorContext context) {
+        public void init(final ProcessorContext context) {
             parentGetter.init(context);
         }
 
         @Override
-        public V get(K key) {
+        public V get(final K key) {
             return computeValue(key, parentGetter.get(key));
         }
-
     }
 
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 3bc6f4b..e465302 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -137,10 +137,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> 
implements KTable<K,
         return this.queryableStoreName;
     }
 
-    String internalStoreName() {
-        return this.queryableStoreName;
-    }
-
     @SuppressWarnings("deprecation")
     private KTable<K, V> doFilter(final Predicate<? super K, ? super V> 
predicate,
                                   final 
org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> 
storeSupplier,
@@ -760,9 +756,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> 
implements KTable<K,
         }
 
         final KTableKTableJoinMerger<K, R> joinMerge = new 
KTableKTableJoinMerger<>(
-                new KTableImpl<K, V, R>(builder, joinThisName, joinThis, 
sourceNodes, this.internalStoreName(), false),
+                new KTableImpl<K, V, R>(builder, joinThisName, joinThis, 
sourceNodes, this.queryableStoreName, false),
                 new KTableImpl<K, V1, R>(builder, joinOtherName, joinOther, 
((KTableImpl<K, ?, ?>) other).sourceNodes,
-                        ((KTableImpl<K, ?, ?>) other).internalStoreName(), 
false),
+                        ((KTableImpl<K, ?, ?>) other).queryableStoreName, 
false),
                 internalQueryableName
         );
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
index 7fa39d9..bdc1dca 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
@@ -20,13 +20,13 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
 
 abstract class KTableKTableAbstractJoin<K, R, V1, V2> implements 
KTableProcessorSupplier<K, V1, R> {
 
-    protected final KTableImpl<K, ?, V1> table1;
-    protected final KTableImpl<K, ?, V2> table2;
-    protected final KTableValueGetterSupplier<K, V1> valueGetterSupplier1;
-    protected final KTableValueGetterSupplier<K, V2> valueGetterSupplier2;
-    protected final ValueJoiner<? super V1, ? super V2, ? extends R> joiner;
+    private final KTableImpl<K, ?, V1> table1;
+    private final KTableImpl<K, ?, V2> table2;
+    final KTableValueGetterSupplier<K, V1> valueGetterSupplier1;
+    final KTableValueGetterSupplier<K, V2> valueGetterSupplier2;
+    final ValueJoiner<? super V1, ? super V2, ? extends R> joiner;
 
-    protected boolean sendOldValues = false;
+    boolean sendOldValues = false;
 
     KTableKTableAbstractJoin(KTableImpl<K, ?, V1> table1,
                              KTableImpl<K, ?, V2> table2,
@@ -44,5 +44,4 @@ abstract class KTableKTableAbstractJoin<K, R, V1, V2> 
implements KTableProcessor
         table2.enableSendingOldValues();
         sendOldValues = true;
     }
-
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKTableKTableJoinValueGetterSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoinValueGetterSupplier.java
similarity index 78%
rename from 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKTableKTableJoinValueGetterSupplier.java
rename to 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoinValueGetterSupplier.java
index 74c0632..be5a202 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKTableKTableJoinValueGetterSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoinValueGetterSupplier.java
@@ -18,12 +18,12 @@ package org.apache.kafka.streams.kstream.internals;
 
 import java.util.ArrayList;
 
-public abstract class AbstractKTableKTableJoinValueGetterSupplier<K, R, V1, 
V2> implements KTableValueGetterSupplier<K, R> {
-    final protected KTableValueGetterSupplier<K, V1> valueGetterSupplier1;
-    final protected KTableValueGetterSupplier<K, V2> valueGetterSupplier2;
+public abstract class KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, 
V2> implements KTableValueGetterSupplier<K, R> {
+    final KTableValueGetterSupplier<K, V1> valueGetterSupplier1;
+    final KTableValueGetterSupplier<K, V2> valueGetterSupplier2;
 
-    public AbstractKTableKTableJoinValueGetterSupplier(final 
KTableValueGetterSupplier<K, V1> valueGetterSupplier1,
-                                                       final 
KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
+    KTableKTableAbstractJoinValueGetterSupplier(final 
KTableValueGetterSupplier<K, V1> valueGetterSupplier1,
+                                                final 
KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
         this.valueGetterSupplier1 = valueGetterSupplier1;
         this.valueGetterSupplier2 = valueGetterSupplier2;
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
index 1b26a5b..c424f4f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
@@ -42,12 +42,12 @@ class KTableKTableJoin<K, R, V1, V2> extends 
KTableKTableAbstractJoin<K, R, V1,
 
     @Override
     public KTableValueGetterSupplier<K, R> view() {
-        return new KTableKTableJoinValueGetterSupplier(valueGetterSupplier1, 
valueGetterSupplier2);
+        return new 
KTableKTableAbstractJoinValueGetterSupplier(valueGetterSupplier1, 
valueGetterSupplier2);
     }
 
-    private class KTableKTableJoinValueGetterSupplier extends 
AbstractKTableKTableJoinValueGetterSupplier<K, R, V1, V2> {
+    private class KTableKTableAbstractJoinValueGetterSupplier extends 
org.apache.kafka.streams.kstream.internals.KTableKTableAbstractJoinValueGetterSupplier<K,
 R, V1, V2> {
 
-        public 
KTableKTableJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> 
valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
+        public 
KTableKTableAbstractJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> 
valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
             super(valueGetterSupplier1, valueGetterSupplier2);
         }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
index 82d9c26..d27b8bd 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
@@ -28,9 +28,9 @@ class KTableKTableJoinMerger<K, V> implements 
KTableProcessorSupplier<K, V, V> {
     private final String queryableName;
     private boolean sendOldValues = false;
 
-    public KTableKTableJoinMerger(final KTableImpl<K, ?, V> parent1,
-                                  final KTableImpl<K, ?, V> parent2,
-                                  final String queryableName) {
+    KTableKTableJoinMerger(final KTableImpl<K, ?, V> parent1,
+                           final KTableImpl<K, ?, V> parent2,
+                           final String queryableName) {
         this.parent1 = parent1;
         this.parent2 = parent2;
         this.queryableName = queryableName;
@@ -38,12 +38,42 @@ class KTableKTableJoinMerger<K, V> implements 
KTableProcessorSupplier<K, V, V> {
 
     @Override
     public Processor<K, Change<V>> get() {
-        return new KTableKTableJoinMergeProcessor<>();
+        return new KTableKTableJoinMergeProcessor();
     }
 
     @Override
     public KTableValueGetterSupplier<K, V> view() {
-        return parent1.valueGetterSupplier();
+        // if the result KTable is materialized, use the materialized store to 
return getter value;
+        // otherwise rely on the parent getter and apply join on-the-fly
+        if (queryableName != null) {
+            return new KTableMaterializedValueGetterSupplier<>(queryableName);
+        } else {
+            return new KTableValueGetterSupplier<K, V>() {
+
+                public KTableValueGetter<K, V> get() {
+                    return parent1.valueGetterSupplier().get();
+                }
+
+                @Override
+                public String[] storeNames() {
+                    // we need to allow the downstream processor to be able to 
access both ends of the joining table's value getters
+                    final String[] storeNames1 = 
parent1.valueGetterSupplier().storeNames();
+                    final String[] storeNames2 = 
parent2.valueGetterSupplier().storeNames();
+
+                    final String[] stores = new String[storeNames1.length + 
storeNames2.length];
+                    int i = 0;
+                    for (final String storeName : storeNames1) {
+                        stores[i] = storeName;
+                        i++;
+                    }
+                    for (final String storeName : storeNames2) {
+                        stores[i] = storeName;
+                        i++;
+                    }
+                    return stores;
+                }
+            };
+        }
     }
 
     @Override
@@ -53,14 +83,13 @@ class KTableKTableJoinMerger<K, V> implements 
KTableProcessorSupplier<K, V, V> {
         sendOldValues = true;
     }
 
-    private class KTableKTableJoinMergeProcessor<K, V>
-        extends AbstractProcessor<K, Change<V>> {
+    private class KTableKTableJoinMergeProcessor extends AbstractProcessor<K, 
Change<V>> {
         private KeyValueStore<K, V> store;
         private TupleForwarder<K, V> tupleForwarder;
 
         @SuppressWarnings("unchecked")
         @Override
-        public void init(ProcessorContext context) {
+        public void init(final ProcessorContext context) {
             super.init(context);
             if (queryableName != null) {
                 store = (KeyValueStore<K, V>) 
context.getStateStore(queryableName);
@@ -72,7 +101,6 @@ class KTableKTableJoinMerger<K, V> implements 
KTableProcessorSupplier<K, V, V> {
 
         @Override
         public void process(K key, Change<V> value) {
-
             if (queryableName != null) {
                 store.put(key, value.newValue);
                 tupleForwarder.maybeForward(key, value.newValue, 
value.oldValue);
@@ -81,5 +109,4 @@ class KTableKTableJoinMerger<K, V> implements 
KTableProcessorSupplier<K, V, V> {
             }
         }
     }
-
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
index c308a0d..33aef02 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -34,12 +34,12 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends 
KTableKTableAbstractJoin<K, R,
 
     @Override
     public KTableValueGetterSupplier<K, R> view() {
-        return new 
KTableKTableLeftJoinValueGetterSupplier(valueGetterSupplier1, 
valueGetterSupplier2);
+        return new 
KTableKTableLeftAbstractJoinValueGetterSupplier(valueGetterSupplier1, 
valueGetterSupplier2);
     }
 
-    private class KTableKTableLeftJoinValueGetterSupplier extends 
AbstractKTableKTableJoinValueGetterSupplier<K, R, V1, V2> {
+    private class KTableKTableLeftAbstractJoinValueGetterSupplier extends 
KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> {
 
-        public 
KTableKTableLeftJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> 
valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
+        public 
KTableKTableLeftAbstractJoinValueGetterSupplier(KTableValueGetterSupplier<K, 
V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> 
valueGetterSupplier2) {
             super(valueGetterSupplier1, valueGetterSupplier2);
         }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
index 9cee4f3..d2e1d79 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
@@ -34,12 +34,12 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends 
KTableKTableAbstractJoin<K, R,
 
     @Override
     public KTableValueGetterSupplier<K, R> view() {
-        return new 
KTableKTableOuterJoinValueGetterSupplier(valueGetterSupplier1, 
valueGetterSupplier2);
+        return new 
KTableKTableOuterAbstractJoinValueGetterSupplier(valueGetterSupplier1, 
valueGetterSupplier2);
     }
 
-    private class KTableKTableOuterJoinValueGetterSupplier extends 
AbstractKTableKTableJoinValueGetterSupplier<K, R, V1, V2> {
+    private class KTableKTableOuterAbstractJoinValueGetterSupplier extends 
KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> {
 
-        public 
KTableKTableOuterJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> 
valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
+        public 
KTableKTableOuterAbstractJoinValueGetterSupplier(KTableValueGetterSupplier<K, 
V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> 
valueGetterSupplier2) {
             super(valueGetterSupplier1, valueGetterSupplier2);
         }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
index b43efaa..f4c840b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
@@ -35,12 +35,12 @@ class KTableKTableRightJoin<K, R, V1, V2> extends 
KTableKTableAbstractJoin<K, R,
 
     @Override
     public KTableValueGetterSupplier<K, R> view() {
-        return new 
KTableKTableRightJoinValueGetterSupplier(valueGetterSupplier1, 
valueGetterSupplier2);
+        return new 
KTableKTableRightAbstractJoinValueGetterSupplier(valueGetterSupplier1, 
valueGetterSupplier2);
     }
 
-    private class KTableKTableRightJoinValueGetterSupplier extends 
AbstractKTableKTableJoinValueGetterSupplier<K, R, V1, V2> {
+    private class KTableKTableRightAbstractJoinValueGetterSupplier extends 
KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> {
 
-        public 
KTableKTableRightJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> 
valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
+        public 
KTableKTableRightAbstractJoinValueGetterSupplier(KTableValueGetterSupplier<K, 
V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> 
valueGetterSupplier2) {
             super(valueGetterSupplier1, valueGetterSupplier2);
         }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index 41dd7cd..9dfbd1f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -30,8 +30,9 @@ class KTableMapValues<K, V, V1> implements 
KTableProcessorSupplier<K, V, V1> {
     private final String queryableName;
     private boolean sendOldValues = false;
 
-    public KTableMapValues(final KTableImpl<K, ?, V> parent, final 
ValueMapper<? super V, ? extends V1> mapper,
-                           final String queryableName) {
+    KTableMapValues(final KTableImpl<K, ?, V> parent,
+                    final ValueMapper<? super V, ? extends V1> mapper,
+                    final String queryableName) {
         this.parent = parent;
         this.mapper = mapper;
         this.queryableName = queryableName;
@@ -44,19 +45,24 @@ class KTableMapValues<K, V, V1> implements 
KTableProcessorSupplier<K, V, V1> {
 
     @Override
     public KTableValueGetterSupplier<K, V1> view() {
-        final KTableValueGetterSupplier<K, V> parentValueGetterSupplier = 
parent.valueGetterSupplier();
-
-        return new KTableValueGetterSupplier<K, V1>() {
-
-            public KTableValueGetter<K, V1> get() {
-                return new 
KTableMapValuesValueGetter(parentValueGetterSupplier.get());
-            }
-
-            @Override
-            public String[] storeNames() {
-                return parentValueGetterSupplier.storeNames();
-            }
-        };
+        // if the KTable is materialized, use the materialized store to return 
getter value;
+        // otherwise rely on the parent getter and apply map-values on-the-fly
+        if (queryableName != null) {
+            return new KTableMaterializedValueGetterSupplier<>(queryableName);
+        } else {
+            return new KTableValueGetterSupplier<K, V1>() {
+                final KTableValueGetterSupplier<K, V> 
parentValueGetterSupplier = parent.valueGetterSupplier();
+
+                public KTableValueGetter<K, V1> get() {
+                    return new 
KTableMapValuesValueGetter(parentValueGetterSupplier.get());
+                }
+
+                @Override
+                public String[] storeNames() {
+                    return parentValueGetterSupplier.storeNames();
+                }
+            };
+        }
     }
 
     @Override
@@ -75,13 +81,12 @@ class KTableMapValues<K, V, V1> implements 
KTableProcessorSupplier<K, V, V1> {
     }
 
     private class KTableMapValuesProcessor extends AbstractProcessor<K, 
Change<V>> {
-
         private KeyValueStore<K, V1> store;
         private TupleForwarder<K, V1> tupleForwarder;
 
         @SuppressWarnings("unchecked")
         @Override
-        public void init(ProcessorContext context) {
+        public void init(final ProcessorContext context) {
             super.init(context);
             if (queryableName != null) {
                 store = (KeyValueStore<K, V1>) 
context.getStateStore(queryableName);
@@ -107,20 +112,19 @@ class KTableMapValues<K, V, V1> implements 
KTableProcessorSupplier<K, V, V1> {
 
         private final KTableValueGetter<K, V> parentGetter;
 
-        public KTableMapValuesValueGetter(KTableValueGetter<K, V> 
parentGetter) {
+        KTableMapValuesValueGetter(KTableValueGetter<K, V> parentGetter) {
             this.parentGetter = parentGetter;
         }
 
         @Override
-        public void init(ProcessorContext context) {
+        public void init(final ProcessorContext context) {
             parentGetter.init(context);
         }
 
         @Override
-        public V1 get(K key) {
+        public V1 get(final K key) {
             return computeValue(parentGetter.get(key));
         }
-
     }
 
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
new file mode 100644
index 0000000..4ceccce
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+public class KTableMaterializedValueGetterSupplier<K, V> implements 
KTableValueGetterSupplier<K, V> {
+
+    private final String storeName;
+
+    KTableMaterializedValueGetterSupplier(final String storeName) {
+        this.storeName = storeName;
+    }
+
+    public KTableValueGetter<K, V> get() {
+        return new KTableMaterializedValueGetter();
+    }
+
+    @Override
+    public String[] storeNames() {
+        return new String[]{storeName};
+    }
+
+    private class KTableMaterializedValueGetter implements 
KTableValueGetter<K, V> {
+        private KeyValueStore<K, V> store;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            store = (KeyValueStore<K, V>) context.getStateStore(storeName);
+        }
+
+        @Override
+        public V get(final K key) {
+            return store.get(key);
+        }
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index 744cc9c..b595bc3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -31,7 +31,7 @@ public class KTableReduce<K, V> implements 
KTableProcessorSupplier<K, V, V> {
 
     private boolean sendOldValues = false;
 
-    public KTableReduce(String storeName, Reducer<V> addReducer, Reducer<V> 
removeReducer) {
+    KTableReduce(final String storeName, final Reducer<V> addReducer, final 
Reducer<V> removeReducer) {
         this.storeName = storeName;
         this.addReducer = addReducer;
         this.removeReducer = removeReducer;
@@ -54,7 +54,7 @@ public class KTableReduce<K, V> implements 
KTableProcessorSupplier<K, V, V> {
 
         @SuppressWarnings("unchecked")
         @Override
-        public void init(ProcessorContext context) {
+        public void init(final ProcessorContext context) {
             super.init(context);
             store = (KeyValueStore<K, V>) context.getStateStore(storeName);
             tupleForwarder = new TupleForwarder<K, V>(store, context, new 
ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
@@ -94,34 +94,6 @@ public class KTableReduce<K, V> implements 
KTableProcessorSupplier<K, V, V> {
 
     @Override
     public KTableValueGetterSupplier<K, V> view() {
-
-        return new KTableValueGetterSupplier<K, V>() {
-
-            public KTableValueGetter<K, V> get() {
-                return new KTableAggregateValueGetter();
-            }
-
-            @Override
-            public String[] storeNames() {
-                return new String[]{storeName};
-            }
-        };
-    }
-
-    private class KTableAggregateValueGetter implements KTableValueGetter<K, 
V> {
-
-        private KeyValueStore<K, V> store;
-
-        @SuppressWarnings("unchecked")
-        @Override
-        public void init(ProcessorContext context) {
-            store = (KeyValueStore<K, V>) context.getStateStore(storeName);
-        }
-
-        @Override
-        public V get(K key) {
-            return store.get(key);
-        }
-
+        return new KTableMaterializedValueGetterSupplier<K, V>(storeName);
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index 09714a7..5aa3f2f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -33,7 +33,7 @@ public class KTableRepartitionMap<K, V, K1, V1> implements 
KTableProcessorSuppli
     private final KTableImpl<K, ?, V> parent;
     private final KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> 
mapper;
 
-    public KTableRepartitionMap(KTableImpl<K, ?, V> parent, KeyValueMapper<? 
super K, ? super V, KeyValue<K1, V1>> mapper) {
+    KTableRepartitionMap(final KTableImpl<K, ?, V> parent, final 
KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> mapper) {
         this.parent = parent;
         this.mapper = mapper;
     }
@@ -101,17 +101,17 @@ public class KTableRepartitionMap<K, V, K1, V1> 
implements KTableProcessorSuppli
 
         private final KTableValueGetter<K, V> parentGetter;
 
-        public KTableMapValueGetter(KTableValueGetter<K, V> parentGetter) {
+        KTableMapValueGetter(final KTableValueGetter<K, V> parentGetter) {
             this.parentGetter = parentGetter;
         }
 
         @Override
-        public void init(ProcessorContext context) {
+        public void init(final ProcessorContext context) {
             parentGetter.init(context);
         }
 
         @Override
-        public KeyValue<K1, V1> get(K key) {
+        public KeyValue<K1, V1> get(final K key) {
             return mapper.apply(key, parentGetter.get(key));
         }
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
index 8a29786..8fbeb25 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
@@ -132,4 +132,14 @@ public class ProcessorTopology {
         return sb.toString();
     }
 
+    // for testing only
+    public Set<String> processorConnectedStateStores(final String 
processorName) {
+        for (final ProcessorNode<?, ?> node : processorNodes) {
+            if (node.name().equals(processorName)) {
+                return node.stateStores;
+            }
+        }
+
+        return Collections.emptySet();
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 13b5b45..4a496b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.internals.KStreamImpl;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockMapper;
@@ -45,6 +46,7 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -69,6 +71,27 @@ public class StreamsBuilderTest {
         builder.<Bytes, String>stream("stream-topic").join(filteredKTable, 
MockValueJoiner.TOSTRING_JOINER);
 
         driver.setUp(builder, TestUtils.tempDirectory());
+
+        ProcessorTopology topology = builder.internalTopologyBuilder.build();
+
+        assertThat(topology.stateStores().size(), equalTo(1));
+        
assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), 
equalTo(Collections.singleton(topology.stateStores().get(0).name())));
+        
assertThat(topology.processorConnectedStateStores("KTABLE-FILTER-0000000003").isEmpty(),
 is(true));
+    }
+
+    @Test
+    public void shouldAllowJoinMaterializedFilteredKTable() {
+        final KTable<Bytes, String> filteredKTable = builder.<Bytes, 
String>table("table-topic")
+                .filter(MockPredicate.<Bytes, String>allGoodPredicate(), 
Materialized.<Bytes, String, KeyValueStore<Bytes, byte[]>>as("store"));
+        builder.<Bytes, String>stream("stream-topic").join(filteredKTable, 
MockValueJoiner.TOSTRING_JOINER);
+
+        driver.setUp(builder, TestUtils.tempDirectory());
+
+        ProcessorTopology topology = builder.internalTopologyBuilder.build();
+
+        assertThat(topology.stateStores().size(), equalTo(2));
+        
assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), 
equalTo(Collections.singleton("store")));
+        
assertThat(topology.processorConnectedStateStores("KTABLE-FILTER-0000000003"), 
equalTo(Collections.singleton("store")));
     }
 
     @Test
@@ -77,14 +100,72 @@ public class StreamsBuilderTest {
         builder.<Bytes, String>stream("stream-topic").join(mappedKTable, 
MockValueJoiner.TOSTRING_JOINER);
 
         driver.setUp(builder, TestUtils.tempDirectory());
+
+        ProcessorTopology topology = builder.internalTopologyBuilder.build();
+
+        assertThat(topology.stateStores().size(), equalTo(1));
+        
assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), 
equalTo(Collections.singleton(topology.stateStores().get(0).name())));
+        
assertThat(topology.processorConnectedStateStores("KTABLE-MAPVALUES-0000000003").isEmpty(),
 is(true));
+    }
+
+    @Test
+    public void shouldAllowJoinMaterializedMapValuedKTable() {
+        final KTable<Bytes, String> mappedKTable = builder.<Bytes, 
String>table("table-topic")
+                .mapValues(MockMapper.<String>noOpValueMapper(), 
Materialized.<Bytes, String, KeyValueStore<Bytes, byte[]>>as("store"));
+        builder.<Bytes, String>stream("stream-topic").join(mappedKTable, 
MockValueJoiner.TOSTRING_JOINER);
+
+        driver.setUp(builder, TestUtils.tempDirectory());
+
+        ProcessorTopology topology = builder.internalTopologyBuilder.build();
+
+        assertThat(topology.stateStores().size(), equalTo(2));
+        
assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"), 
equalTo(Collections.singleton("store")));
+        
assertThat(topology.processorConnectedStateStores("KTABLE-MAPVALUES-0000000003"),
 equalTo(Collections.singleton("store")));
+    }
+
+    @Test
+    public void shouldAllowJoinUnmaterializedJoinedKTable() {
+        final KTable<Bytes, String> table1 = builder.table("table-topic1");
+        final KTable<Bytes, String> table2 = builder.table("table-topic2");
+        builder.<Bytes, String>stream("stream-topic").join(table1.join(table2, 
MockValueJoiner.TOSTRING_JOINER), MockValueJoiner.TOSTRING_JOINER);
+
+        driver.setUp(builder, TestUtils.tempDirectory());
+
+        ProcessorTopology topology = builder.internalTopologyBuilder.build();
+
+        assertThat(topology.stateStores().size(), equalTo(2));
+        
assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000010"), 
equalTo(Utils.mkSet(topology.stateStores().get(0).name(), 
topology.stateStores().get(1).name())));
+        
assertThat(topology.processorConnectedStateStores("KTABLE-MERGE-0000000007").isEmpty(),
 is(true));
+
+    }
+
+    @Test
+    public void shouldAllowJoinMaterializedJoinedKTable() {
+        final KTable<Bytes, String> table1 = builder.table("table-topic1");
+        final KTable<Bytes, String> table2 = builder.table("table-topic2");
+        builder.<Bytes, String>stream("stream-topic").join(table1.join(table2, 
MockValueJoiner.TOSTRING_JOINER, Materialized.<Bytes, String, 
KeyValueStore<Bytes, byte[]>>as("store")), MockValueJoiner.TOSTRING_JOINER);
+
+        driver.setUp(builder, TestUtils.tempDirectory());
+
+        ProcessorTopology topology = builder.internalTopologyBuilder.build();
+
+        assertThat(topology.stateStores().size(), equalTo(3));
+        
assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000010"), 
equalTo(Collections.singleton("store")));
+        
assertThat(topology.processorConnectedStateStores("KTABLE-MERGE-0000000007"), 
equalTo(Collections.singleton("store")));
     }
 
     @Test
     public void shouldAllowJoinMaterializedSourceKTable() {
-        final KTable<Bytes, String> table = builder.<Bytes, 
String>table("table-topic");
+        final KTable<Bytes, String> table = builder.table("table-topic");
         builder.<Bytes, String>stream("stream-topic").join(table, 
MockValueJoiner.TOSTRING_JOINER);
 
         driver.setUp(builder, TestUtils.tempDirectory());
+
+        ProcessorTopology topology = builder.internalTopologyBuilder.build();
+
+        assertThat(topology.stateStores().size(), equalTo(1));
+        
assertThat(topology.processorConnectedStateStores("KTABLE-SOURCE-0000000002"), 
equalTo(Collections.singleton(topology.stateStores().get(0).name())));
+        
assertThat(topology.processorConnectedStateStores("KSTREAM-JOIN-0000000004"), 
equalTo(Collections.singleton(topology.stateStores().get(0).name())));
     }
 
     @Test
@@ -168,7 +249,7 @@ public class StreamsBuilderTest {
         driver.process(topic, 1L, "value1");
         driver.process(topic, 2L, "value2");
         driver.flushState();
-        final KeyValueStore<Long, String> store = (KeyValueStore) 
driver.allStateStores().get("store");
+        final KeyValueStore<Long, String> store = (KeyValueStore<Long, 
String>) driver.allStateStores().get("store");
         assertThat(store.get(1L), equalTo("value1"));
         assertThat(store.get(2L), equalTo("value2"));
         assertThat(results.get(1L), equalTo("value1"));
@@ -186,7 +267,7 @@ public class StreamsBuilderTest {
         driver.process(topic, 1L, "value1");
         driver.process(topic, 2L, "value2");
         driver.flushState();
-        final KeyValueStore<Long, String> store = (KeyValueStore) 
driver.allStateStores().get("store");
+        final KeyValueStore<Long, String> store = (KeyValueStore<Long, 
String>) driver.allStateStores().get("store");
         assertThat(store.get(1L), equalTo("value1"));
         assertThat(store.get(2L), equalTo("value2"));
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index d70d8b7..01236ad 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -177,6 +177,7 @@ public class KTableFilterTest {
 
         driver.process(topic1, "A", 2);
         driver.process(topic1, "B", 2);
+        driver.flushState();
 
         assertEquals(2, (int) getter2.get("A"));
         assertEquals(2, (int) getter2.get("B"));
@@ -187,6 +188,7 @@ public class KTableFilterTest {
         assertEquals(1, (int) getter3.get("C"));
 
         driver.process(topic1, "A", 3);
+        driver.flushState();
 
         assertNull(getter2.get("A"));
         assertEquals(2, (int) getter2.get("B"));
@@ -198,6 +200,7 @@ public class KTableFilterTest {
 
         driver.process(topic1, "A", null);
         driver.process(topic1, "B", null);
+        driver.flushState();
 
         assertNull(getter2.get("A"));
         assertNull(getter2.get("B"));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
index aeb2418..09d4aa0 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
@@ -46,8 +46,6 @@ public class KTableKTableJoinTest {
 
     final private String topic1 = "topic1";
     final private String topic2 = "topic2";
-    final private String storeName1 = "store-name-1";
-    final private String storeName2 = "store-name-2";
 
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();

-- 
To stop receiving notification emails like this one, please contact
['"commits@kafka.apache.org" <commits@kafka.apache.org>'].

Reply via email to