This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new fa3fe3e  MINOR: Fix generic type of ProcessorParameters (#5741)
fa3fe3e is described below

commit fa3fe3e68059068ac894552078155b16c21404aa
Author: John Roesler <vvcep...@users.noreply.github.com>
AuthorDate: Thu Oct 4 19:37:53 2018 -0500

    MINOR: Fix generic type of ProcessorParameters (#5741)
    
    In unrelated recent work, I noticed some warnings about the missing type 
parameters on ProcessorParameters.
    
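    While investigating them, I found what appears to be a bug in the creation of 
repartition topics: the right-hand stream of a KStream-KStream join was 
repartitioned with joined.valueSerde() instead of joined.otherValueSerde().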
    
    Reviewers: Bill Bejeck <b...@confluent.io>, Guozhang Wang 
<guozh...@confluent.io>
---
 .../internals/GroupedStreamAggregateBuilder.java   |   7 +-
 .../kstream/internals/KGroupedTableImpl.java       |  22 +--
 .../streams/kstream/internals/KStreamImpl.java     | 184 +++++++++++----------
 .../internals/graph/ProcessorParameters.java       |  12 +-
 .../internals/graph/StatefulProcessorNode.java     |   6 +-
 .../internals/graph/GraphGraceSearchUtilTest.java  |   8 +-
 6 files changed, 125 insertions(+), 114 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
index 3439cf5..8e6f990 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
@@ -84,16 +84,17 @@ class GroupedStreamAggregateBuilder<K, V> {
             builder.addGraphNode(parentNode, repartitionNode);
             parentNode = repartitionNode;
         }
-        final StatefulProcessorNode.StatefulProcessorNodeBuilder<K, T> 
statefulProcessorNodeBuilder = 
StatefulProcessorNode.statefulProcessorNodeBuilder();
+        final StatefulProcessorNode.StatefulProcessorNodeBuilder<K, V> 
statefulProcessorNodeBuilder = 
StatefulProcessorNode.statefulProcessorNodeBuilder();
+
+        final ProcessorParameters<K, V> processorParameters = new 
ProcessorParameters<>(aggregateSupplier, aggFunctionName);
 
-        final ProcessorParameters processorParameters = new 
ProcessorParameters<>(aggregateSupplier, aggFunctionName);
         statefulProcessorNodeBuilder
             .withProcessorParameters(processorParameters)
             .withNodeName(aggFunctionName)
             .withRepartitionRequired(repartitionRequired)
             .withStoreBuilder(storeBuilder);
 
-        final StatefulProcessorNode<K, T> statefulProcessorNode = 
statefulProcessorNodeBuilder.build();
+        final StatefulProcessorNode<K, V> statefulProcessorNode = 
statefulProcessorNodeBuilder.build();
 
         builder.addGraphNode(parentNode, statefulProcessorNode);
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index c97576b..013028d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -47,7 +47,7 @@ public class KGroupedTableImpl<K, V> extends 
AbstractStream<K, V> implements KGr
 
     private static final String REDUCE_NAME = "KTABLE-REDUCE-";
 
-    protected final String userSpecifiedName;
+    private final String userSpecifiedName;
 
     private final Initializer<Long> countInitializer = () -> 0L;
 
@@ -72,7 +72,7 @@ public class KGroupedTableImpl<K, V> extends 
AbstractStream<K, V> implements KGr
         final String sourceName = 
builder.newProcessorName(KStreamImpl.SOURCE_NAME);
         final String funcName = builder.newProcessorName(functionName);
         final String repartitionTopic = (userSpecifiedName != null ? 
userSpecifiedName : materialized.storeName())
-                + KStreamImpl.REPARTITION_TOPIC_SUFFIX;
+            + KStreamImpl.REPARTITION_TOPIC_SUFFIX;
 
         final StreamsGraphNode repartitionNode = 
createRepartitionNode(sinkName, sourceName, repartitionTopic);
 
@@ -98,16 +98,18 @@ public class KGroupedTableImpl<K, V> extends 
AbstractStream<K, V> implements KGr
                                 builder);
     }
 
-    private <T> StatefulProcessorNode getStatefulProcessorNode(final 
MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized,
-                                                               final String 
functionName,
-                                                               final 
ProcessorSupplier<K, Change<V>> aggregateSupplier) {
+    private <T> StatefulProcessorNode<K, Change<V>> 
getStatefulProcessorNode(final MaterializedInternal<K, T, KeyValueStore<Bytes, 
byte[]>> materialized,
+                                                                             
final String functionName,
+                                                                             
final ProcessorSupplier<K, Change<V>> aggregateSupplier) {
 
         final ProcessorParameters<K, Change<V>> 
aggregateFunctionProcessorParams = new ProcessorParameters<>(aggregateSupplier, 
functionName);
 
-        return StatefulProcessorNode.statefulProcessorNodeBuilder()
+        return StatefulProcessorNode
+            .<K, Change<V>>statefulProcessorNodeBuilder()
             .withNodeName(functionName)
             .withProcessorParameters(aggregateFunctionProcessorParams)
-            .withStoreBuilder(new 
KeyValueStoreMaterializer<>(materialized).materialize()).build();
+            .withStoreBuilder(new 
KeyValueStoreMaterializer<>(materialized).materialize())
+            .build();
     }
 
     private GroupedTableOperationRepartitionNode<K, V> 
createRepartitionNode(final String sinkName,
@@ -164,9 +166,9 @@ public class KGroupedTableImpl<K, V> extends 
AbstractStream<K, V> implements KGr
         }
 
         final ProcessorSupplier<K, Change<V>> aggregateSupplier = new 
KTableAggregate<>(materializedInternal.storeName(),
-                countInitializer,
-                countAdder,
-                countSubtractor);
+                                                                               
         countInitializer,
+                                                                               
         countAdder,
+                                                                               
         countSubtractor);
 
         return doAggregate(aggregateSupplier, AGGREGATE_NAME, 
materializedInternal);
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 96fa8b9..49dbbd1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -39,6 +39,7 @@ import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import 
org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode;
+import 
org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
 import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
@@ -165,7 +166,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> 
implements KStream<K
     }
 
     @Override
-    public <K1> KStream<K1, V> selectKey(final KeyValueMapper<? super K, ? 
super V, ? extends K1> mapper) {
+    public <KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ? 
super V, ? extends KR> mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
 
         final ProcessorGraphNode<K, V> selectKeyProcessorNode = 
internalSelectKey(mapper);
@@ -178,10 +179,10 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
     }
 
 
-    private <K1> ProcessorGraphNode<K, V> internalSelectKey(final 
KeyValueMapper<? super K, ? super V, ? extends K1> mapper) {
+    private <KR> ProcessorGraphNode<K, V> internalSelectKey(final 
KeyValueMapper<? super K, ? super V, ? extends KR> mapper) {
         final String name = builder.newProcessorName(KEY_SELECT_NAME);
 
-        final KStreamMap<K, V, K1, V> kStreamMap = new KStreamMap<>((key, 
value) -> new KeyValue<>(mapper.apply(key, value), value));
+        final KStreamMap<K, V, KR, V> kStreamMap = new KStreamMap<>((key, 
value) -> new KeyValue<>(mapper.apply(key, value), value));
 
         final ProcessorParameters<K, V> processorParameters = new 
ProcessorParameters<>(kStreamMap, name);
 
@@ -189,7 +190,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> 
implements KStream<K
     }
 
     @Override
-    public <K1, V1> KStream<K1, V1> map(final KeyValueMapper<? super K, ? 
super V, ? extends KeyValue<? extends K1, ? extends V1>> mapper) {
+    public <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? 
super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
         final String name = builder.newProcessorName(MAP_NAME);
 
@@ -212,7 +213,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> 
implements KStream<K
 
 
     @Override
-    public <V1> KStream<K, V1> mapValues(final ValueMapper<? super V, ? 
extends V1> mapper) {
+    public <VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ? 
extends VR> mapper) {
         return mapValues(withKey(mapper));
     }
 
@@ -250,7 +251,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> 
implements KStream<K
     }
 
     @Override
-    public <K1, V1> KStream<K1, V1> flatMap(final KeyValueMapper<? super K, ? 
super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> 
mapper) {
+    public <KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? 
super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> 
mapper) {
         Objects.requireNonNull(mapper, "mapper can't be null");
         final String name = builder.newProcessorName(FLATMAP_NAME);
 
@@ -271,7 +272,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> 
implements KStream<K
     }
 
     @Override
-    public <V1> KStream<K, V1> flatMapValues(final ValueMapper<? super V, ? 
extends Iterable<? extends V1>> mapper) {
+    public <VR> KStream<K, VR> flatMapValues(final ValueMapper<? super V, ? 
extends Iterable<? extends VR>> mapper) {
         return flatMapValues(withKey(mapper));
     }
 
@@ -446,19 +447,19 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
     }
 
     @Override
-    public <K1, V1> KStream<K1, V1> transform(final TransformerSupplier<? 
super K, ? super V, KeyValue<K1, V1>> transformerSupplier,
+    public <KR, VR> KStream<KR, VR> transform(final TransformerSupplier<? 
super K, ? super V, KeyValue<KR, VR>> transformerSupplier,
                                               final String... stateStoreNames) 
{
         Objects.requireNonNull(transformerSupplier, "transformerSupplier can't 
be null");
         final String name = builder.newProcessorName(TRANSFORM_NAME);
 
-        final ProcessorParameters processorParameters = new 
ProcessorParameters<>(new KStreamTransform<>(transformerSupplier), name);
-
+        final StatefulProcessorNode<? super K, ? super V> transformNode = new 
StatefulProcessorNode<>(
+            name,
+            new ProcessorParameters<>(new 
KStreamTransform<>(transformerSupplier), name),
+            stateStoreNames,
+            null,
+            true
+        );
 
-        final StatefulProcessorNode<K1, V1> transformNode = new 
StatefulProcessorNode<>(name,
-                                                                               
         processorParameters,
-                                                                               
         stateStoreNames,
-                                                                               
         null,
-                                                                               
         true);
         transformNode.keyChangingOperation(true);
         builder.addGraphNode(this.streamsGraphNode, transformNode);
 
@@ -467,7 +468,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> 
implements KStream<K
     }
 
     @Override
-    public <V1> KStream<K, V1> transformValues(final 
ValueTransformerSupplier<? super V, ? extends V1> valueTransformerSupplier,
+    public <VR> KStream<K, VR> transformValues(final 
ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
                                                final String... 
stateStoreNames) {
         Objects.requireNonNull(valueTransformerSupplier, 
"valueTransformSupplier can't be null");
 
@@ -486,14 +487,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
                                                   final String... 
stateStoreNames) {
         final String name = builder.newProcessorName(TRANSFORMVALUES_NAME);
 
+        final StatefulProcessorNode<? super K, ? super V> transformNode = new 
StatefulProcessorNode<>(
+            name,
+            new ProcessorParameters<>(new 
KStreamTransformValues<>(valueTransformerWithKeySupplier), name),
+            stateStoreNames,
+            null,
+            repartitionRequired
+        );
 
-        final ProcessorParameters processorParameters = new 
ProcessorParameters<>(new 
KStreamTransformValues<>(valueTransformerWithKeySupplier), name);
-
-        final StatefulProcessorNode<K, VR> transformNode = new 
StatefulProcessorNode<>(name,
-                                                                               
        processorParameters,
-                                                                               
        stateStoreNames,
-                                                                               
        null,
-                                                                               
        repartitionRequired);
         transformNode.setValueChangingOperation(true);
         builder.addGraphNode(this.streamsGraphNode, transformNode);
 
@@ -508,19 +509,21 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
         Objects.requireNonNull(processorSupplier, "ProcessSupplier cant' be 
null");
         final String name = builder.newProcessorName(PROCESSOR_NAME);
 
-        final ProcessorParameters processorParameters = new 
ProcessorParameters<>(processorSupplier, name);
-        final StatefulProcessorNode<K, V> processNode = new 
StatefulProcessorNode<>(name,
-                                                                               
     processorParameters,
-                                                                               
     stateStoreNames,
-                                                                               
     null,
-                                                                               
     repartitionRequired);
+        final StatefulProcessorNode<? super K, ? super V> processNode = new 
StatefulProcessorNode<>(
+            name,
+            new ProcessorParameters<>(processorSupplier, name),
+            stateStoreNames,
+            null,
+            repartitionRequired
+        );
+
         builder.addGraphNode(this.streamsGraphNode, processNode);
     }
 
     @Override
-    public <V1, R> KStream<K, R> join(final KStream<K, V1> other,
-                                      final ValueJoiner<? super V, ? super V1, 
? extends R> joiner,
-                                      final JoinWindows windows) {
+    public <VO, VR> KStream<K, VR> join(final KStream<K, VO> other,
+                                        final ValueJoiner<? super V, ? super 
VO, ? extends VR> joiner,
+                                        final JoinWindows windows) {
         return join(other, joiner, windows, Joined.with(null, null, null));
     }
 
@@ -539,9 +542,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> 
implements KStream<K
     }
 
     @Override
-    public <V1, R> KStream<K, R> outerJoin(final KStream<K, V1> other,
-                                           final ValueJoiner<? super V, ? 
super V1, ? extends R> joiner,
-                                           final JoinWindows windows) {
+    public <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> other,
+                                             final ValueJoiner<? super V, ? 
super VO, ? extends VR> joiner,
+                                             final JoinWindows windows) {
         return outerJoin(other, joiner, windows, Joined.with(null, null, 
null));
     }
 
@@ -553,28 +556,27 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
         return doJoin(other, joiner, windows, joined, new 
KStreamImplJoin(true, true));
     }
 
-    private <V1, R> KStream<K, R> doJoin(final KStream<K, V1> other,
-                                         final ValueJoiner<? super V, ? super 
V1, ? extends R> joiner,
-                                         final JoinWindows windows,
-                                         final Joined<K, V, V1> joined,
-                                         final KStreamImplJoin join) {
+    private <VO, VR> KStream<K, VR> doJoin(final KStream<K, VO> other,
+                                           final ValueJoiner<? super V, ? 
super VO, ? extends VR> joiner,
+                                           final JoinWindows windows,
+                                           final Joined<K, V, VO> joined,
+                                           final KStreamImplJoin join) {
         Objects.requireNonNull(other, "other KStream can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
         Objects.requireNonNull(windows, "windows can't be null");
         Objects.requireNonNull(joined, "joined can't be null");
 
         KStreamImpl<K, V> joinThis = this;
-        KStreamImpl<K, V1> joinOther = (KStreamImpl<K, V1>) other;
+        KStreamImpl<K, VO> joinOther = (KStreamImpl<K, VO>) other;
 
         if (joinThis.repartitionRequired) {
             final String leftJoinRepartitionTopicName = joined.name() != null 
? joined.name() + "-left" : joinThis.name;
-            joinThis = 
joinThis.repartitionForJoin(Joined.with(joined.keySerde(), joined.valueSerde(), 
joined.otherValueSerde(), leftJoinRepartitionTopicName));
+            joinThis = 
joinThis.repartitionForJoin(leftJoinRepartitionTopicName, joined.keySerde(), 
joined.valueSerde());
         }
 
         if (joinOther.repartitionRequired) {
             final String rightJoinRepartitionTopicName = joined.name() != null 
? joined.name() + "-right" : joinOther.name;
-            final Joined newJoined = Joined.with(joined.keySerde(), 
joined.valueSerde(), joined.otherValueSerde(), rightJoinRepartitionTopicName);
-            joinOther = joinOther.repartitionForJoin(newJoined);
+            joinOther = 
joinOther.repartitionForJoin(rightJoinRepartitionTopicName, joined.keySerde(), 
joined.otherValueSerde());
         }
 
         joinThis.ensureJoinableWith(joinOther);
@@ -591,18 +593,18 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
     /**
      * Repartition a stream. This is required on join operations occurring 
after
      * an operation that changes the key, i.e, selectKey, map(..), flatMap(..).
-     *
-     * @param joined joined control object
-     * @return a new {@link KStreamImpl}
      */
-    private KStreamImpl<K, V> repartitionForJoin(final Joined<K, V, ?> joined) 
{
-        final Serde<K> repartitionKeySerde = joined.keySerde() != null ? 
joined.keySerde() : keySerde;
-        final Serde<V> repartitionValueSerde = joined.valueSerde() != null ? 
joined.valueSerde() : valSerde;
-        final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, 
V> optimizableRepartitionNodeBuilder = 
OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
+    private KStreamImpl<K, V> repartitionForJoin(final String repartitionName,
+                                                 final Serde<K> 
keySerdeOverride,
+                                                 final Serde<V> 
valueSerdeOverride) {
+        final Serde<K> repartitionKeySerde = keySerdeOverride != null ? 
keySerdeOverride : keySerde;
+        final Serde<V> repartitionValueSerde = valueSerdeOverride != null ? 
valueSerdeOverride : valSerde;
+        final OptimizableRepartitionNodeBuilder<K, V> 
optimizableRepartitionNodeBuilder =
+            OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
         final String repartitionedSourceName = 
createRepartitionedSource(builder,
                                                                          
repartitionKeySerde,
                                                                          
repartitionValueSerde,
-                                                                         
joined.name(),
+                                                                         
repartitionName,
                                                                          
optimizableRepartitionNodeBuilder);
 
         final OptimizableRepartitionNode<K, V> optimizableRepartitionNode = 
optimizableRepartitionNodeBuilder.build();
@@ -615,7 +617,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> 
implements KStream<K
                                                      final Serde<K1> keySerde,
                                                      final Serde<V1> valSerde,
                                                      final String 
repartitionTopicNamePrefix,
-                                                     final 
OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K1, V1> 
optimizableRepartitionNodeBuilder) {
+                                                     final 
OptimizableRepartitionNodeBuilder<K1, V1> optimizableRepartitionNodeBuilder) {
 
 
         final String repartitionTopic = repartitionTopicNamePrefix + 
REPARTITION_TOPIC_SUFFIX;
@@ -644,9 +646,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> 
implements KStream<K
     }
 
     @Override
-    public <V1, R> KStream<K, R> leftJoin(final KStream<K, V1> other,
-                                          final ValueJoiner<? super V, ? super 
V1, ? extends R> joiner,
-                                          final JoinWindows windows) {
+    public <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> other,
+                                            final ValueJoiner<? super V, ? 
super VO, ? extends VR> joiner,
+                                            final JoinWindows windows) {
         return leftJoin(other, joiner, windows, Joined.with(null, null, null));
     }
 
@@ -667,21 +669,24 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
     }
 
     @Override
-    public <V1, R> KStream<K, R> join(final KTable<K, V1> other,
-                                      final ValueJoiner<? super V, ? super V1, 
? extends R> joiner) {
+    public <VO, VR> KStream<K, VR> join(final KTable<K, VO> other,
+                                        final ValueJoiner<? super V, ? super 
VO, ? extends VR> joiner) {
         return join(other, joiner, Joined.with(null, null, null));
     }
 
     @Override
-    public <VT, VR> KStream<K, VR> join(final KTable<K, VT> other,
-                                        final ValueJoiner<? super V, ? super 
VT, ? extends VR> joiner,
-                                        final Joined<K, V, VT> joined) {
+    public <VO, VR> KStream<K, VR> join(final KTable<K, VO> other,
+                                        final ValueJoiner<? super V, ? super 
VO, ? extends VR> joiner,
+                                        final Joined<K, V, VO> joined) {
         Objects.requireNonNull(other, "other can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
         Objects.requireNonNull(joined, "joined can't be null");
         if (repartitionRequired) {
-            final Joined<K, V, ?> updatedJoined = joined.name() != null ? 
joined : joined.withName(name);
-            final KStreamImpl<K, V> thisStreamRepartitioned = 
repartitionForJoin(updatedJoined);
+            final KStreamImpl<K, V> thisStreamRepartitioned = 
repartitionForJoin(
+                joined.name() != null ? joined.name() : name,
+                joined.keySerde(),
+                joined.valueSerde()
+            );
             return thisStreamRepartitioned.doStreamTableJoin(other, joiner, 
joined, false);
         } else {
             return doStreamTableJoin(other, joiner, joined, false);
@@ -689,20 +694,23 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
     }
 
     @Override
-    public <V1, R> KStream<K, R> leftJoin(final KTable<K, V1> other, final 
ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
+    public <VO, VR> KStream<K, VR> leftJoin(final KTable<K, VO> other, final 
ValueJoiner<? super V, ? super VO, ? extends VR> joiner) {
         return leftJoin(other, joiner, Joined.with(null, null, null));
     }
 
     @Override
-    public <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> other,
-                                            final ValueJoiner<? super V, ? 
super VT, ? extends VR> joiner,
-                                            final Joined<K, V, VT> joined) {
+    public <VO, VR> KStream<K, VR> leftJoin(final KTable<K, VO> other,
+                                            final ValueJoiner<? super V, ? 
super VO, ? extends VR> joiner,
+                                            final Joined<K, V, VO> joined) {
         Objects.requireNonNull(other, "other can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
         Objects.requireNonNull(joined, "joined can't be null");
         if (repartitionRequired) {
-            final Joined<K, V, ?> updatedJoined = joined.name() != null ? 
joined : joined.withName(name);
-            final KStreamImpl<K, V> thisStreamRepartitioned = 
repartitionForJoin(updatedJoined);
+            final KStreamImpl<K, V> thisStreamRepartitioned = 
repartitionForJoin(
+                joined.name() != null ? joined.name() : name,
+                joined.keySerde(),
+                joined.valueSerde()
+            );
             return thisStreamRepartitioned.doStreamTableJoin(other, joiner, 
joined, true);
         } else {
             return doStreamTableJoin(other, joiner, joined, true);
@@ -710,28 +718,28 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
     }
 
     @Override
-    public <K1, V1, V2> KStream<K, V2> join(final GlobalKTable<K1, V1> 
globalTable,
-                                            final KeyValueMapper<? super K, ? 
super V, ? extends K1> keyMapper,
-                                            final ValueJoiner<? super V, ? 
super V1, ? extends V2> joiner) {
+    public <KG, VG, VR> KStream<K, VR> join(final GlobalKTable<KG, VG> 
globalTable,
+                                            final KeyValueMapper<? super K, ? 
super V, ? extends KG> keyMapper,
+                                            final ValueJoiner<? super V, ? 
super VG, ? extends VR> joiner) {
         return globalTableJoin(globalTable, keyMapper, joiner, false);
     }
 
     @Override
-    public <K1, V1, R> KStream<K, R> leftJoin(final GlobalKTable<K1, V1> 
globalTable,
-                                              final KeyValueMapper<? super K, 
? super V, ? extends K1> keyMapper,
-                                              final ValueJoiner<? super V, ? 
super V1, ? extends R> joiner) {
+    public <KG, VG, VR> KStream<K, VR> leftJoin(final GlobalKTable<KG, VG> 
globalTable,
+                                                final KeyValueMapper<? super 
K, ? super V, ? extends KG> keyMapper,
+                                                final ValueJoiner<? super V, ? 
super VG, ? extends VR> joiner) {
         return globalTableJoin(globalTable, keyMapper, joiner, true);
     }
 
-    private <K1, V1, V2> KStream<K, V2> globalTableJoin(final GlobalKTable<K1, 
V1> globalTable,
-                                                        final KeyValueMapper<? 
super K, ? super V, ? extends K1> keyMapper,
-                                                        final ValueJoiner<? 
super V, ? super V1, ? extends V2> joiner,
+    private <KG, VG, VR> KStream<K, VR> globalTableJoin(final GlobalKTable<KG, 
VG> globalTable,
+                                                        final KeyValueMapper<? 
super K, ? super V, ? extends KG> keyMapper,
+                                                        final ValueJoiner<? 
super V, ? super VG, ? extends VR> joiner,
                                                         final boolean 
leftJoin) {
         Objects.requireNonNull(globalTable, "globalTable can't be null");
         Objects.requireNonNull(keyMapper, "keyMapper can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
 
-        final KTableValueGetterSupplier<K1, V1> valueGetterSupplier = 
((GlobalKTableImpl<K1, V1>) globalTable).valueGetterSupplier();
+        final KTableValueGetterSupplier<KG, VG> valueGetterSupplier = 
((GlobalKTableImpl<KG, VG>) globalTable).valueGetterSupplier();
         final String name = builder.newProcessorName(LEFTJOIN_NAME);
 
         final ProcessorSupplier<K, V> processorSupplier = new 
KStreamGlobalKTableJoin<>(
@@ -753,18 +761,18 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
     }
 
     @SuppressWarnings("unchecked")
-    private <V1, R> KStream<K, R> doStreamTableJoin(final KTable<K, V1> other,
-                                                    final ValueJoiner<? super 
V, ? super V1, ? extends R> joiner,
-                                                    final Joined<K, V, V1> 
joined,
-                                                    final boolean leftJoin) {
+    private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> 
other,
+                                                      final ValueJoiner<? 
super V, ? super VO, ? extends VR> joiner,
+                                                      final Joined<K, V, VO> 
joined,
+                                                      final boolean leftJoin) {
         Objects.requireNonNull(other, "other KTable can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
 
-        final Set<String> allSourceNodes = 
ensureJoinableWith((AbstractStream<K, V1>) other);
+        final Set<String> allSourceNodes = 
ensureJoinableWith((AbstractStream<K, VO>) other);
 
         final String name = builder.newProcessorName(leftJoin ? LEFTJOIN_NAME 
: JOIN_NAME);
         final ProcessorSupplier<K, V> processorSupplier = new 
KStreamKTableJoin<>(
-            ((KTableImpl<K, ?, V1>) other).valueGetterSupplier(),
+            ((KTableImpl<K, ?, VO>) other).valueGetterSupplier(),
             joiner,
             leftJoin
         );
@@ -785,7 +793,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> 
implements KStream<K
     }
 
     @Override
-    public <K1> KGroupedStream<K1, V> groupBy(final KeyValueMapper<? super K, 
? super V, K1> selector) {
+    public <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, 
? super V, KR> selector) {
         return groupBy(selector, Grouped.with(null, valSerde));
     }
 
@@ -906,7 +914,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> 
implements KStream<K
             final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new 
ProcessorGraphNode<>(otherWindowStreamName, otherWindowStreamProcessorParams);
             builder.addGraphNode(otherStreamsGraphNode, 
otherWindowedStreamsNode);
 
-            final KStreamKStreamJoin<K1, R, ? super V1, ? super V2> joinThis = 
new KStreamKStreamJoin<>(
+            final KStreamKStreamJoin<K1, R, V1, V2> joinThis = new 
KStreamKStreamJoin<>(
                 otherWindowStore.name(),
                 windows.beforeMs,
                 windows.afterMs,
@@ -914,7 +922,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> 
implements KStream<K
                 leftOuter
             );
 
-            final KStreamKStreamJoin<K1, R, ? super V2, ? super V1> joinOther 
= new KStreamKStreamJoin<>(
+            final KStreamKStreamJoin<K1, R, V2, V1> joinOther = new 
KStreamKStreamJoin<>(
                 thisWindowStore.name(),
                 windows.afterMs,
                 windows.beforeMs,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
index 4b767b4..4251dfa 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
@@ -28,17 +28,17 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
  */
 public class ProcessorParameters<K, V> {
 
-    private final ProcessorSupplier<? super K, ? super V> processorSupplier;
+    private final ProcessorSupplier<K, V> processorSupplier;
     private final String processorName;
 
-    public ProcessorParameters(final ProcessorSupplier<? super K, ? super V> 
processorSupplier,
+    public ProcessorParameters(final ProcessorSupplier<K, V> processorSupplier,
                                final String processorName) {
 
         this.processorSupplier = processorSupplier;
         this.processorName = processorName;
     }
 
-    public ProcessorSupplier<? super K, ? super V> processorSupplier() {
+    public ProcessorSupplier<K, V> processorSupplier() {
         return processorSupplier;
     }
 
@@ -49,8 +49,8 @@ public class ProcessorParameters<K, V> {
     @Override
     public String toString() {
         return "ProcessorParameters{" +
-               "processor class=" + processorSupplier.get().getClass() +
-               ", processor name='" + processorName + '\'' +
-               '}';
+            "processor class=" + processorSupplier.get().getClass() +
+            ", processor name='" + processorName + '\'' +
+            '}';
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
index c2b445e..2dc6aad 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
@@ -32,7 +32,7 @@ public class StatefulProcessorNode<K, V> extends 
ProcessorGraphNode<K, V> {
 
 
     public StatefulProcessorNode(final String nodeName,
-                                 final ProcessorParameters processorParameters,
+                                 final ProcessorParameters<K, V> 
processorParameters,
                                  final String[] storeNames,
                                  final StoreBuilder<? extends StateStore> 
materializedKTableStoreBuilder,
                                  final boolean repartitionRequired) {
@@ -75,7 +75,7 @@ public class StatefulProcessorNode<K, V> extends 
ProcessorGraphNode<K, V> {
 
     public static final class StatefulProcessorNodeBuilder<K, V> {
 
-        private ProcessorParameters processorSupplier;
+        private ProcessorParameters<K, V> processorSupplier;
         private String nodeName;
         private boolean repartitionRequired;
         private String[] storeNames;
@@ -84,7 +84,7 @@ public class StatefulProcessorNode<K, V> extends 
ProcessorGraphNode<K, V> {
         private StatefulProcessorNodeBuilder() {
         }
 
-        public StatefulProcessorNodeBuilder<K, V> 
withProcessorParameters(final ProcessorParameters processorParameters) {
+        public StatefulProcessorNodeBuilder<K, V> 
withProcessorParameters(final ProcessorParameters<K, V> processorParameters) {
             this.processorSupplier = processorParameters;
             return this;
         }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
index 37265fa..20ce3ff 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
@@ -49,12 +49,12 @@ public class GraphGraceSearchUtilTest {
         final StatefulProcessorNode<String, Long> gracelessAncestor = new 
StatefulProcessorNode<>(
             "stateful",
             new ProcessorParameters<>(
-                () -> new Processor<Object, Object>() {
+                () -> new Processor<String, Long>() {
                     @Override
                     public void init(final ProcessorContext context) {}
 
                     @Override
-                    public void process(final Object key, final Object value) 
{}
+                    public void process(final String key, final Long value) {}
 
                     @Override
                     public void close() {}
@@ -141,12 +141,12 @@ public class GraphGraceSearchUtilTest {
         final StatefulProcessorNode<String, Long> statefulParent = new 
StatefulProcessorNode<>(
             "stateful",
             new ProcessorParameters<>(
-                () -> new Processor<Object, Object>() {
+                () -> new Processor<String, Long>() {
                     @Override
                     public void init(final ProcessorContext context) {}
 
                     @Override
-                    public void process(final Object key, final Object value) 
{}
+                    public void process(final String key, final Long value) {}
 
                     @Override
                     public void close() {}

Reply via email to