[ 
https://issues.apache.org/jira/browse/KAFKA-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16634766#comment-16634766
 ] 

ASF GitHub Bot commented on KAFKA-7456:
---------------------------------------

mjsax closed pull request #5521: KAFKA-7456: Serde Inheritance in DSL
URL: https://github.com/apache/kafka/pull/5521
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java
index 6a851a10d9d..2474860a049 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java
@@ -24,7 +24,7 @@
     static public class TimeWindowedSerde<T> extends 
Serdes.WrapperSerde<Windowed<T>> {
         // Default constructor needed for reflection object creation
         public TimeWindowedSerde() {
-            super(new TimeWindowedSerializer<T>(), new 
TimeWindowedDeserializer<T>());
+            super(new TimeWindowedSerializer<>(), new 
TimeWindowedDeserializer<>());
         }
 
         public TimeWindowedSerde(final Serde<T> inner) {
@@ -35,7 +35,7 @@ public TimeWindowedSerde(final Serde<T> inner) {
     static public class SessionWindowedSerde<T> extends 
Serdes.WrapperSerde<Windowed<T>> {
         // Default constructor needed for reflection object creation
         public SessionWindowedSerde() {
-            super(new SessionWindowedSerializer<T>(), new 
SessionWindowedDeserializer<T>());
+            super(new SessionWindowedSerializer<>(), new 
SessionWindowedDeserializer<>());
         }
 
         public SessionWindowedSerde(final Serde<T> inner) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index a0724ebc6d6..e8707513a07 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
@@ -31,32 +32,48 @@
 import java.util.Objects;
 import java.util.Set;
 
-public abstract class AbstractStream<K> {
+/*
+ * Any classes (KTable, KStream, etc) extending this class should follow the 
serde specification precedence ordering as:
+ *
+ * 1) Overridden values via control objects (e.g. Materialized, Serialized, 
Consumed, etc)
+ * 2) Serdes that can be inferred from the operator itself (e.g. 
groupBy().count(), where value serde can default to `LongSerde`).
+ * 3) Serde inherited from parent operator if possible (note if the key / 
value types have been changed, then the corresponding serde cannot be 
inherited).
+ * 4) Default serde specified in the config.
+ */
+public abstract class AbstractStream<K, V> {
 
-    protected final InternalStreamsBuilder builder;
     protected final String name;
+    protected final Serde<K> keySerde;
+    protected final Serde<V> valSerde;
     protected final Set<String> sourceNodes;
     protected final StreamsGraphNode streamsGraphNode;
+    protected final InternalStreamsBuilder builder;
 
     // This copy-constructor will allow to extend KStream
     // and KTable APIs with new methods without impacting the public interface.
-    public AbstractStream(final AbstractStream<K> stream) {
-        this.builder = stream.builder;
+    public AbstractStream(final AbstractStream<K, V> stream) {
         this.name = stream.name;
+        this.builder = stream.builder;
+        this.keySerde = stream.keySerde;
+        this.valSerde = stream.valSerde;
         this.sourceNodes = stream.sourceNodes;
         this.streamsGraphNode = stream.streamsGraphNode;
     }
 
-    AbstractStream(final InternalStreamsBuilder builder,
-                   final String name,
+    AbstractStream(final String name,
+                   final Serde<K> keySerde,
+                   final Serde<V> valSerde,
                    final Set<String> sourceNodes,
-                   final StreamsGraphNode streamsGraphNode) {
+                   final StreamsGraphNode streamsGraphNode,
+                   final InternalStreamsBuilder builder) {
         if (sourceNodes == null || sourceNodes.isEmpty()) {
             throw new IllegalArgumentException("parameter <sourceNodes> must 
not be null or empty");
         }
 
-        this.builder = builder;
         this.name = name;
+        this.builder = builder;
+        this.keySerde = keySerde;
+        this.valSerde = valSerde;
         this.sourceNodes = sourceNodes;
         this.streamsGraphNode = streamsGraphNode;
     }
@@ -67,7 +84,7 @@ protected InternalTopologyBuilder internalTopologyBuilder() {
         return builder.internalTopologyBuilder;
     }
 
-    Set<String> ensureJoinableWith(final AbstractStream<K> other) {
+    Set<String> ensureJoinableWith(final AbstractStream<K, ?> other) {
         final Set<String> allSourceNodes = new HashSet<>();
         allSourceNodes.addAll(sourceNodes);
         allSourceNodes.addAll(other.sourceNodes);
@@ -122,4 +139,13 @@ public void close() {
             }
         };
     }
+
+    // for testing only
+    public Serde<K> keySerde() {
+        return keySerde;
+    }
+
+    public Serde<V> valueSerde() {
+        return valSerde;
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
index a191c5ad19c..9791db6495d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
@@ -63,11 +63,12 @@
         this.streamsGraphNode = streamsGraphNode;
     }
 
-
-    <KR, T> KTable<KR, T> build(final KStreamAggProcessorSupplier<K, KR, V, T> 
aggregateSupplier,
-                                final String functionName,
+    <KR, T> KTable<KR, T> build(final String functionName,
                                 final StoreBuilder<? extends StateStore> 
storeBuilder,
-                                final boolean isQueryable) {
+                                final KStreamAggProcessorSupplier<K, KR, V, T> 
aggregateSupplier,
+                                final boolean isQueryable,
+                                final Serde<KR> keySerde,
+                                final Serde<T> valSerde) {
 
         final String aggFunctionName = builder.newProcessorName(functionName);
 
@@ -95,13 +96,15 @@
 
         builder.addGraphNode(parentNode, statefulProcessorNode);
 
-        return new KTableImpl<>(builder,
-                                aggFunctionName,
-                                aggregateSupplier,
+        return new KTableImpl<>(aggFunctionName,
+                                keySerde,
+                                valSerde,
                                 sourceName.equals(this.name) ? sourceNodes : 
Collections.singleton(sourceName),
                                 storeBuilder.name(),
                                 isQueryable,
-                                statefulProcessorNode);
+                                aggregateSupplier,
+                                statefulProcessorNode,
+                                builder);
     }
 
     /**
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 49f49d0ee32..8f767408fb4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -79,30 +79,33 @@ public InternalStreamsBuilder(final InternalTopologyBuilder 
internalTopologyBuil
     public <K, V> KStream<K, V> stream(final Collection<String> topics,
                                        final ConsumedInternal<K, V> consumed) {
         final String name = newProcessorName(KStreamImpl.SOURCE_NAME);
+        final StreamSourceNode<K, V> streamSourceNode = new 
StreamSourceNode<>(name, topics, consumed);
 
-        final StreamSourceNode<K, V> streamSourceNode = new 
StreamSourceNode<>(name,
-                                                                              
topics,
-                                                                              
consumed);
         addGraphNode(root, streamSourceNode);
 
-        return new KStreamImpl<>(this, name, Collections.singleton(name), 
false, streamSourceNode);
+        return new KStreamImpl<>(name,
+                                 consumed.keySerde(),
+                                 consumed.valueSerde(),
+                                 Collections.singleton(name),
+                                 false,
+                                 streamSourceNode,
+                                 this);
     }
 
     public <K, V> KStream<K, V> stream(final Pattern topicPattern,
                                        final ConsumedInternal<K, V> consumed) {
         final String name = newProcessorName(KStreamImpl.SOURCE_NAME);
-
-        final StreamSourceNode<K, V> streamPatternSourceNode = new 
StreamSourceNode<>(name,
-                                                                               
       topicPattern,
-                                                                               
       consumed);
+        final StreamSourceNode<K, V> streamPatternSourceNode = new 
StreamSourceNode<>(name, topicPattern, consumed);
 
         addGraphNode(root, streamPatternSourceNode);
 
-        return new KStreamImpl<>(this,
-                                 name,
+        return new KStreamImpl<>(name,
+                                 consumed.keySerde(),
+                                 consumed.valueSerde(),
                                  Collections.singleton(name),
                                  false,
-                                 streamPatternSourceNode);
+                                 streamPatternSourceNode,
+                                 this);
     }
 
     @SuppressWarnings("unchecked")
@@ -129,15 +132,15 @@ public InternalStreamsBuilder(final 
InternalTopologyBuilder internalTopologyBuil
 
         addGraphNode(root, tableSourceNode);
 
-        return new KTableImpl<>(this,
-                                name,
-                                processorSupplier,
+        return new KTableImpl<>(name,
                                 consumed.keySerde(),
                                 consumed.valueSerde(),
                                 Collections.singleton(source),
                                 storeBuilder.name(),
                                 materialized.isQueryable(),
-                                tableSourceNode);
+                                processorSupplier,
+                                tableSourceNode,
+                                this);
     }
 
     @SuppressWarnings("unchecked")
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 5d4f9f3dd3e..da2eeb6ab8f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -36,24 +36,22 @@
 import java.util.Objects;
 import java.util.Set;
 
-class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements 
KGroupedStream<K, V> {
+class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements 
KGroupedStream<K, V> {
 
     static final String REDUCE_NAME = "KSTREAM-REDUCE-";
     static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";
 
-    private final Serde<K> keySerde;
-    private final Serde<V> valSerde;
     private final boolean repartitionRequired;
     private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
 
-    KGroupedStreamImpl(final InternalStreamsBuilder builder,
-                       final String name,
-                       final Set<String> sourceNodes,
+    KGroupedStreamImpl(final String name,
                        final Serde<K> keySerde,
                        final Serde<V> valSerde,
+                       final Set<String> sourceNodes,
                        final boolean repartitionRequired,
-                       final StreamsGraphNode streamsGraphNode) {
-        super(builder, name, sourceNodes, streamsGraphNode);
+                       final StreamsGraphNode streamsGraphNode,
+                       final InternalStreamsBuilder builder) {
+        super(name, keySerde, valSerde, sourceNodes, streamsGraphNode, 
builder);
         this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(
             builder,
             keySerde,
@@ -63,8 +61,6 @@
             name,
             streamsGraphNode
         );
-        this.keySerde = keySerde;
-        this.valSerde = valSerde;
         this.repartitionRequired = repartitionRequired;
     }
 
@@ -189,14 +185,15 @@
         );
     }
 
-    private <KR, T> KTable<KR, T> doAggregate(final 
KStreamAggProcessorSupplier<K, KR, V, T> aggregateSupplier,
-                                              final String functionName,
-                                              final MaterializedInternal<K, T, 
KeyValueStore<Bytes, byte[]>> materializedInternal) {
+    private <T> KTable<K, T> doAggregate(final KStreamAggProcessorSupplier<K, 
K, V, T> aggregateSupplier,
+                                         final String functionName,
+                                         final MaterializedInternal<K, T, 
KeyValueStore<Bytes, byte[]>> materializedInternal) {
         return aggregateBuilder.build(
-            aggregateSupplier,
             functionName,
             new 
KeyValueStoreMaterializer<>(materializedInternal).materialize(),
-            materializedInternal.isQueryable()
-        );
+            aggregateSupplier,
+            materializedInternal.isQueryable(),
+            materializedInternal.keySerde(),
+            materializedInternal.valueSerde());
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index 08fb605ef99..6ec3c0d1481 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -34,6 +34,7 @@
 
 import java.util.Collections;
 import java.util.Objects;
+import java.util.Set;
 
 /**
  * The implementation class of {@link KGroupedTable}.
@@ -41,14 +42,12 @@
  * @param <K> the key type
  * @param <V> the value type
  */
-public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements 
KGroupedTable<K, V> {
+public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements 
KGroupedTable<K, V> {
 
     private static final String AGGREGATE_NAME = "KTABLE-AGGREGATE-";
 
     private static final String REDUCE_NAME = "KTABLE-REDUCE-";
 
-    protected final Serde<K> keySerde;
-    protected final Serde<V> valSerde;
     private final Initializer<Long> countInitializer = () -> 0L;
 
     private final Aggregator<K, V, Long> countAdder = (aggKey, value, 
aggregate) -> aggregate + 1L;
@@ -57,16 +56,13 @@
 
     KGroupedTableImpl(final InternalStreamsBuilder builder,
                       final String name,
-                      final String sourceName,
+                      final Set<String> sourceNodes,
                       final Serde<K> keySerde,
                       final Serde<V> valSerde,
                       final StreamsGraphNode streamsGraphNode) {
-        super(builder, name, Collections.singleton(sourceName), 
streamsGraphNode);
-        this.keySerde = keySerde;
-        this.valSerde = valSerde;
+        super(name, keySerde, valSerde, sourceNodes, streamsGraphNode, 
builder);
     }
 
-    @SuppressWarnings("unchecked")
     private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> 
aggregateSupplier,
                                          final String functionName,
                                          final MaterializedInternal<K, T, 
KeyValueStore<Bytes, byte[]>> materialized) {
@@ -75,9 +71,7 @@
         final String funcName = builder.newProcessorName(functionName);
         final String topic = materialized.storeName() + 
KStreamImpl.REPARTITION_TOPIC_SUFFIX;
 
-        final StreamsGraphNode repartitionNode = 
createRepartitionNode(sinkName,
-                                                                       
sourceName,
-                                                                       topic);
+        final StreamsGraphNode repartitionNode = 
createRepartitionNode(sinkName, sourceName, topic);
 
         // the passed in StreamsGraphNode must be the parent of the 
repartition node
         builder.addGraphNode(this.streamsGraphNode, repartitionNode);
@@ -90,34 +84,34 @@
         builder.addGraphNode(repartitionNode, statefulProcessorNode);
 
         // return the KTable representation with the intermediate topic as the 
sources
-        return new KTableImpl<>(builder,
-                                funcName,
-                                aggregateSupplier,
+        return new KTableImpl<>(funcName,
+                                materialized.keySerde(),
+                                materialized.valueSerde(),
                                 Collections.singleton(sourceName),
                                 materialized.storeName(),
                                 materialized.isQueryable(),
-                                statefulProcessorNode);
+                                aggregateSupplier,
+                                statefulProcessorNode,
+                                builder);
     }
 
-    @SuppressWarnings("unchecked")
     private <T> StatefulProcessorNode getStatefulProcessorNode(final 
MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized,
                                                                final String 
functionName,
-                                                               final 
ProcessorSupplier aggregateSupplier) {
+                                                               final 
ProcessorSupplier<K, Change<V>> aggregateSupplier) {
 
-        final ProcessorParameters aggregateFunctionProcessorParams = new 
ProcessorParameters<>(aggregateSupplier, functionName);
+        final ProcessorParameters<K, Change<V>> 
aggregateFunctionProcessorParams = new ProcessorParameters<>(aggregateSupplier, 
functionName);
 
         return StatefulProcessorNode.statefulProcessorNodeBuilder()
             .withNodeName(functionName)
             .withProcessorParameters(aggregateFunctionProcessorParams)
-            .withStoreBuilder(new 
KeyValueStoreMaterializer(materialized).materialize()).build();
+            .withStoreBuilder(new 
KeyValueStoreMaterializer<>(materialized).materialize()).build();
     }
 
-    @SuppressWarnings("unchecked")
-    private GroupedTableOperationRepartitionNode createRepartitionNode(final 
String sinkName,
-                                                                       final 
String sourceName,
-                                                                       final 
String topic) {
+    private GroupedTableOperationRepartitionNode<K, V> 
createRepartitionNode(final String sinkName,
+                                                                             
final String sourceName,
+                                                                             
final String topic) {
 
-        return 
GroupedTableOperationRepartitionNode.groupedTableOperationNodeBuilder()
+        return GroupedTableOperationRepartitionNode.<K, 
V>groupedTableOperationNodeBuilder()
             .withRepartitionTopic(topic)
             .withSinkName(sinkName)
             .withSourceName(sourceName)
@@ -151,7 +145,7 @@ private GroupedTableOperationRepartitionNode 
createRepartitionNode(final String
     @Override
     public KTable<K, V> reduce(final Reducer<V> adder,
                                final Reducer<V> subtractor) {
-        return reduce(adder, subtractor, Materialized.<K, V, 
KeyValueStore<Bytes, byte[]>>with(keySerde, valSerde));
+        return reduce(adder, subtractor, Materialized.with(keySerde, 
valSerde));
     }
 
     @Override
@@ -176,7 +170,7 @@ private GroupedTableOperationRepartitionNode 
createRepartitionNode(final String
 
     @Override
     public KTable<K, Long> count() {
-        return count(Materialized.<K, Long, KeyValueStore<Bytes, 
byte[]>>with(keySerde, Serdes.Long()));
+        return count(Materialized.with(keySerde, Serdes.Long()));
     }
 
     @Override
@@ -206,7 +200,7 @@ private GroupedTableOperationRepartitionNode 
createRepartitionNode(final String
     public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
                                       final Aggregator<? super K, ? super V, 
T> adder,
                                       final Aggregator<? super K, ? super V, 
T> subtractor) {
-        return aggregate(initializer, adder, subtractor, Materialized.<K, T, 
KeyValueStore<Bytes, byte[]>>with(keySerde, null));
+        return aggregate(initializer, adder, subtractor, 
Materialized.with(keySerde, null));
     }
 
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 42c20a52f37..2a3bc8f9230 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -59,7 +59,7 @@
 import java.util.Objects;
 import java.util.Set;
 
-public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, 
V> {
+public class KStreamImpl<K, V> extends AbstractStream<K, V> implements 
KStream<K, V> {
 
     static final String SOURCE_NAME = "KSTREAM-SOURCE-";
 
@@ -113,12 +113,14 @@
 
     private final boolean repartitionRequired;
 
-    KStreamImpl(final InternalStreamsBuilder builder,
-                final String name,
+    KStreamImpl(final String name,
+                final Serde<K> keySerde,
+                final Serde<V> valueSerde,
                 final Set<String> sourceNodes,
                 final boolean repartitionRequired,
-                final StreamsGraphNode streamsGraphNode) {
-        super(builder, name, sourceNodes, streamsGraphNode);
+                final StreamsGraphNode streamsGraphNode,
+                final InternalStreamsBuilder builder) {
+        super(name, keySerde, valueSerde, sourceNodes, streamsGraphNode, 
builder);
         this.repartitionRequired = repartitionRequired;
     }
 
@@ -129,12 +131,16 @@
 
 
         final ProcessorParameters<? super K, ? super V> processorParameters = 
new ProcessorParameters<>(new KStreamFilter<>(predicate, false), name);
-        final ProcessorGraphNode<? super K, ? super V> filterProcessorNode = 
new ProcessorGraphNode<>(name,
-                                                                               
                       processorParameters,
-                                                                               
                       repartitionRequired);
+        final ProcessorGraphNode<? super K, ? super V> filterProcessorNode = 
new ProcessorGraphNode<>(name, processorParameters, repartitionRequired);
         builder.addGraphNode(this.streamsGraphNode, filterProcessorNode);
 
-        return new KStreamImpl<>(builder, name, sourceNodes, 
this.repartitionRequired, filterProcessorNode);
+        return new KStreamImpl<>(name,
+                                 keySerde,
+                                 valSerde,
+                                 sourceNodes,
+                                 repartitionRequired,
+                                 filterProcessorNode,
+                                 builder);
     }
 
     @Override
@@ -143,46 +149,41 @@
         final String name = builder.newProcessorName(FILTER_NAME);
 
         final ProcessorParameters<? super K, ? super V> processorParameters = 
new ProcessorParameters<>(new KStreamFilter<>(predicate, true), name);
-
-
-        final ProcessorGraphNode<? super K, ? super V> filterNotProcessorNode 
= new ProcessorGraphNode<>(name,
-                                                                               
                          processorParameters,
-                                                                               
                          repartitionRequired);
+        final ProcessorGraphNode<? super K, ? super V> filterNotProcessorNode 
= new ProcessorGraphNode<>(name, processorParameters, repartitionRequired);
 
         builder.addGraphNode(this.streamsGraphNode, filterNotProcessorNode);
 
-        return new KStreamImpl<>(builder, name, sourceNodes, 
this.repartitionRequired, filterNotProcessorNode);
+        return new KStreamImpl<>(name,
+                                 keySerde,
+                                 valSerde,
+                                 sourceNodes,
+                                 repartitionRequired,
+                                 filterNotProcessorNode,
+                                 builder);
     }
 
     @Override
     public <K1> KStream<K1, V> selectKey(final KeyValueMapper<? super K, ? 
super V, ? extends K1> mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
 
-
         final ProcessorGraphNode<K, V> selectKeyProcessorNode = 
internalSelectKey(mapper);
 
         selectKeyProcessorNode.keyChangingOperation(true);
         builder.addGraphNode(this.streamsGraphNode, selectKeyProcessorNode);
-        return new KStreamImpl<>(builder, selectKeyProcessorNode.nodeName(), 
sourceNodes, true, selectKeyProcessorNode);
+
+        // key serde cannot be preserved
+        return new KStreamImpl<>(selectKeyProcessorNode.nodeName(), null, 
valSerde, sourceNodes, true, selectKeyProcessorNode, builder);
     }
 
 
     private <K1> ProcessorGraphNode<K, V> internalSelectKey(final 
KeyValueMapper<? super K, ? super V, ? extends K1> mapper) {
         final String name = builder.newProcessorName(KEY_SELECT_NAME);
 
-
-        final KStreamMap<K, V, K1, V> kStreamMap = new KStreamMap<>(
-            (KeyValueMapper<K, V, KeyValue<K1, V>>) (key, value) -> new 
KeyValue<>(mapper.apply(key, value), value));
-
+        final KStreamMap<K, V, K1, V> kStreamMap = new KStreamMap<>((key, 
value) -> new KeyValue<>(mapper.apply(key, value), value));
 
         final ProcessorParameters<K, V> processorParameters = new 
ProcessorParameters<>(kStreamMap, name);
 
-        return new ProcessorGraphNode<>(
-            name,
-            processorParameters,
-            repartitionRequired
-        );
-
+        return new ProcessorGraphNode<>(name, processorParameters, 
repartitionRequired);
     }
 
     @Override
@@ -192,14 +193,19 @@
 
         final ProcessorParameters<? super K, ? super V> processorParameters = 
new ProcessorParameters<>(new KStreamMap<>(mapper), name);
 
-        final ProcessorGraphNode<? super K, ? super V> mapProcessorNode = new 
ProcessorGraphNode<>(name,
-                                                                               
                    processorParameters,
-                                                                               
                    true);
+        final ProcessorGraphNode<? super K, ? super V> mapProcessorNode = new 
ProcessorGraphNode<>(name, processorParameters, true);
 
         mapProcessorNode.keyChangingOperation(true);
         builder.addGraphNode(this.streamsGraphNode, mapProcessorNode);
 
-        return new KStreamImpl<>(builder, name, sourceNodes, true, 
mapProcessorNode);
+        // key and value serde cannot be preserved
+        return new KStreamImpl<>(name,
+                                 null,
+                                 null,
+                                 sourceNodes,
+                                 true,
+                                 mapProcessorNode,
+                                 builder);
     }
 
 
@@ -214,15 +220,19 @@
         final String name = builder.newProcessorName(MAPVALUES_NAME);
 
         final ProcessorParameters<? super K, ? super V> processorParameters = 
new ProcessorParameters<>(new KStreamMapValues<>(mapper), name);
+        final ProcessorGraphNode<? super K, ? super V> mapValuesProcessorNode 
= new ProcessorGraphNode<>(name, processorParameters, repartitionRequired);
 
-
-        final ProcessorGraphNode<? super K, ? super V> mapValuesProcessorNode 
= new ProcessorGraphNode<>(name,
-                                                                               
                          processorParameters,
-                                                                               
                          repartitionRequired);
         mapValuesProcessorNode.setValueChangingOperation(true);
         builder.addGraphNode(this.streamsGraphNode, mapValuesProcessorNode);
 
-        return new KStreamImpl<>(builder, name, sourceNodes, 
this.repartitionRequired, mapValuesProcessorNode);
+        // value serde cannot be preserved
+        return new KStreamImpl<>(name,
+                                 keySerde,
+                                 null,
+                                 sourceNodes,
+                                 repartitionRequired,
+                                 mapValuesProcessorNode,
+                                 builder);
     }
 
     @Override
@@ -232,31 +242,30 @@ public void print(final Printed<K, V> printed) {
         final String name = builder.newProcessorName(PRINTING_NAME);
 
         final ProcessorParameters<? super K, ? super V> processorParameters = 
new ProcessorParameters<>(printedInternal.build(this.name), name);
+        final ProcessorGraphNode<? super K, ? super V> printNode = new 
ProcessorGraphNode<>(name, processorParameters, false);
 
-
-        final ProcessorGraphNode<? super K, ? super V> printNode = new 
ProcessorGraphNode<>(name,
-                                                                               
             processorParameters,
-                                                                               
             false);
         builder.addGraphNode(this.streamsGraphNode, printNode);
     }
 
     @Override
-    public <K1, V1> KStream<K1, V1> flatMap(
-        final KeyValueMapper<? super K, ? super V, ? extends Iterable<? 
extends KeyValue<? extends K1, ? extends V1>>> mapper) {
+    public <K1, V1> KStream<K1, V1> flatMap(final KeyValueMapper<? super K, ? 
super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> 
mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
         final String name = builder.newProcessorName(FLATMAP_NAME);
 
         final ProcessorParameters<? super K, ? super V> processorParameters = 
new ProcessorParameters<>(new KStreamFlatMap<>(mapper), name);
-
-
-        final ProcessorGraphNode<? super K, ? super V> flatMapNode = new 
ProcessorGraphNode<>(name,
-                                                                               
               processorParameters,
-                                                                               
               true);
+        final ProcessorGraphNode<? super K, ? super V> flatMapNode = new 
ProcessorGraphNode<>(name, processorParameters, true);
         flatMapNode.keyChangingOperation(true);
 
         builder.addGraphNode(this.streamsGraphNode, flatMapNode);
 
-        return new KStreamImpl<>(builder, name, sourceNodes, true, 
flatMapNode);
+        // key and value serde cannot be preserved
+        return new KStreamImpl<>(name,
+                                 null,
+                                 null,
+                                 sourceNodes,
+                                 true,
+                                 flatMapNode,
+                                 builder);
     }
 
     @Override
@@ -270,15 +279,13 @@ public void print(final Printed<K, V> printed) {
         final String name = builder.newProcessorName(FLATMAPVALUES_NAME);
 
         final ProcessorParameters<? super K, ? super V> processorParameters = 
new ProcessorParameters<>(new KStreamFlatMapValues<>(mapper), name);
+        final ProcessorGraphNode<? super K, ? super V> flatMapValuesNode = new 
ProcessorGraphNode<>(name, processorParameters, repartitionRequired);
 
-
-        final ProcessorGraphNode<? super K, ? super V> flatMapValuesNode = new 
ProcessorGraphNode<>(name,
-                                                                               
                     processorParameters,
-                                                                               
                     repartitionRequired);
         flatMapValuesNode.setValueChangingOperation(true);
         builder.addGraphNode(this.streamsGraphNode, flatMapValuesNode);
 
-        return new KStreamImpl<>(builder, name, sourceNodes, 
this.repartitionRequired, flatMapValuesNode);
+        // value serde cannot be preserved
+        return new KStreamImpl<>(name, keySerde, null, sourceNodes, 
this.repartitionRequired, flatMapValuesNode, builder);
     }
 
     @Override
@@ -298,25 +305,18 @@ public void print(final Printed<K, V> printed) {
             childNames[i] = builder.newProcessorName(BRANCHCHILD_NAME);
         }
 
-
         final ProcessorParameters processorParameters = new 
ProcessorParameters<>(new KStreamBranch(predicates.clone(), childNames), 
branchName);
-
-        final ProcessorGraphNode<K, V> branchNode = new 
ProcessorGraphNode<>(branchName,
-                                                                             
processorParameters,
-                                                                             
false);
+        final ProcessorGraphNode<K, V> branchNode = new 
ProcessorGraphNode<>(branchName, processorParameters, false);
         builder.addGraphNode(this.streamsGraphNode, branchNode);
 
         final KStream<K, V>[] branchChildren = (KStream<K, V>[]) 
Array.newInstance(KStream.class, predicates.length);
 
         for (int i = 0; i < predicates.length; i++) {
             final ProcessorParameters innerProcessorParameters = new 
ProcessorParameters<>(new KStreamPassThrough<K, V>(), childNames[i]);
-
-            final ProcessorGraphNode<K, V> branchChildNode = new 
ProcessorGraphNode<>(childNames[i],
-                                                                               
       innerProcessorParameters,
-                                                                               
       repartitionRequired);
+            final ProcessorGraphNode<K, V> branchChildNode = new 
ProcessorGraphNode<>(childNames[i], innerProcessorParameters, 
repartitionRequired);
 
             builder.addGraphNode(branchNode, branchChildNode);
-            branchChildren[i] = new KStreamImpl<>(builder, childNames[i], 
sourceNodes, this.repartitionRequired, branchChildNode);
+            branchChildren[i] = new KStreamImpl<>(childNames[i], keySerde, 
valSerde, sourceNodes, repartitionRequired, branchChildNode, builder);
         }
 
         return branchChildren;
@@ -347,22 +347,9 @@ public void print(final Printed<K, V> printed) {
 
         mergeNode.setMergeNode(true);
         builder.addGraphNode(Arrays.asList(this.streamsGraphNode, 
streamImpl.streamsGraphNode), mergeNode);
-        return new KStreamImpl<>(builder, name, allSourceNodes, 
requireRepartitioning, mergeNode);
-    }
 
-    @Override
-    public KStream<K, V> through(final String topic, final Produced<K, V> 
produced) {
-        final ProducedInternal<K, V> producedInternal = new 
ProducedInternal<>(produced);
-        to(topic, producedInternal);
-        return builder.stream(
-            Collections.singleton(topic),
-            new ConsumedInternal<>(
-                producedInternal.keySerde(),
-                producedInternal.valueSerde(),
-                new FailOnInvalidTimestamp(),
-                null
-            )
-        );
+        // drop the serde as we cannot safely use either one to represent both 
streams
+        return new KStreamImpl<>(name, null, null, allSourceNodes, 
requireRepartitioning, mergeNode, builder);
     }
 
     @Override
@@ -375,7 +362,6 @@ public void foreach(final ForeachAction<? super K, ? super 
V> action) {
             name
         );
 
-
         final ProcessorGraphNode<? super K, ? super V> foreachNode = new 
ProcessorGraphNode<>(name,
                                                                                
               processorParameters,
                                                                                
               repartitionRequired);
@@ -394,12 +380,11 @@ public void foreach(final ForeachAction<? super K, ? 
super V> action) {
 
         final ProcessorGraphNode<? super K, ? super V> peekNode = new 
ProcessorGraphNode<>(name,
                                                                                
            processorParameters,
-                                                                               
            repartitionRequired
-        );
+                                                                               
            repartitionRequired);
 
         builder.addGraphNode(this.streamsGraphNode, peekNode);
 
-        return new KStreamImpl<>(builder, name, sourceNodes, 
repartitionRequired, peekNode);
+        return new KStreamImpl<>(name, keySerde, valSerde, sourceNodes, 
repartitionRequired, peekNode, builder);
     }
 
     @Override
@@ -407,6 +392,21 @@ public void foreach(final ForeachAction<? super K, ? super 
V> action) {
         return through(topic, Produced.with(null, null, null));
     }
 
+    @Override
+    public KStream<K, V> through(final String topic, final Produced<K, V> 
produced) {
+        final ProducedInternal<K, V> producedInternal = new 
ProducedInternal<>(produced);
+        to(topic, producedInternal);
+        return builder.stream(
+            Collections.singleton(topic),
+            new ConsumedInternal<>(
+                producedInternal.keySerde() != null ? 
producedInternal.keySerde() : keySerde,
+                producedInternal.valueSerde() != null ? 
producedInternal.valueSerde() : valSerde,
+                new FailOnInvalidTimestamp(),
+                null
+            )
+        );
+    }
+
     @Override
     public void to(final String topic) {
         to(topic, Produced.with(null, null, null));
@@ -431,7 +431,6 @@ public void to(final TopicNameExtractor<K, V> 
topicExtractor, final Produced<K,
         to(topicExtractor, new ProducedInternal<>(produced));
     }
 
-    @SuppressWarnings("unchecked")
     private void to(final TopicNameExtractor<K, V> topicExtractor, final 
ProducedInternal<K, V> produced) {
         final String name = builder.newProcessorName(SINK_NAME);
 
@@ -461,8 +460,8 @@ private void to(final TopicNameExtractor<K, V> 
topicExtractor, final ProducedInt
         transformNode.keyChangingOperation(true);
         builder.addGraphNode(this.streamsGraphNode, transformNode);
 
-
-        return new KStreamImpl<>(builder, name, sourceNodes, true, 
transformNode);
+        // cannot inherit key and value serde
+        return new KStreamImpl<>(name, null, null, sourceNodes, true, 
transformNode, builder);
     }
 
     @Override
@@ -496,7 +495,8 @@ private void to(final TopicNameExtractor<K, V> 
topicExtractor, final ProducedInt
         transformNode.setValueChangingOperation(true);
         builder.addGraphNode(this.streamsGraphNode, transformNode);
 
-        return new KStreamImpl<>(builder, name, sourceNodes, 
this.repartitionRequired, transformNode);
+        // cannot inherit value serde
+        return new KStreamImpl<>(name, keySerde, null, sourceNodes, 
this.repartitionRequired, transformNode, builder);
     }
 
     @Override
@@ -565,11 +565,11 @@ public void process(final ProcessorSupplier<? super K, ? 
super V> processorSuppl
         KStreamImpl<K, V1> joinOther = (KStreamImpl<K, V1>) other;
 
         if (joinThis.repartitionRequired) {
-            joinThis = joinThis.repartitionForJoin(joined.keySerde(), 
joined.valueSerde());
+            joinThis = joinThis.repartitionForJoin(joined);
         }
 
         if (joinOther.repartitionRequired) {
-            joinOther = joinOther.repartitionForJoin(joined.keySerde(), 
joined.otherValueSerde());
+            joinOther = 
joinOther.repartitionForJoin(Joined.with(joined.keySerde(), 
joined.otherValueSerde(), joined.valueSerde()));
         }
 
         joinThis.ensureJoinableWith(joinOther);
@@ -587,17 +587,16 @@ public void process(final ProcessorSupplier<? super K, ? 
super V> processorSuppl
      * Repartition a stream. This is required on join operations occurring 
after
      * an operation that changes the key, i.e, selectKey, map(..), flatMap(..).
      *
-     * @param keySerde Serdes for serializing the keys
-     * @param valSerde Serdes for serializing the values
+     * @param joined joined control object
      * @return a new {@link KStreamImpl}
      */
-    private KStreamImpl<K, V> repartitionForJoin(final Serde<K> keySerde,
-                                                 final Serde<V> valSerde) {
-
+    private KStreamImpl<K, V> repartitionForJoin(final Joined<K, V, ?> joined) 
{
+        final Serde<K> repartitionKeySerde = joined.keySerde() != null ? 
joined.keySerde() : keySerde;
+        final Serde<V> repartitionValueSerde = joined.valueSerde() != null ? 
joined.valueSerde() : valSerde;
         final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, 
V> optimizableRepartitionNodeBuilder = 
OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
         final String repartitionedSourceName = 
createRepartitionedSource(builder,
-                                                                         
keySerde,
-                                                                         
valSerde,
+                                                                         
repartitionKeySerde,
+                                                                         
repartitionValueSerde,
                                                                          null,
                                                                          name,
                                                                          
optimizableRepartitionNodeBuilder);
@@ -605,7 +604,7 @@ public void process(final ProcessorSupplier<? super K, ? 
super V> processorSuppl
         final OptimizableRepartitionNode<K, V> optimizableRepartitionNode = 
optimizableRepartitionNodeBuilder.build();
         builder.addGraphNode(this.streamsGraphNode, 
optimizableRepartitionNode);
 
-        return new KStreamImpl<>(builder, repartitionedSourceName, 
Collections.singleton(repartitionedSourceName), false, 
optimizableRepartitionNode);
+        return new KStreamImpl<>(repartitionedSourceName, repartitionKeySerde, 
repartitionValueSerde, Collections.singleton(repartitionedSourceName), false, 
optimizableRepartitionNode, builder);
     }
 
     static <K1, V1> String createRepartitionedSource(final 
InternalStreamsBuilder builder,
@@ -678,18 +677,31 @@ public void process(final ProcessorSupplier<? super K, ? 
super V> processorSuppl
         Objects.requireNonNull(joiner, "joiner can't be null");
         Objects.requireNonNull(joined, "joined can't be null");
         if (repartitionRequired) {
-            final KStreamImpl<K, V> thisStreamRepartitioned = 
repartitionForJoin(joined.keySerde(), joined.valueSerde());
-            return thisStreamRepartitioned.doStreamTableJoin(other, joiner, 
false);
+            final KStreamImpl<K, V> thisStreamRepartitioned = 
repartitionForJoin(joined);
+            return thisStreamRepartitioned.doStreamTableJoin(other, joiner, 
joined, false);
         } else {
-            return doStreamTableJoin(other, joiner, false);
+            return doStreamTableJoin(other, joiner, joined, false);
         }
     }
 
     @Override
-    public <K1, V1, R> KStream<K, R> leftJoin(final GlobalKTable<K1, V1> 
globalTable,
-                                              final KeyValueMapper<? super K, 
? super V, ? extends K1> keyMapper,
-                                              final ValueJoiner<? super V, ? 
super V1, ? extends R> joiner) {
-        return globalTableJoin(globalTable, keyMapper, joiner, true);
+    public <V1, R> KStream<K, R> leftJoin(final KTable<K, V1> other, final 
ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
+        return leftJoin(other, joiner, Joined.with(null, null, null));
+    }
+
+    @Override
+    public <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> other,
+                                            final ValueJoiner<? super V, ? 
super VT, ? extends VR> joiner,
+                                            final Joined<K, V, VT> joined) {
+        Objects.requireNonNull(other, "other can't be null");
+        Objects.requireNonNull(joiner, "joiner can't be null");
+        Objects.requireNonNull(joined, "joined can't be null");
+        if (repartitionRequired) {
+            final KStreamImpl<K, V> thisStreamRepartitioned = 
repartitionForJoin(joined);
+            return thisStreamRepartitioned.doStreamTableJoin(other, joiner, 
joined, true);
+        } else {
+            return doStreamTableJoin(other, joiner, joined, true);
+        }
     }
 
     @Override
@@ -699,6 +711,13 @@ public void process(final ProcessorSupplier<? super K, ? 
super V> processorSuppl
         return globalTableJoin(globalTable, keyMapper, joiner, false);
     }
 
+    @Override
+    public <K1, V1, R> KStream<K, R> leftJoin(final GlobalKTable<K1, V1> 
globalTable,
+                                              final KeyValueMapper<? super K, 
? super V, ? extends K1> keyMapper,
+                                              final ValueJoiner<? super V, ? 
super V1, ? extends R> joiner) {
+        return globalTableJoin(globalTable, keyMapper, joiner, true);
+    }
+
     private <K1, V1, V2> KStream<K, V2> globalTableJoin(final GlobalKTable<K1, 
V1> globalTable,
                                                         final KeyValueMapper<? 
super K, ? super V, ? extends K1> keyMapper,
                                                         final ValueJoiner<? 
super V, ? super V1, ? extends V2> joiner,
@@ -724,17 +743,19 @@ public void process(final ProcessorSupplier<? super K, ? 
super V> processorSuppl
                                                                                
         null);
         builder.addGraphNode(this.streamsGraphNode, streamTableJoinNode);
 
-        return new KStreamImpl<>(builder, name, sourceNodes, false, 
streamTableJoinNode);
+        // do not have serde for joined result
+        return new KStreamImpl<>(name, keySerde, null, sourceNodes, false, 
streamTableJoinNode, builder);
     }
 
     @SuppressWarnings("unchecked")
     private <V1, R> KStream<K, R> doStreamTableJoin(final KTable<K, V1> other,
                                                     final ValueJoiner<? super 
V, ? super V1, ? extends R> joiner,
+                                                    final Joined<K, V, V1> 
joined,
                                                     final boolean leftJoin) {
         Objects.requireNonNull(other, "other KTable can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
 
-        final Set<String> allSourceNodes = 
ensureJoinableWith((AbstractStream<K>) other);
+        final Set<String> allSourceNodes = 
ensureJoinableWith((AbstractStream<K, V1>) other);
 
         final String name = builder.newProcessorName(leftJoin ? LEFTJOIN_NAME 
: JOIN_NAME);
         final ProcessorSupplier<K, V> processorSupplier = new 
KStreamKTableJoin<>(
@@ -743,7 +764,6 @@ public void process(final ProcessorSupplier<? super K, ? 
super V> processorSuppl
             leftJoin
         );
 
-
         final ProcessorParameters<K, V> processorParameters = new 
ProcessorParameters<>(processorSupplier, name);
         final StreamTableJoinNode<K, V> streamTableJoinNode = new 
StreamTableJoinNode<>(
             name,
@@ -754,27 +774,8 @@ public void process(final ProcessorSupplier<? super K, ? 
super V> processorSuppl
 
         builder.addGraphNode(this.streamsGraphNode, streamTableJoinNode);
 
-        return new KStreamImpl<>(builder, name, allSourceNodes, false, 
streamTableJoinNode);
-    }
-
-    @Override
-    public <V1, R> KStream<K, R> leftJoin(final KTable<K, V1> other, final 
ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
-        return leftJoin(other, joiner, Joined.with(null, null, null));
-    }
-
-    @Override
-    public <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> other,
-                                            final ValueJoiner<? super V, ? 
super VT, ? extends VR> joiner,
-                                            final Joined<K, V, VT> joined) {
-        Objects.requireNonNull(other, "other can't be null");
-        Objects.requireNonNull(joiner, "joiner can't be null");
-        Objects.requireNonNull(joined, "joined can't be null");
-        if (repartitionRequired) {
-            final KStreamImpl<K, V> thisStreamRepartitioned = 
this.repartitionForJoin(joined.keySerde(), joined.valueSerde());
-            return thisStreamRepartitioned.doStreamTableJoin(other, joiner, 
true);
-        } else {
-            return doStreamTableJoin(other, joiner, true);
-        }
+        // do not have serde for joined result
+        return new KStreamImpl<>(name, joined.keySerde() != null ? 
joined.keySerde() : keySerde, null, allSourceNodes, false, streamTableJoinNode, 
builder);
     }
 
     @Override
@@ -792,16 +793,13 @@ public void process(final ProcessorSupplier<? super K, ? 
super V> processorSuppl
         selectKeyMapNode.keyChangingOperation(true);
 
         builder.addGraphNode(this.streamsGraphNode, selectKeyMapNode);
-        return new KGroupedStreamImpl<>(
-            builder,
-            selectKeyMapNode.nodeName(),
-            sourceNodes,
-            serializedInternal.keySerde(),
-            serializedInternal.valueSerde(),
-            true,
-            selectKeyMapNode
-        );
-
+        return new KGroupedStreamImpl<>(selectKeyMapNode.nodeName(),
+                                        serializedInternal.keySerde(),
+                                        serializedInternal.valueSerde() != 
null ? serializedInternal.valueSerde() : valSerde,
+                                        sourceNodes,
+                                        true,
+                                        selectKeyMapNode,
+                                        builder);
     }
 
     @Override
@@ -812,14 +810,13 @@ public void process(final ProcessorSupplier<? super K, ? 
super V> processorSuppl
     @Override
     public KGroupedStream<K, V> groupByKey(final Serialized<K, V> serialized) {
         final SerializedInternal<K, V> serializedInternal = new 
SerializedInternal<>(serialized);
-        return new KGroupedStreamImpl<>(builder,
-                                        this.name,
+        return new KGroupedStreamImpl<>(this.name,
+                                        serializedInternal.keySerde() != null 
? serializedInternal.keySerde() : keySerde,
+                                        serializedInternal.valueSerde() != 
null ? serializedInternal.valueSerde() : valSerde,
                                         sourceNodes,
-                                        serializedInternal.keySerde(),
-                                        serializedInternal.valueSerde(),
                                         this.repartitionRequired,
-                                        streamsGraphNode);
-
+                                        streamsGraphNode,
+                                        builder);
     }
 
     @SuppressWarnings("deprecation") // continuing to support 
Windows#maintainMs/segmentInterval in fallback mode
@@ -851,7 +848,6 @@ public void process(final ProcessorSupplier<? super K, ? 
super V> processorSuppl
             this.rightOuter = rightOuter;
         }
 
-        @SuppressWarnings("unchecked")
         public <K1, R, V1, V2> KStream<K1, R> join(final KStream<K1, V1> lhs,
                                                    final KStream<K1, V2> other,
                                                    final ValueJoiner<? super 
V1, ? super V2, ? extends R> joiner,
@@ -874,13 +870,13 @@ public void process(final ProcessorSupplier<? super K, ? 
super V> processorSuppl
 
             final KStreamJoinWindow<K1, V1> thisWindowedStream = new 
KStreamJoinWindow<>(thisWindowStore.name());
 
-            final ProcessorParameters thisWindowStreamProcessorParams = new 
ProcessorParameters(thisWindowedStream, thisWindowStreamName);
+            final ProcessorParameters<K1, V1> thisWindowStreamProcessorParams 
= new ProcessorParameters<>(thisWindowedStream, thisWindowStreamName);
             final ProcessorGraphNode<K1, V1> thisWindowedStreamsNode = new 
ProcessorGraphNode<>(thisWindowStreamName, thisWindowStreamProcessorParams);
             builder.addGraphNode(thisStreamsGraphNode, 
thisWindowedStreamsNode);
 
             final KStreamJoinWindow<K1, V2> otherWindowedStream = new 
KStreamJoinWindow<>(otherWindowStore.name());
 
-            final ProcessorParameters otherWindowStreamProcessorParams = new 
ProcessorParameters(otherWindowedStream, otherWindowStreamName);
+            final ProcessorParameters<K1, V2> otherWindowStreamProcessorParams 
= new ProcessorParameters<>(otherWindowedStream, otherWindowStreamName);
             final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new 
ProcessorGraphNode<>(otherWindowStreamName, otherWindowStreamProcessorParams);
             builder.addGraphNode(otherStreamsGraphNode, 
otherWindowedStreamsNode);
 
@@ -902,11 +898,11 @@ public void process(final ProcessorSupplier<? super K, ? 
super V> processorSuppl
 
             final KStreamPassThrough<K1, R> joinMerge = new 
KStreamPassThrough<>();
 
-            final StreamStreamJoinNode.StreamStreamJoinNodeBuilder<K, V1, V2, 
R> joinBuilder = StreamStreamJoinNode.streamStreamJoinNodeBuilder();
+            final StreamStreamJoinNode.StreamStreamJoinNodeBuilder<K1, V1, V2, 
R> joinBuilder = StreamStreamJoinNode.streamStreamJoinNodeBuilder();
 
-            final ProcessorParameters joinThisProcessorParams = new 
ProcessorParameters(joinThis, joinThisName);
-            final ProcessorParameters joinOtherProcessorParams = new 
ProcessorParameters(joinOther, joinOtherName);
-            final ProcessorParameters joinMergeProcessorParams = new 
ProcessorParameters(joinMerge, joinMergeName);
+            final ProcessorParameters<K1, V1> joinThisProcessorParams = new 
ProcessorParameters<>(joinThis, joinThisName);
+            final ProcessorParameters<K1, V2> joinOtherProcessorParams = new 
ProcessorParameters<>(joinOther, joinOtherName);
+            final ProcessorParameters<K1, R> joinMergeProcessorParams = new 
ProcessorParameters<>(joinMerge, joinMergeName);
 
             
joinBuilder.withJoinMergeProcessorParameters(joinMergeProcessorParams)
                        
.withJoinThisProcessorParameters(joinThisProcessorParams)
@@ -922,9 +918,12 @@ public void process(final ProcessorSupplier<? super K, ? 
super V> processorSuppl
 
             builder.addGraphNode(Arrays.asList(thisStreamsGraphNode, 
otherStreamsGraphNode), joinGraphNode);
 
-            final Set<String> allSourceNodes = new 
HashSet<>(((AbstractStream<K>) lhs).sourceNodes);
+            final Set<String> allSourceNodes = new HashSet<>(((KStreamImpl<K1, 
V1>) lhs).sourceNodes);
             allSourceNodes.addAll(((KStreamImpl<K1, V2>) other).sourceNodes);
-            return new KStreamImpl<>(builder, joinMergeName, allSourceNodes, 
false, joinGraphNode);
+
+            // do not have serde for joined result;
+            // also for key serde we do not inherit from either since we 
cannot tell if these two serdes are different
+            return new KStreamImpl<>(joinMergeName, joined.keySerde(), null, 
allSourceNodes, false, joinGraphNode, builder);
         }
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index ea5c3049e7f..c5b29702c7c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -57,7 +57,7 @@
  * @param <S> the source's (parent's) value type
  * @param <V> the value type
  */
-public class KTableImpl<K, S, V> extends AbstractStream<K> implements 
KTable<K, V> {
+public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements 
KTable<K, V> {
 
     static final String SOURCE_NAME = "KTABLE-SOURCE-";
 
@@ -87,38 +87,19 @@
     private final boolean isQueryable;
 
     private boolean sendOldValues = false;
-    private final Serde<K> keySerde;
-    private final Serde<V> valSerde;
 
-    public KTableImpl(final InternalStreamsBuilder builder,
-                      final String name,
-                      final ProcessorSupplier<?, ?> processorSupplier,
-                      final Set<String> sourceNodes,
-                      final String queryableStoreName,
-                      final boolean isQueryable,
-                      final StreamsGraphNode streamsGraphNode) {
-        super(builder, name, sourceNodes, streamsGraphNode);
-        this.processorSupplier = processorSupplier;
-        this.queryableStoreName = queryableStoreName;
-        this.keySerde = null;
-        this.valSerde = null;
-        this.isQueryable = isQueryable;
-    }
-
-    public KTableImpl(final InternalStreamsBuilder builder,
-                      final String name,
-                      final ProcessorSupplier<?, ?> processorSupplier,
+    public KTableImpl(final String name,
                       final Serde<K> keySerde,
                       final Serde<V> valSerde,
                       final Set<String> sourceNodes,
                       final String queryableStoreName,
                       final boolean isQueryable,
-                      final StreamsGraphNode streamsGraphNode) {
-        super(builder, name, sourceNodes, streamsGraphNode);
+                      final ProcessorSupplier<?, ?> processorSupplier,
+                      final StreamsGraphNode streamsGraphNode,
+                      final InternalStreamsBuilder builder) {
+        super(name, keySerde, valSerde, sourceNodes, streamsGraphNode, 
builder);
         this.processorSupplier = processorSupplier;
         this.queryableStoreName = queryableStoreName;
-        this.keySerde = keySerde;
-        this.valSerde = valSerde;
         this.isQueryable = isQueryable;
     }
 
@@ -159,18 +140,18 @@ public String queryableStoreName() {
 
         builder.addGraphNode(this.streamsGraphNode, tableNode);
 
-
-        return new KTableImpl<>(
-            builder,
-            name,
-            processorSupplier,
-            this.keySerde,
-            this.valSerde,
-            sourceNodes,
-            shouldMaterialize ? materializedInternal.storeName() : 
this.queryableStoreName,
-            shouldMaterialize,
-            tableNode
-        );
+        // we can inherit parent key and value serde if user do not provide 
specific overrides, more specifically:
+        // we preserve the key following the order of 1) materialized, 2) 
parent
+        // we preserve the value following the order of 1) materialized, 2) 
parent
+        return new KTableImpl<>(name,
+                                materializedInternal != null && 
materializedInternal.keySerde() != null ? materializedInternal.keySerde() : 
keySerde,
+                                materializedInternal != null && 
materializedInternal.valueSerde() != null ? materializedInternal.valueSerde() : 
valSerde,
+                                sourceNodes,
+                                shouldMaterialize ? 
materializedInternal.storeName() : this.queryableStoreName,
+                                shouldMaterialize,
+                                processorSupplier,
+                                tableNode,
+                                builder);
     }
 
     @Override
@@ -234,14 +215,19 @@ public String queryableStoreName() {
 
         builder.addGraphNode(this.streamsGraphNode, tableNode);
 
+        // don't inherit parent value serde, since this operation may change 
the value type, more specifically:
+        // we preserve the key following the order of 1) materialized, 2) 
parent, 3) null
+        // we preserve the value following the order of 1) materialized, 2) 
null
         return new KTableImpl<>(
-            builder,
             name,
-            processorSupplier,
+            materializedInternal != null && materializedInternal.keySerde() != 
null ? materializedInternal.keySerde() : keySerde,
+            materializedInternal != null ? materializedInternal.valueSerde() : 
null,
             sourceNodes,
             shouldMaterialize ? materializedInternal.storeName() : 
this.queryableStoreName,
             shouldMaterialize,
-            tableNode
+            processorSupplier,
+            tableNode,
+            builder
         );
     }
 
@@ -325,14 +311,19 @@ public String queryableStoreName() {
 
         builder.addGraphNode(this.streamsGraphNode, tableNode);
 
+        // don't inherit parent value serde, since this operation may change 
the value type, more specifically:
+        // we preserve the key following the order of 1) materialized, 2) 
parent, 3) null
+        // we preserve the value following the order of 1) materialized, 2) 
null
         return new KTableImpl<>(
-            builder,
             name,
-            processorSupplier,
+            materialized != null && materialized.keySerde() != null ? 
materialized.keySerde() : keySerde,
+            materialized != null ? materialized.valueSerde() : null,
             sourceNodes,
             shouldMaterialize ? materialized.storeName() : 
this.queryableStoreName,
             shouldMaterialize,
-            tableNode);
+            processorSupplier,
+            tableNode,
+            builder);
     }
 
     @Override
@@ -352,7 +343,8 @@ public String queryableStoreName() {
 
         builder.addGraphNode(this.streamsGraphNode, toStreamNode);
 
-        return new KStreamImpl<>(builder, name, sourceNodes, false, 
toStreamNode);
+        // we can inherit parent key and value serde
+        return new KStreamImpl<>(name, keySerde, valSerde, sourceNodes, false, 
toStreamNode, builder);
     }
 
     @Override
@@ -382,15 +374,15 @@ public String queryableStoreName() {
         builder.addGraphNode(streamsGraphNode, node);
 
         return new KTableImpl<K, S, V>(
-            builder,
             name,
-            suppressionSupplier,
             keySerde,
             valSerde,
             Collections.singleton(this.name),
             null,
             false,
-            node
+            suppressionSupplier,
+            node,
+            builder
         );
     }
 
@@ -475,7 +467,7 @@ public String queryableStoreName() {
         final String joinMergeName = builder.newProcessorName(MERGE_NAME);
 
         return buildJoin(
-            (AbstractStream<K>) other,
+            (AbstractStream<K, VO>) other,
             joiner,
             leftOuter,
             rightOuter,
@@ -485,8 +477,7 @@ public String queryableStoreName() {
         );
     }
 
-    @SuppressWarnings("unchecked")
-    private <V1, R> KTable<K, R> buildJoin(final AbstractStream<K> other,
+    private <V1, R> KTable<K, R> buildJoin(final AbstractStream<K, V1> other,
                                            final ValueJoiner<? super V, ? 
super V1, ? extends R> joiner,
                                            final boolean leftOuter,
                                            final boolean rightOuter,
@@ -520,29 +511,9 @@ public String queryableStoreName() {
             joinOther = new KTableKTableOuterJoin<>((KTableImpl<K, ?, V1>) 
other, this, reverseJoiner(joiner));
         }
 
-        final KTableKTableJoinMerger<K, R> joinMerge = new 
KTableKTableJoinMerger<>(
-            new KTableImpl<K, V, R>(
-                builder,
-                joinThisName,
-                joinThis,
-                sourceNodes,
-                this.queryableStoreName,
-                false,
-                null
-            ),
-            new KTableImpl<K, V1, R>(
-                builder,
-                joinOtherName,
-                joinOther,
-                ((KTableImpl<K, ?, ?>) other).sourceNodes,
-                ((KTableImpl<K, ?, ?>) other).queryableStoreName,
-                false,
-                null
-            ),
-            internalQueryableName
-        );
+        final KTableKTableJoinMerger<K, R> joinMerge = new 
KTableKTableJoinMerger<>(joinThis, joinOther, internalQueryableName);
 
-        final KTableKTableJoinNode.KTableKTableJoinNodeBuilder 
kTableJoinNodeBuilder = KTableKTableJoinNode.kTableKTableJoinNodeBuilder();
+        final KTableKTableJoinNode.KTableKTableJoinNodeBuilder<K, Change<V>, 
Change<V1>, Change<R>> kTableJoinNodeBuilder = 
KTableKTableJoinNode.kTableKTableJoinNodeBuilder();
 
         // only materialize if specified in Materialized
         if (materializedInternal != null) {
@@ -550,9 +521,9 @@ public String queryableStoreName() {
         }
         kTableJoinNodeBuilder.withNodeName(joinMergeName);
 
-        final ProcessorParameters joinThisProcessorParameters = new 
ProcessorParameters(joinThis, joinThisName);
-        final ProcessorParameters joinOtherProcessorParameters = new 
ProcessorParameters(joinOther, joinOtherName);
-        final ProcessorParameters joinMergeProcessorParameters = new 
ProcessorParameters(joinMerge, joinMergeName);
+        final ProcessorParameters<K, Change<V>> joinThisProcessorParameters = 
new ProcessorParameters<>(joinThis, joinThisName);
+        final ProcessorParameters<K, Change<V1>> joinOtherProcessorParameters 
= new ProcessorParameters<>(joinOther, joinOtherName);
+        final ProcessorParameters<K, Change<R>> joinMergeProcessorParameters = 
new ProcessorParameters<>(joinMerge, joinMergeName);
 
         
kTableJoinNodeBuilder.withJoinMergeProcessorParameters(joinMergeProcessorParameters)
                              
.withJoinOtherProcessorParameters(joinOtherProcessorParameters)
@@ -562,23 +533,26 @@ public String queryableStoreName() {
                              .withOtherJoinSideNodeName(((KTableImpl) 
other).name)
                              .withThisJoinSideNodeName(name);
 
-        final KTableKTableJoinNode kTableKTableJoinNode = 
kTableJoinNodeBuilder.build();
+        final KTableKTableJoinNode<K, Change<V>, Change<V1>, Change<R>> 
kTableKTableJoinNode = kTableJoinNodeBuilder.build();
         builder.addGraphNode(this.streamsGraphNode, kTableKTableJoinNode);
 
-        return new KTableImpl<>(
-            builder,
+        // we can inherit parent key serde if user do not provide specific 
overrides
+        return new KTableImpl<K, Change<R>, R>(
             joinMergeName,
-            joinMerge,
+            materializedInternal != null && materializedInternal.keySerde() != 
null ? materializedInternal.keySerde() : keySerde,
+            materializedInternal != null ? materializedInternal.valueSerde() : 
null,
             allSourceNodes,
             internalQueryableName,
             internalQueryableName != null,
-            kTableKTableJoinNode
+            joinMerge,
+            kTableKTableJoinNode,
+            builder
         );
     }
 
     @Override
     public <K1, V1> KGroupedTable<K1, V1> groupBy(final KeyValueMapper<? super 
K, ? super V, KeyValue<K1, V1>> selector) {
-        return this.groupBy(selector, Serialized.with(null, null));
+        return groupBy(selector, Serialized.with(null, null));
     }
 
     @Override
@@ -592,20 +566,20 @@ public String queryableStoreName() {
         final ProcessorParameters<K, Change<V>> processorParameters = new 
ProcessorParameters<>(selectSupplier, selectName);
 
         // select the aggregate key and values (old and new), it would require 
parent to send old values
-        final ProcessorGraphNode<K, Change<V>> groupByMapNode = new 
ProcessorGraphNode<>(
-            selectName,
-            processorParameters,
-            false
-        );
+        final ProcessorGraphNode<K, Change<V>> groupByMapNode = new 
ProcessorGraphNode<>(selectName, processorParameters, false);
 
         builder.addGraphNode(this.streamsGraphNode, groupByMapNode);
 
         this.enableSendingOldValues();
+
         final SerializedInternal<K1, V1> serializedInternal = new 
SerializedInternal<>(serialized);
+
+        // we cannot inherit parent key and value serdes since both of them 
may have changed;
+        // we can only inherit from what serialized specified here
         return new KGroupedTableImpl<>(
             builder,
             selectName,
-            this.name,
+            sourceNodes,
             serializedInternal.keySerde(),
             serializedInternal.valueSerde(),
             groupByMapNode
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
index 5c464b9b79c..2ed70bd46a2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
@@ -27,13 +27,13 @@
 
 class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> 
{
 
-    private final KTableImpl<K, ?, V> parent1;
-    private final KTableImpl<K, ?, V> parent2;
+    private final KTableProcessorSupplier<K, ?, V> parent1;
+    private final KTableProcessorSupplier<K, ?, V> parent2;
     private final String queryableName;
     private boolean sendOldValues = false;
 
-    KTableKTableJoinMerger(final KTableImpl<K, ?, V> parent1,
-                           final KTableImpl<K, ?, V> parent2,
+    KTableKTableJoinMerger(final KTableProcessorSupplier<K, ?, V> parent1,
+                           final KTableProcessorSupplier<K, ?, V> parent2,
                            final String queryableName) {
         this.parent1 = parent1;
         this.parent2 = parent2;
@@ -55,13 +55,13 @@
             return new KTableValueGetterSupplier<K, V>() {
 
                 public KTableValueGetter<K, V> get() {
-                    return parent1.valueGetterSupplier().get();
+                    return parent1.view().get();
                 }
 
                 @Override
                 public String[] storeNames() {
-                    final String[] storeNames1 = 
parent1.valueGetterSupplier().storeNames();
-                    final String[] storeNames2 = 
parent2.valueGetterSupplier().storeNames();
+                    final String[] storeNames1 = parent1.view().storeNames();
+                    final String[] storeNames2 = parent2.view().storeNames();
                     final Set<String> stores = new 
HashSet<>(storeNames1.length + storeNames2.length);
                     Collections.addAll(stores, storeNames1);
                     Collections.addAll(stores, storeNames2);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
index 98076e068ac..e185f4d926d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
@@ -28,6 +28,7 @@
 import org.apache.kafka.streams.kstream.SessionWindowedKStream;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.WindowedSerdes;
 import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
 import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
 import org.apache.kafka.streams.state.SessionStore;
@@ -40,10 +41,8 @@
 import static 
org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
 import static 
org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME;
 
-public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> 
implements SessionWindowedKStream<K, V> {
+public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> 
implements SessionWindowedKStream<K, V> {
     private final SessionWindows windows;
-    private final Serde<K> keySerde;
-    private final Serde<V> valSerde;
     private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
     private final Merger<K, Long> countMerger = (aggKey, aggOne, aggTwo) -> 
aggOne + aggTwo;
 
@@ -55,11 +54,9 @@
                                final Serde<V> valSerde,
                                final GroupedStreamAggregateBuilder<K, V> 
aggregateBuilder,
                                final StreamsGraphNode streamsGraphNode) {
-        super(builder, name, sourceNodes, streamsGraphNode);
+        super(name, keySerde, valSerde, sourceNodes, streamsGraphNode, 
builder);
         Objects.requireNonNull(windows, "windows can't be null");
         this.windows = windows;
-        this.keySerde = keySerde;
-        this.valSerde = valSerde;
         this.aggregateBuilder = aggregateBuilder;
     }
 
@@ -92,16 +89,17 @@
         }
 
         return aggregateBuilder.build(
+            AGGREGATE_NAME,
+            materialize(materializedInternal),
             new KStreamSessionWindowAggregate<>(
-                windows, materializedInternal.storeName(),
+                windows,
+                materializedInternal.storeName(),
                 aggregateBuilder.countInitializer,
                 aggregateBuilder.countAggregator,
-                countMerger
-            ),
-            AGGREGATE_NAME,
-            materialize(materializedInternal),
-            materializedInternal.isQueryable()
-        );
+                countMerger),
+            materializedInternal.isQueryable(),
+            materializedInternal.keySerde() != null ? new 
WindowedSerdes.SessionWindowedSerde<>(materializedInternal.keySerde()) : null,
+            materializedInternal.valueSerde());
     }
 
     @Override
@@ -125,6 +123,8 @@
         }
 
         return aggregateBuilder.build(
+            REDUCE_NAME,
+            materialize(materializedInternal),
             new KStreamSessionWindowAggregate<>(
                 windows,
                 materializedInternal.storeName(),
@@ -132,10 +132,9 @@
                 reduceAggregator,
                 mergerForAggregator(reduceAggregator)
             ),
-            REDUCE_NAME,
-            materialize(materializedInternal),
-            materializedInternal.isQueryable()
-        );
+            materializedInternal.isQueryable(),
+            materializedInternal.keySerde() != null ? new 
WindowedSerdes.SessionWindowedSerde<>(materializedInternal.keySerde()) : null,
+            materializedInternal.valueSerde());
     }
 
     @Override
@@ -160,18 +159,19 @@
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
         }
+
         return aggregateBuilder.build(
+            AGGREGATE_NAME,
+            materialize(materializedInternal),
             new KStreamSessionWindowAggregate<>(
                 windows,
                 materializedInternal.storeName(),
                 initializer,
                 aggregator,
-                sessionMerger
-            ),
-            AGGREGATE_NAME,
-            materialize(materializedInternal),
-            materializedInternal.isQueryable()
-        );
+                sessionMerger),
+            materializedInternal.isQueryable(),
+            materializedInternal.keySerde() != null ? new 
WindowedSerdes.SessionWindowedSerde<>(materializedInternal.keySerde()) : null,
+            materializedInternal.valueSerde());
     }
 
     @SuppressWarnings("deprecation") // continuing to support 
SessionWindows#maintainMs in fallback mode
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index 5c5cfb2bed7..2ee8f7c5958 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -27,6 +27,7 @@
 import org.apache.kafka.streams.kstream.TimeWindowedKStream;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.WindowedSerdes;
 import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -40,11 +41,9 @@
 import static 
org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
 import static 
org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME;
 
-public class TimeWindowedKStreamImpl<K, V, W extends Window> extends 
AbstractStream<K> implements TimeWindowedKStream<K, V> {
+public class TimeWindowedKStreamImpl<K, V, W extends Window> extends 
AbstractStream<K, V> implements TimeWindowedKStream<K, V> {
 
     private final Windows<W> windows;
-    private final Serde<K> keySerde;
-    private final Serde<V> valSerde;
     private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
 
     TimeWindowedKStreamImpl(final Windows<W> windows,
@@ -55,9 +54,7 @@
                             final Serde<V> valSerde,
                             final boolean repartitionRequired,
                             final StreamsGraphNode streamsGraphNode) {
-        super(builder, name, sourceNodes, streamsGraphNode);
-        this.valSerde = valSerde;
-        this.keySerde = keySerde;
+        super(name, keySerde, valSerde, sourceNodes, streamsGraphNode, 
builder);
         this.windows = Objects.requireNonNull(windows, "windows can't be 
null");
         this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(builder, 
keySerde, valSerde, repartitionRequired, sourceNodes, name, streamsGraphNode);
     }
@@ -92,19 +89,14 @@
         }
 
         return aggregateBuilder.build(
-            new KStreamWindowAggregate<>(
-                windows,
-                materializedInternal.storeName(),
-                aggregateBuilder.countInitializer,
-                aggregateBuilder.countAggregator
-            ),
             AGGREGATE_NAME,
             materialize(materializedInternal),
-            materializedInternal.isQueryable()
-        );
+            new KStreamWindowAggregate<>(windows, 
materializedInternal.storeName(), aggregateBuilder.countInitializer, 
aggregateBuilder.countAggregator),
+            materializedInternal.isQueryable(),
+            materializedInternal.keySerde() != null ? new 
WindowedSerdes.TimeWindowedSerde<>(materializedInternal.keySerde()) : null,
+            materializedInternal.valueSerde());
     }
 
-
     @Override
     public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> 
initializer,
                                                   final Aggregator<? super K, 
? super V, VR> aggregator) {
@@ -124,15 +116,12 @@
             materializedInternal.withKeySerde(keySerde);
         }
         return aggregateBuilder.build(
-            new KStreamWindowAggregate<>(
-                windows,
-                materializedInternal.storeName(),
-                initializer,
-                aggregator
-            ),
             AGGREGATE_NAME,
             materialize(materializedInternal),
-            materializedInternal.isQueryable());
+            new KStreamWindowAggregate<>(windows, 
materializedInternal.storeName(), initializer, aggregator),
+            materializedInternal.isQueryable(),
+            materializedInternal.keySerde() != null ? new 
WindowedSerdes.TimeWindowedSerde<>(materializedInternal.keySerde()) : null,
+            materializedInternal.valueSerde());
     }
 
     @Override
@@ -156,11 +145,12 @@
         }
 
         return aggregateBuilder.build(
-            new KStreamWindowReduce<>(windows, 
materializedInternal.storeName(), reducer),
             REDUCE_NAME,
             materialize(materializedInternal),
-            materializedInternal.isQueryable()
-        );
+            new KStreamWindowReduce<>(windows, 
materializedInternal.storeName(), reducer),
+            materializedInternal.isQueryable(),
+            materializedInternal.keySerde() != null ? new 
WindowedSerdes.TimeWindowedSerde<>(materializedInternal.keySerde()) : null,
+            materializedInternal.valueSerde());
     }
 
     @SuppressWarnings("deprecation") // continuing to support 
Windows#maintainMs/segmentInterval in fallback mode
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java
index 97fb69d7c2a..4d1b67dbc33 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java
@@ -102,11 +102,10 @@ public void writeToTopology(final InternalTopologyBuilder 
topologyBuilder) {
 
     }
 
-    public static GroupedTableOperationRepartitionNodeBuilder 
groupedTableOperationNodeBuilder() {
-        return new GroupedTableOperationRepartitionNodeBuilder();
+    public static <K1, V1> GroupedTableOperationRepartitionNodeBuilder<K1, V1> 
groupedTableOperationNodeBuilder() {
+        return new GroupedTableOperationRepartitionNodeBuilder<>();
     }
 
-
     public static final class GroupedTableOperationRepartitionNodeBuilder<K, 
V> {
 
         private Serde<K> keySerde;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
index b63b66de38a..41c27bac5f9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
@@ -99,7 +99,7 @@ public String toString() {
                "} " + super.toString();
     }
 
-    public static <K, V, V1, V2, VR> KTableKTableJoinNodeBuilder<K, V1, V2, 
VR> kTableKTableJoinNodeBuilder() {
+    public static <K, V1, V2, VR> KTableKTableJoinNodeBuilder<K, V1, V2, VR> 
kTableKTableJoinNodeBuilder() {
         return new KTableKTableJoinNodeBuilder<>();
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
index eb3d9f691fd..4b767b41493 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
@@ -28,17 +28,17 @@
  */
 public class ProcessorParameters<K, V> {
 
-    private final ProcessorSupplier<K, V> processorSupplier;
+    private final ProcessorSupplier<? super K, ? super V> processorSupplier;
     private final String processorName;
 
-    public ProcessorParameters(final ProcessorSupplier<K, V> processorSupplier,
+    public ProcessorParameters(final ProcessorSupplier<? super K, ? super V> 
processorSupplier,
                                final String processorName) {
 
         this.processorSupplier = processorSupplier;
         this.processorName = processorName;
     }
 
-    public ProcessorSupplier<K, V> processorSupplier() {
+    public ProcessorSupplier<? super K, ? super V> processorSupplier() {
         return processorSupplier;
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowedSerdesTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowedSerdesTest.java
index 4360d0846ec..ff180d173b5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowedSerdesTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowedSerdesTest.java
@@ -17,15 +17,37 @@
 package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.junit.Assert.assertTrue;
+
 public class WindowedSerdesTest {
 
     private final String topic = "sample";
 
+    @Test
+    public void shouldWrapForTimeWindowedSerde() {
+        final Serde<Windowed<String>> serde = 
WindowedSerdes.timeWindowedSerdeFrom(String.class);
+        assertTrue(serde.serializer() instanceof TimeWindowedSerializer);
+        assertTrue(serde.deserializer() instanceof TimeWindowedDeserializer);
+        assertTrue(((TimeWindowedSerializer) 
serde.serializer()).innerSerializer() instanceof StringSerializer);
+        assertTrue(((TimeWindowedDeserializer) 
serde.deserializer()).innerDeserializer() instanceof StringDeserializer);
+    }
+
+    @Test
+    public void shouldWrapForSessionWindowedSerde() {
+        final Serde<Windowed<String>> serde = 
WindowedSerdes.sessionWindowedSerdeFrom(String.class);
+        assertTrue(serde.serializer() instanceof SessionWindowedSerializer);
+        assertTrue(serde.deserializer() instanceof 
SessionWindowedDeserializer);
+        assertTrue(((SessionWindowedSerializer) 
serde.serializer()).innerSerializer() instanceof StringSerializer);
+        assertTrue(((SessionWindowedDeserializer) 
serde.deserializer()).innerDeserializer() instanceof StringDeserializer);
+    }
+
     @Test
     public void testTimeWindowSerdeFrom() {
         final Windowed<Integer> timeWindowed = new Windowed<>(10, new 
TimeWindow(0, Long.MAX_VALUE));
@@ -43,5 +65,4 @@ public void testSessionWindowedSerdeFrom() {
         final Windowed<Integer> windowed = 
sessionWindowedSerde.deserializer().deserialize(topic, bytes);
         Assert.assertEquals(sessionWindowed, windowed);
     }
-
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
index d98fd796aef..b360ceced03 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
@@ -96,7 +96,7 @@ public void testShouldBeExtensible() {
         assertTrue(supplier.theCapturedProcessor().processed.size() <= 
expectedKeys.length);
     }
 
-    private class ExtendedKStream<K, V> extends AbstractStream<K> {
+    private class ExtendedKStream<K, V> extends AbstractStream<K, V> {
 
         ExtendedKStream(final KStream<K, V> stream) {
             super((KStreamImpl<K, V>) stream);
@@ -104,11 +104,12 @@ public void testShouldBeExtensible() {
 
         KStream<K, V> randomFilter() {
             final String name = builder.newProcessorName("RANDOM-FILTER-");
-            final ProcessorGraphNode processorNode = new 
ProcessorGraphNode(name,
-                                                                            
new ProcessorParameters<>(new ExtendedKStreamDummy<>(), name),
-                                                                            
false);
+            final ProcessorGraphNode<K, V> processorNode = new 
ProcessorGraphNode<>(
+                name,
+                new ProcessorParameters<>(new ExtendedKStreamDummy<>(), name),
+                false);
             builder.addGraphNode(this.streamsGraphNode, processorNode);
-            return new KStreamImpl<>(builder, name, sourceNodes, false, 
processorNode);
+            return new KStreamImpl<K, V>(name, null, null, sourceNodes, false, 
processorNode, builder);
         }
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index bce7fc80a40..119a7b72d98 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
@@ -32,12 +33,17 @@
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Serialized;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
+import org.apache.kafka.streams.kstream.ValueTransformer;
 import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.SourceNode;
@@ -76,6 +82,8 @@
     private final ConsumerRecordFactory<String, String> recordFactory = new 
ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
     private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
+    private Serde<String> mySerde = new Serdes.StringSerde();
+
     @Before
     public void before() {
         builder = new StreamsBuilder();
@@ -180,6 +188,121 @@ public Integer apply(final Integer value1, final Integer 
value2) {
             
TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").build(null).processors().size());
     }
 
+    @Test
+    public void shouldPreserveSerdesForOperators() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, String> stream1 = 
builder.stream(Collections.singleton("topic-1"), stringConsumed);
+        final KTable<String, String> table1 = builder.table("topic-2", 
stringConsumed);
+        final GlobalKTable<String, String> table2 = 
builder.globalTable("topic-2", stringConsumed);
+        final ConsumedInternal<String, String> consumedInternal = new 
ConsumedInternal<>(stringConsumed);
+
+        final KeyValueMapper<String, String, String> selector = (key, value) 
-> key;
+        final KeyValueMapper<String, String, Iterable<KeyValue<String, 
String>>> flatSelector = (key, value) -> Collections.singleton(new 
KeyValue<>(key, value));
+        final ValueMapper<String, String> mapper = value -> value;
+        final ValueMapper<String, Iterable<String>> flatMapper = 
Collections::singleton;
+        final ValueJoiner<String, String, String> joiner = (value1, value2) -> 
value1;
+        final TransformerSupplier<String, String, KeyValue<String, String>> 
transformerSupplier = () -> new Transformer<String, String, KeyValue<String, 
String>>() {
+            @Override
+            public void init(final ProcessorContext context) {}
+
+            @Override
+            public KeyValue<String, String> transform(final String key, final 
String value) {
+                return new KeyValue<>(key, value);
+            }
+
+            @Override
+            public void close() {}
+        };
+        final ValueTransformerSupplier<String, String> 
valueTransformerSupplier = () -> new ValueTransformer<String, String>() {
+            @Override
+            public void init(final ProcessorContext context) {}
+
+            @Override
+            public String transform(final String value) {
+                return value;
+            }
+
+            @Override
+            public void close() {}
+        };
+
+        assertEquals(((AbstractStream) stream1.filter((key, value) -> 
false)).keySerde(), consumedInternal.keySerde());
+        assertEquals(((AbstractStream) stream1.filter((key, value) -> 
false)).valueSerde(), consumedInternal.valueSerde());
+
+        assertEquals(((AbstractStream) stream1.filterNot((key, value) -> 
false)).keySerde(), consumedInternal.keySerde());
+        assertEquals(((AbstractStream) stream1.filterNot((key, value) -> 
false)).valueSerde(), consumedInternal.valueSerde());
+
+        assertNull(((AbstractStream) stream1.selectKey(selector)).keySerde());
+        assertEquals(((AbstractStream) 
stream1.selectKey(selector)).valueSerde(), consumedInternal.valueSerde());
+
+        assertNull(((AbstractStream) stream1.map(KeyValue::new)).keySerde());
+        assertNull(((AbstractStream) stream1.map(KeyValue::new)).valueSerde());
+
+        assertEquals(((AbstractStream) stream1.mapValues(mapper)).keySerde(), 
consumedInternal.keySerde());
+        assertNull(((AbstractStream) stream1.mapValues(mapper)).valueSerde());
+
+        assertNull(((AbstractStream) 
stream1.flatMap(flatSelector)).keySerde());
+        assertNull(((AbstractStream) 
stream1.flatMap(flatSelector)).valueSerde());
+
+        assertEquals(((AbstractStream) 
stream1.flatMapValues(flatMapper)).keySerde(), consumedInternal.keySerde());
+        assertNull(((AbstractStream) 
stream1.flatMapValues(flatMapper)).valueSerde());
+
+        assertNull(((AbstractStream) 
stream1.transform(transformerSupplier)).keySerde());
+        assertNull(((AbstractStream) 
stream1.transform(transformerSupplier)).valueSerde());
+
+        assertEquals(((AbstractStream) 
stream1.transformValues(valueTransformerSupplier)).keySerde(), 
consumedInternal.keySerde());
+        assertNull(((AbstractStream) 
stream1.transformValues(valueTransformerSupplier)).valueSerde());
+
+        assertNull(((AbstractStream) stream1.merge(stream1)).keySerde());
+        assertNull(((AbstractStream) stream1.merge(stream1)).valueSerde());
+
+        assertEquals(((AbstractStream) stream1.through("topic-3")).keySerde(), 
consumedInternal.keySerde());
+        assertEquals(((AbstractStream) 
stream1.through("topic-3")).valueSerde(), consumedInternal.valueSerde());
+        assertEquals(((AbstractStream) stream1.through("topic-3", 
Produced.with(mySerde, mySerde))).keySerde(), mySerde);
+        assertEquals(((AbstractStream) stream1.through("topic-3", 
Produced.with(mySerde, mySerde))).valueSerde(), mySerde);
+
+        assertEquals(((AbstractStream) stream1.groupByKey()).keySerde(), 
consumedInternal.keySerde());
+        assertEquals(((AbstractStream) stream1.groupByKey()).valueSerde(), 
consumedInternal.valueSerde());
+        assertEquals(((AbstractStream) 
stream1.groupByKey(Serialized.with(mySerde, mySerde))).keySerde(), mySerde);
+        assertEquals(((AbstractStream) 
stream1.groupByKey(Serialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+
+        assertEquals(((AbstractStream) stream1.groupBy(selector)).keySerde(), 
null);
+        assertEquals(((AbstractStream) 
stream1.groupBy(selector)).valueSerde(), consumedInternal.valueSerde());
+        assertEquals(((AbstractStream) stream1.groupBy(selector, 
Serialized.with(mySerde, mySerde))).keySerde(), mySerde);
+        assertEquals(((AbstractStream) stream1.groupBy(selector, 
Serialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+
+        assertEquals(((AbstractStream) stream1.join(stream1, joiner, 
JoinWindows.of(100L))).keySerde(), null);
+        assertEquals(((AbstractStream) stream1.join(stream1, joiner, 
JoinWindows.of(100L))).valueSerde(), null);
+        assertEquals(((AbstractStream) stream1.join(stream1, joiner, 
JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).keySerde(), 
mySerde);
+        assertNull(((AbstractStream) stream1.join(stream1, joiner, 
JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).valueSerde());
+
+        assertEquals(((AbstractStream) stream1.leftJoin(stream1, joiner, 
JoinWindows.of(100L))).keySerde(), null);
+        assertEquals(((AbstractStream) stream1.leftJoin(stream1, joiner, 
JoinWindows.of(100L))).valueSerde(), null);
+        assertEquals(((AbstractStream) stream1.leftJoin(stream1, joiner, 
JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).keySerde(), 
mySerde);
+        assertNull(((AbstractStream) stream1.leftJoin(stream1, joiner, 
JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).valueSerde());
+
+        assertEquals(((AbstractStream) stream1.outerJoin(stream1, joiner, 
JoinWindows.of(100L))).keySerde(), null);
+        assertEquals(((AbstractStream) stream1.outerJoin(stream1, joiner, 
JoinWindows.of(100L))).valueSerde(), null);
+        assertEquals(((AbstractStream) stream1.outerJoin(stream1, joiner, 
JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).keySerde(), 
mySerde);
+        assertNull(((AbstractStream) stream1.outerJoin(stream1, joiner, 
JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).valueSerde());
+
+        assertEquals(((AbstractStream) stream1.join(table1, 
joiner)).keySerde(), consumedInternal.keySerde());
+        assertEquals(((AbstractStream) stream1.join(table1, 
joiner)).valueSerde(), null);
+        assertEquals(((AbstractStream) stream1.join(table1, joiner, 
Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde);
+        assertEquals(((AbstractStream) stream1.join(table1, joiner, 
Joined.with(mySerde, mySerde, mySerde))).valueSerde(), null);
+
+        assertEquals(((AbstractStream) stream1.leftJoin(table1, 
joiner)).keySerde(), consumedInternal.keySerde());
+        assertEquals(((AbstractStream) stream1.leftJoin(table1, 
joiner)).valueSerde(), null);
+        assertEquals(((AbstractStream) stream1.leftJoin(table1, joiner, 
Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde);
+        assertEquals(((AbstractStream) stream1.leftJoin(table1, joiner, 
Joined.with(mySerde, mySerde, mySerde))).valueSerde(), null);
+
+        assertEquals(((AbstractStream) stream1.join(table2, selector, 
joiner)).keySerde(), consumedInternal.keySerde());
+        assertEquals(((AbstractStream) stream1.join(table2, selector, 
joiner)).valueSerde(), null);
+
+        assertEquals(((AbstractStream) stream1.leftJoin(table2, selector, 
joiner)).keySerde(), consumedInternal.keySerde());
+        assertEquals(((AbstractStream) stream1.leftJoin(table2, selector, 
joiner)).valueSerde(), null);
+    }
+
     @Test
     public void shouldUseRecordMetadataTimestampExtractorWithThrough() {
         final StreamsBuilder builder = new StreamsBuilder();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
index 2692c17f021..10a27b9f2bb 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
@@ -24,7 +24,6 @@
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.StreamsTestUtils;
@@ -43,20 +42,11 @@
     @Test
     public void testMap() {
         final StreamsBuilder builder = new StreamsBuilder();
-
-        final KeyValueMapper<Integer, String, KeyValue<String, Integer>> 
mapper =
-            new KeyValueMapper<Integer, String, KeyValue<String, Integer>>() {
-                @Override
-                public KeyValue<String, Integer> apply(final Integer key, 
final String value) {
-                    return KeyValue.pair(value, key);
-                }
-            };
-
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
         final MockProcessorSupplier<String, Integer> supplier = new 
MockProcessorSupplier<>();
         final KStream<Integer, String> stream = builder.stream(topicName, 
Consumed.with(Serdes.Integer(), Serdes.String()));
-        stream.map(mapper).process(supplier);
+        stream.map((key, value) -> KeyValue.pair(value, 
key)).process(supplier);
 
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
             for (final int expectedKey : expectedKeys) {
@@ -75,16 +65,9 @@ public void testMap() {
 
     @Test
     public void testTypeVariance() {
-        final KeyValueMapper<Number, Object, KeyValue<Number, String>> 
stringify = new KeyValueMapper<Number, Object, KeyValue<Number, String>>() {
-            @Override
-            public KeyValue<Number, String> apply(final Number key, final 
Object value) {
-                return KeyValue.pair(key, key + ":" + value);
-            }
-        };
-
         new StreamsBuilder()
             .<Integer, String>stream("numbers")
-            .map(stringify)
+            .map((key, value) -> KeyValue.pair(key, key + ":" + value))
             .to("strings");
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index eb586e60de1..6e666c94691 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -16,10 +16,12 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyDescription;
@@ -28,13 +30,17 @@
 import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.SinkNode;
 import org.apache.kafka.streams.processor.internals.SourceNode;
@@ -62,6 +68,7 @@
 
 public class KTableImplTest {
 
+    private final Consumed<String, String> stringConsumed = 
Consumed.with(Serdes.String(), Serdes.String());
     private final Consumed<String, String> consumed = 
Consumed.with(Serdes.String(), Serdes.String());
     private final Produced<String, String> produced = 
Produced.with(Serdes.String(), Serdes.String());
     private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
@@ -70,6 +77,8 @@
     private StreamsBuilder builder;
     private KTable<String, String> table;
 
+    private Serde<String> mySerde = new Serdes.StringSerde();
+
     @Before
     public void setUp() {
         builder = new StreamsBuilder();
@@ -125,6 +134,74 @@ public boolean test(final String key, final Integer value) 
{
         assertEquals(Utils.mkList("A:01", "B:02", "C:03", "D:04"), 
processors.get(3).processed);
     }
 
+    @Test
+    public void shouldPreserveSerdesForOperators() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<String, String> table1 = builder.table("topic-2", 
stringConsumed);
+        final ConsumedInternal<String, String> consumedInternal = new 
ConsumedInternal<>(stringConsumed);
+
+        final KeyValueMapper<String, String, String> selector = (key, value) 
-> key;
+        final ValueMapper<String, String> mapper = value -> value;
+        final ValueJoiner<String, String, String> joiner = (value1, value2) -> 
value1;
+        final ValueTransformerWithKeySupplier<String, String, String> 
valueTransformerWithKeySupplier = () -> new ValueTransformerWithKey<String, 
String, String>() {
+            @Override
+            public void init(final ProcessorContext context) {}
+
+            @Override
+            public String transform(final String key, final String value) {
+                return value;
+            }
+
+            @Override
+            public void close() {}
+        };
+
+        assertEquals(((AbstractStream) table1.filter((key, value) -> 
false)).keySerde(), consumedInternal.keySerde());
+        assertEquals(((AbstractStream) table1.filter((key, value) -> 
false)).valueSerde(), consumedInternal.valueSerde());
+        assertEquals(((AbstractStream) table1.filter((key, value) -> false, 
Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
+        assertEquals(((AbstractStream) table1.filter((key, value) -> false, 
Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+
+        assertEquals(((AbstractStream) table1.filterNot((key, value) -> 
false)).keySerde(), consumedInternal.keySerde());
+        assertEquals(((AbstractStream) table1.filterNot((key, value) -> 
false)).valueSerde(), consumedInternal.valueSerde());
+        assertEquals(((AbstractStream) table1.filterNot((key, value) -> false, 
Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
+        assertEquals(((AbstractStream) table1.filterNot((key, value) -> false, 
Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+
+        assertEquals(((AbstractStream) table1.mapValues(mapper)).keySerde(), 
consumedInternal.keySerde());
+        assertNull(((AbstractStream) table1.mapValues(mapper)).valueSerde());
+        assertEquals(((AbstractStream) table1.mapValues(mapper, 
Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
+        assertEquals(((AbstractStream) table1.mapValues(mapper, 
Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+
+        assertEquals(((AbstractStream) table1.toStream()).keySerde(), 
consumedInternal.keySerde());
+        assertEquals(((AbstractStream) table1.toStream()).valueSerde(), 
consumedInternal.valueSerde());
+        assertNull(((AbstractStream) table1.toStream(selector)).keySerde());
+        assertEquals(((AbstractStream) 
table1.toStream(selector)).valueSerde(), consumedInternal.valueSerde());
+
+        assertEquals(((AbstractStream) 
table1.transformValues(valueTransformerWithKeySupplier)).keySerde(), 
consumedInternal.keySerde());
+        assertNull(((AbstractStream) 
table1.transformValues(valueTransformerWithKeySupplier)).valueSerde());
+        assertEquals(((AbstractStream) 
table1.transformValues(valueTransformerWithKeySupplier, 
Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
+        assertEquals(((AbstractStream) 
table1.transformValues(valueTransformerWithKeySupplier, 
Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+
+        assertEquals(((AbstractStream) 
table1.groupBy(KeyValue::new)).keySerde(), null);
+        assertEquals(((AbstractStream) 
table1.groupBy(KeyValue::new)).valueSerde(), null);
+        assertEquals(((AbstractStream) table1.groupBy(KeyValue::new, 
Serialized.with(mySerde, mySerde))).keySerde(), mySerde);
+        assertEquals(((AbstractStream) table1.groupBy(KeyValue::new, 
Serialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+
+        assertEquals(((AbstractStream) table1.join(table1, 
joiner)).keySerde(), consumedInternal.keySerde());
+        assertEquals(((AbstractStream) table1.join(table1, 
joiner)).valueSerde(), null);
+        assertEquals(((AbstractStream) table1.join(table1, joiner, 
Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
+        assertEquals(((AbstractStream) table1.join(table1, joiner, 
Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+
+        assertEquals(((AbstractStream) table1.leftJoin(table1, 
joiner)).keySerde(), consumedInternal.keySerde());
+        assertEquals(((AbstractStream) table1.leftJoin(table1, 
joiner)).valueSerde(), null);
+        assertEquals(((AbstractStream) table1.leftJoin(table1, joiner, 
Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
+        assertEquals(((AbstractStream) table1.leftJoin(table1, joiner, 
Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+
+        assertEquals(((AbstractStream) table1.outerJoin(table1, 
joiner)).keySerde(), consumedInternal.keySerde());
+        assertEquals(((AbstractStream) table1.outerJoin(table1, 
joiner)).valueSerde(), null);
+        assertEquals(((AbstractStream) table1.outerJoin(table1, joiner, 
Materialized.with(mySerde, mySerde))).keySerde(), mySerde);
+        assertEquals(((AbstractStream) table1.outerJoin(table1, joiner, 
Materialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+    }
+
     @Test
     public void testValueGetter() {
         final StreamsBuilder builder = new StreamsBuilder();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
index d65f27e2b19..75e9f5120b0 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
@@ -38,9 +38,7 @@
 
 public class StreamsGraphTest {
 
-    final Pattern repartitionTopicPattern = Pattern.compile("Sink: 
.*-repartition");
-    
-
+    private final Pattern repartitionTopicPattern = Pattern.compile("Sink: 
.*-repartition");
 
     // Test builds topology in succesive manner but only graph node not yet 
processed written to topology
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Serde Inheritance in Streams DSL
> --------------------------------
>
>                 Key: KAFKA-7456
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7456
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>            Priority: Major
>             Fix For: 2.1.0
>
>
> This is a prerequisite for further topology optimization in the Streams DSL: 
> we should let different operators inside the DSL to be able to pass along key 
> and value serdes if they are not explicitly specified by users. The serde 
> specification precedence should generally be:
> 1) Overridden values via control objects (e.g. Materialized, Serialized, 
> Consumed, etc)
> 2) Serdes that can be inferred from the operator itself (e.g. 
> groupBy().count(), where value serde can default to `LongSerde`).
> 3) Serde inherited from parent operator if possible (note if the key / value 
> types have been changed, then the corresponding serde cannot be inherited).
> 4) Default serde specified in the config.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to