This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push: new fa3fe3e MINOR: Fix generic type of ProcessorParameters (#5741) fa3fe3e is described below commit fa3fe3e68059068ac894552078155b16c21404aa Author: John Roesler <vvcep...@users.noreply.github.com> AuthorDate: Thu Oct 4 19:37:53 2018 -0500 MINOR: Fix generic type of ProcessorParameters (#5741) In unrelated recent work, I noticed some warnings about the missing type parameters on ProcessorParameters. While investigating it, it seems like there was a bug in the creation of repartition topics. Reviewers: Bill Bejeck <b...@confluent.io>, Guozhang Wang <guozh...@confluent.io> --- .../internals/GroupedStreamAggregateBuilder.java | 7 +- .../kstream/internals/KGroupedTableImpl.java | 22 +-- .../streams/kstream/internals/KStreamImpl.java | 184 +++++++++++---------- .../internals/graph/ProcessorParameters.java | 12 +- .../internals/graph/StatefulProcessorNode.java | 6 +- .../internals/graph/GraphGraceSearchUtilTest.java | 8 +- 6 files changed, 125 insertions(+), 114 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java index 3439cf5..8e6f990 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java @@ -84,16 +84,17 @@ class GroupedStreamAggregateBuilder<K, V> { builder.addGraphNode(parentNode, repartitionNode); parentNode = repartitionNode; } - final StatefulProcessorNode.StatefulProcessorNodeBuilder<K, T> statefulProcessorNodeBuilder = StatefulProcessorNode.statefulProcessorNodeBuilder(); + final StatefulProcessorNode.StatefulProcessorNodeBuilder<K, V> statefulProcessorNodeBuilder = StatefulProcessorNode.statefulProcessorNodeBuilder(); + + final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(aggregateSupplier, aggFunctionName); - final ProcessorParameters processorParameters = new ProcessorParameters<>(aggregateSupplier, aggFunctionName); statefulProcessorNodeBuilder .withProcessorParameters(processorParameters) .withNodeName(aggFunctionName) .withRepartitionRequired(repartitionRequired) .withStoreBuilder(storeBuilder); - final StatefulProcessorNode<K, T> statefulProcessorNode = statefulProcessorNodeBuilder.build(); + final StatefulProcessorNode<K, V> statefulProcessorNode = statefulProcessorNodeBuilder.build(); builder.addGraphNode(parentNode, statefulProcessorNode); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java index c97576b..013028d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java @@ -47,7 +47,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr private static final String REDUCE_NAME = "KTABLE-REDUCE-"; - protected final String userSpecifiedName; + private final String userSpecifiedName; private final Initializer<Long> countInitializer = () -> 0L; @@ -72,7 +72,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr final String sourceName = builder.newProcessorName(KStreamImpl.SOURCE_NAME); final String funcName = builder.newProcessorName(functionName); final String repartitionTopic = (userSpecifiedName != null ? userSpecifiedName : materialized.storeName()) - + KStreamImpl.REPARTITION_TOPIC_SUFFIX; + + KStreamImpl.REPARTITION_TOPIC_SUFFIX; final StreamsGraphNode repartitionNode = createRepartitionNode(sinkName, sourceName, repartitionTopic); @@ -98,16 +98,18 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr builder); } - private <T> StatefulProcessorNode getStatefulProcessorNode(final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized, - final String functionName, - final ProcessorSupplier<K, Change<V>> aggregateSupplier) { + private <T> StatefulProcessorNode<K, Change<V>> getStatefulProcessorNode(final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized, + final String functionName, + final ProcessorSupplier<K, Change<V>> aggregateSupplier) { final ProcessorParameters<K, Change<V>> aggregateFunctionProcessorParams = new ProcessorParameters<>(aggregateSupplier, functionName); - return StatefulProcessorNode.statefulProcessorNodeBuilder() + return StatefulProcessorNode + .<K, Change<V>>statefulProcessorNodeBuilder() .withNodeName(functionName) .withProcessorParameters(aggregateFunctionProcessorParams) - .withStoreBuilder(new KeyValueStoreMaterializer<>(materialized).materialize()).build(); + .withStoreBuilder(new KeyValueStoreMaterializer<>(materialized).materialize()) + .build(); } private GroupedTableOperationRepartitionNode<K, V> createRepartitionNode(final String sinkName, @@ -164,9 +166,9 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr } final ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(materializedInternal.storeName(), - countInitializer, - countAdder, - countSubtractor); + countInitializer, + countAdder, + countSubtractor); return doAggregate(aggregateSupplier, AGGREGATE_NAME, materializedInternal); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 96fa8b9..49dbbd1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -39,6 +39,7 @@ import org.apache.kafka.streams.kstream.ValueMapperWithKey; import org.apache.kafka.streams.kstream.ValueTransformerSupplier; import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode; +import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder; import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode; import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters; import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode; @@ -165,7 +166,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K } @Override - public <K1> KStream<K1, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends K1> mapper) { + public <KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper) { Objects.requireNonNull(mapper, "mapper can't be null"); final ProcessorGraphNode<K, V> selectKeyProcessorNode = internalSelectKey(mapper); @@ -178,10 +179,10 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K } - private <K1> ProcessorGraphNode<K, V> internalSelectKey(final KeyValueMapper<? super K, ? super V, ? extends K1> mapper) { + private <KR> ProcessorGraphNode<K, V> internalSelectKey(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper) { final String name = builder.newProcessorName(KEY_SELECT_NAME); - final KStreamMap<K, V, K1, V> kStreamMap = new KStreamMap<>((key, value) -> new KeyValue<>(mapper.apply(key, value), value)); + final KStreamMap<K, V, KR, V> kStreamMap = new KStreamMap<>((key, value) -> new KeyValue<>(mapper.apply(key, value), value)); final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(kStreamMap, name); @@ -189,7 +190,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K } @Override - public <K1, V1> KStream<K1, V1> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> mapper) { + public <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper) { Objects.requireNonNull(mapper, "mapper can't be null"); final String name = builder.newProcessorName(MAP_NAME); @@ -212,7 +213,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K @Override - public <V1> KStream<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper) { + public <VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper) { return mapValues(withKey(mapper)); } @@ -250,7 +251,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K } @Override - public <K1, V1> KStream<K1, V1> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> mapper) { + public <KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper) { Objects.requireNonNull(mapper, "mapper can't be null"); final String name = builder.newProcessorName(FLATMAP_NAME); @@ -271,7 +272,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K } @Override - public <V1> KStream<K, V1> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends V1>> mapper) { + public <VR> KStream<K, VR> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends VR>> mapper) { return flatMapValues(withKey(mapper)); } @@ -446,19 +447,19 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K } @Override - public <K1, V1> KStream<K1, V1> transform(final TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier, + public <KR, VR> KStream<KR, VR> transform(final TransformerSupplier<? super K, ? super V, KeyValue<KR, VR>> transformerSupplier, final String... stateStoreNames) { Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null"); final String name = builder.newProcessorName(TRANSFORM_NAME); - final ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamTransform<>(transformerSupplier), name); - + final StatefulProcessorNode<? super K, ? super V> transformNode = new StatefulProcessorNode<>( + name, + new ProcessorParameters<>(new KStreamTransform<>(transformerSupplier), name), + stateStoreNames, + null, + true + ); - final StatefulProcessorNode<K1, V1> transformNode = new StatefulProcessorNode<>(name, - processorParameters, - stateStoreNames, - null, - true); transformNode.keyChangingOperation(true); builder.addGraphNode(this.streamsGraphNode, transformNode); @@ -467,7 +468,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K } @Override - public <V1> KStream<K, V1> transformValues(final ValueTransformerSupplier<? super V, ? extends V1> valueTransformerSupplier, + public <VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier, final String... stateStoreNames) { Objects.requireNonNull(valueTransformerSupplier, "valueTransformSupplier can't be null"); @@ -486,14 +487,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K final String... stateStoreNames) { final String name = builder.newProcessorName(TRANSFORMVALUES_NAME); + final StatefulProcessorNode<? super K, ? super V> transformNode = new StatefulProcessorNode<>( + name, + new ProcessorParameters<>(new KStreamTransformValues<>(valueTransformerWithKeySupplier), name), + stateStoreNames, + null, + repartitionRequired + ); - final ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamTransformValues<>(valueTransformerWithKeySupplier), name); - - final StatefulProcessorNode<K, VR> transformNode = new StatefulProcessorNode<>(name, - processorParameters, - stateStoreNames, - null, - repartitionRequired); transformNode.setValueChangingOperation(true); builder.addGraphNode(this.streamsGraphNode, transformNode); @@ -508,19 +509,21 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K Objects.requireNonNull(processorSupplier, "ProcessSupplier cant' be null"); final String name = builder.newProcessorName(PROCESSOR_NAME); - final ProcessorParameters processorParameters = new ProcessorParameters<>(processorSupplier, name); - final StatefulProcessorNode<K, V> processNode = new StatefulProcessorNode<>(name, - processorParameters, - stateStoreNames, - null, - repartitionRequired); + final StatefulProcessorNode<? super K, ? super V> processNode = new StatefulProcessorNode<>( + name, + new ProcessorParameters<>(processorSupplier, name), + stateStoreNames, + null, + repartitionRequired + ); + builder.addGraphNode(this.streamsGraphNode, processNode); } @Override - public <V1, R> KStream<K, R> join(final KStream<K, V1> other, - final ValueJoiner<? super V, ? super V1, ? extends R> joiner, - final JoinWindows windows) { + public <VO, VR> KStream<K, VR> join(final KStream<K, VO> other, + final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, + final JoinWindows windows) { return join(other, joiner, windows, Joined.with(null, null, null)); } @@ -539,9 +542,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K } @Override - public <V1, R> KStream<K, R> outerJoin(final KStream<K, V1> other, - final ValueJoiner<? super V, ? super V1, ? extends R> joiner, - final JoinWindows windows) { + public <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> other, + final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, + final JoinWindows windows) { return outerJoin(other, joiner, windows, Joined.with(null, null, null)); } @@ -553,28 +556,27 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K return doJoin(other, joiner, windows, joined, new KStreamImplJoin(true, true)); } - private <V1, R> KStream<K, R> doJoin(final KStream<K, V1> other, - final ValueJoiner<? super V, ? super V1, ? extends R> joiner, - final JoinWindows windows, - final Joined<K, V, V1> joined, - final KStreamImplJoin join) { + private <VO, VR> KStream<K, VR> doJoin(final KStream<K, VO> other, + final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, + final JoinWindows windows, + final Joined<K, V, VO> joined, + final KStreamImplJoin join) { Objects.requireNonNull(other, "other KStream can't be null"); Objects.requireNonNull(joiner, "joiner can't be null"); Objects.requireNonNull(windows, "windows can't be null"); Objects.requireNonNull(joined, "joined can't be null"); KStreamImpl<K, V> joinThis = this; - KStreamImpl<K, V1> joinOther = (KStreamImpl<K, V1>) other; + KStreamImpl<K, VO> joinOther = (KStreamImpl<K, VO>) other; if (joinThis.repartitionRequired) { final String leftJoinRepartitionTopicName = joined.name() != null ? joined.name() + "-left" : joinThis.name; - joinThis = joinThis.repartitionForJoin(Joined.with(joined.keySerde(), joined.valueSerde(), joined.otherValueSerde(), leftJoinRepartitionTopicName)); + joinThis = joinThis.repartitionForJoin(leftJoinRepartitionTopicName, joined.keySerde(), joined.valueSerde()); } if (joinOther.repartitionRequired) { final String rightJoinRepartitionTopicName = joined.name() != null ? joined.name() + "-right" : joinOther.name; - final Joined newJoined = Joined.with(joined.keySerde(), joined.valueSerde(), joined.otherValueSerde(), rightJoinRepartitionTopicName); - joinOther = joinOther.repartitionForJoin(newJoined); + joinOther = joinOther.repartitionForJoin(rightJoinRepartitionTopicName, joined.keySerde(), joined.otherValueSerde()); } joinThis.ensureJoinableWith(joinOther); @@ -591,18 +593,18 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K /** * Repartition a stream. This is required on join operations occurring after * an operation that changes the key, i.e, selectKey, map(..), flatMap(..). - * - * @param joined joined control object - * @return a new {@link KStreamImpl} */ - private KStreamImpl<K, V> repartitionForJoin(final Joined<K, V, ?> joined) { - final Serde<K> repartitionKeySerde = joined.keySerde() != null ? joined.keySerde() : keySerde; - final Serde<V> repartitionValueSerde = joined.valueSerde() != null ? joined.valueSerde() : valSerde; - final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder(); + private KStreamImpl<K, V> repartitionForJoin(final String repartitionName, + final Serde<K> keySerdeOverride, + final Serde<V> valueSerdeOverride) { + final Serde<K> repartitionKeySerde = keySerdeOverride != null ? keySerdeOverride : keySerde; + final Serde<V> repartitionValueSerde = valueSerdeOverride != null ? valueSerdeOverride : valSerde; + final OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder = + OptimizableRepartitionNode.optimizableRepartitionNodeBuilder(); final String repartitionedSourceName = createRepartitionedSource(builder, repartitionKeySerde, repartitionValueSerde, - joined.name(), + repartitionName, optimizableRepartitionNodeBuilder); final OptimizableRepartitionNode<K, V> optimizableRepartitionNode = optimizableRepartitionNodeBuilder.build(); @@ -615,7 +617,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K final Serde<K1> keySerde, final Serde<V1> valSerde, final String repartitionTopicNamePrefix, - final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K1, V1> optimizableRepartitionNodeBuilder) { + final OptimizableRepartitionNodeBuilder<K1, V1> optimizableRepartitionNodeBuilder) { final String repartitionTopic = repartitionTopicNamePrefix + REPARTITION_TOPIC_SUFFIX; @@ -644,9 +646,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K } @Override - public <V1, R> KStream<K, R> leftJoin(final KStream<K, V1> other, - final ValueJoiner<? super V, ? super V1, ? extends R> joiner, - final JoinWindows windows) { + public <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> other, + final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, + final JoinWindows windows) { return leftJoin(other, joiner, windows, Joined.with(null, null, null)); } @@ -667,21 +669,24 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K } @Override - public <V1, R> KStream<K, R> join(final KTable<K, V1> other, - final ValueJoiner<? super V, ? super V1, ? extends R> joiner) { + public <VO, VR> KStream<K, VR> join(final KTable<K, VO> other, + final ValueJoiner<? super V, ? super VO, ? extends VR> joiner) { return join(other, joiner, Joined.with(null, null, null)); } @Override - public <VT, VR> KStream<K, VR> join(final KTable<K, VT> other, - final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, - final Joined<K, V, VT> joined) { + public <VO, VR> KStream<K, VR> join(final KTable<K, VO> other, + final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, + final Joined<K, V, VO> joined) { Objects.requireNonNull(other, "other can't be null"); Objects.requireNonNull(joiner, "joiner can't be null"); Objects.requireNonNull(joined, "joined can't be null"); if (repartitionRequired) { - final Joined<K, V, ?> updatedJoined = joined.name() != null ? joined : joined.withName(name); - final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(updatedJoined); + final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin( + joined.name() != null ? joined.name() : name, + joined.keySerde(), + joined.valueSerde() + ); return thisStreamRepartitioned.doStreamTableJoin(other, joiner, joined, false); } else { return doStreamTableJoin(other, joiner, joined, false); @@ -689,20 +694,23 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K } @Override - public <V1, R> KStream<K, R> leftJoin(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner) { + public <VO, VR> KStream<K, VR> leftJoin(final KTable<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner) { return leftJoin(other, joiner, Joined.with(null, null, null)); } @Override - public <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> other, - final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, - final Joined<K, V, VT> joined) { + public <VO, VR> KStream<K, VR> leftJoin(final KTable<K, VO> other, + final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, + final Joined<K, V, VO> joined) { Objects.requireNonNull(other, "other can't be null"); Objects.requireNonNull(joiner, "joiner can't be null"); Objects.requireNonNull(joined, "joined can't be null"); if (repartitionRequired) { - final Joined<K, V, ?> updatedJoined = joined.name() != null ? joined : joined.withName(name); - final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(updatedJoined); + final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin( + joined.name() != null ? joined.name() : name, + joined.keySerde(), + joined.valueSerde() + ); return thisStreamRepartitioned.doStreamTableJoin(other, joiner, joined, true); } else { return doStreamTableJoin(other, joiner, joined, true); @@ -710,28 +718,28 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K } @Override - public <K1, V1, V2> KStream<K, V2> join(final GlobalKTable<K1, V1> globalTable, - final KeyValueMapper<? super K, ? super V, ? extends K1> keyMapper, - final ValueJoiner<? super V, ? super V1, ? extends V2> joiner) { + public <KG, VG, VR> KStream<K, VR> join(final GlobalKTable<KG, VG> globalTable, + final KeyValueMapper<? super K, ? super V, ? extends KG> keyMapper, + final ValueJoiner<? super V, ? super VG, ? extends VR> joiner) { return globalTableJoin(globalTable, keyMapper, joiner, false); } @Override - public <K1, V1, R> KStream<K, R> leftJoin(final GlobalKTable<K1, V1> globalTable, - final KeyValueMapper<? super K, ? super V, ? extends K1> keyMapper, - final ValueJoiner<? super V, ? super V1, ? extends R> joiner) { + public <KG, VG, VR> KStream<K, VR> leftJoin(final GlobalKTable<KG, VG> globalTable, + final KeyValueMapper<? super K, ? super V, ? extends KG> keyMapper, + final ValueJoiner<? super V, ? super VG, ? extends VR> joiner) { return globalTableJoin(globalTable, keyMapper, joiner, true); } - private <K1, V1, V2> KStream<K, V2> globalTableJoin(final GlobalKTable<K1, V1> globalTable, - final KeyValueMapper<? super K, ? super V, ? extends K1> keyMapper, - final ValueJoiner<? super V, ? super V1, ? extends V2> joiner, + private <KG, VG, VR> KStream<K, VR> globalTableJoin(final GlobalKTable<KG, VG> globalTable, + final KeyValueMapper<? super K, ? super V, ? extends KG> keyMapper, + final ValueJoiner<? super V, ? super VG, ? extends VR> joiner, final boolean leftJoin) { Objects.requireNonNull(globalTable, "globalTable can't be null"); Objects.requireNonNull(keyMapper, "keyMapper can't be null"); Objects.requireNonNull(joiner, "joiner can't be null"); - final KTableValueGetterSupplier<K1, V1> valueGetterSupplier = ((GlobalKTableImpl<K1, V1>) globalTable).valueGetterSupplier(); + final KTableValueGetterSupplier<KG, VG> valueGetterSupplier = ((GlobalKTableImpl<KG, VG>) globalTable).valueGetterSupplier(); final String name = builder.newProcessorName(LEFTJOIN_NAME); final ProcessorSupplier<K, V> processorSupplier = new KStreamGlobalKTableJoin<>( @@ -753,18 +761,18 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K } @SuppressWarnings("unchecked") - private <V1, R> KStream<K, R> doStreamTableJoin(final KTable<K, V1> other, - final ValueJoiner<? super V, ? super V1, ? extends R> joiner, - final Joined<K, V, V1> joined, - final boolean leftJoin) { + private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> other, + final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, + final Joined<K, V, VO> joined, + final boolean leftJoin) { Objects.requireNonNull(other, "other KTable can't be null"); Objects.requireNonNull(joiner, "joiner can't be null"); - final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K, V1>) other); + final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K, VO>) other); final String name = builder.newProcessorName(leftJoin ? LEFTJOIN_NAME : JOIN_NAME); final ProcessorSupplier<K, V> processorSupplier = new KStreamKTableJoin<>( - ((KTableImpl<K, ?, V1>) other).valueGetterSupplier(), + ((KTableImpl<K, ?, VO>) other).valueGetterSupplier(), joiner, leftJoin ); @@ -785,7 +793,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K } @Override - public <K1> KGroupedStream<K1, V> groupBy(final KeyValueMapper<? super K, ? super V, K1> selector) { + public <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector) { return groupBy(selector, Grouped.with(null, valSerde)); } @@ -906,7 +914,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamName, otherWindowStreamProcessorParams); builder.addGraphNode(otherStreamsGraphNode, otherWindowedStreamsNode); - final KStreamKStreamJoin<K1, R, ? super V1, ? super V2> joinThis = new KStreamKStreamJoin<>( + final KStreamKStreamJoin<K1, R, V1, V2> joinThis = new KStreamKStreamJoin<>( otherWindowStore.name(), windows.beforeMs, windows.afterMs, @@ -914,7 +922,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K leftOuter ); - final KStreamKStreamJoin<K1, R, ? super V2, ? super V1> joinOther = new KStreamKStreamJoin<>( + final KStreamKStreamJoin<K1, R, V2, V1> joinOther = new KStreamKStreamJoin<>( thisWindowStore.name(), windows.afterMs, windows.beforeMs, diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java index 4b767b4..4251dfa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java @@ -28,17 +28,17 @@ import org.apache.kafka.streams.processor.ProcessorSupplier; */ public class ProcessorParameters<K, V> { - private final ProcessorSupplier<? super K, ? super V> processorSupplier; + private final ProcessorSupplier<K, V> processorSupplier; private final String processorName; - public ProcessorParameters(final ProcessorSupplier<? super K, ? super V> processorSupplier, + public ProcessorParameters(final ProcessorSupplier<K, V> processorSupplier, final String processorName) { this.processorSupplier = processorSupplier; this.processorName = processorName; } - public ProcessorSupplier<? super K, ? super V> processorSupplier() { + public ProcessorSupplier<K, V> processorSupplier() { return processorSupplier; } @@ -49,8 +49,8 @@ public class ProcessorParameters<K, V> { @Override public String toString() { return "ProcessorParameters{" + - "processor class=" + processorSupplier.get().getClass() + - ", processor name='" + processorName + '\'' + - '}'; + "processor class=" + processorSupplier.get().getClass() + + ", processor name='" + processorName + '\'' + + '}'; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java index c2b445e..2dc6aad 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java @@ -32,7 +32,7 @@ public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K, V> { public StatefulProcessorNode(final String nodeName, - final ProcessorParameters processorParameters, + final ProcessorParameters<K, V> processorParameters, final String[] storeNames, final StoreBuilder<? extends StateStore> materializedKTableStoreBuilder, final boolean repartitionRequired) { @@ -75,7 +75,7 @@ public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K, V> { public static final class StatefulProcessorNodeBuilder<K, V> { - private ProcessorParameters processorSupplier; + private ProcessorParameters<K, V> processorSupplier; private String nodeName; private boolean repartitionRequired; private String[] storeNames; @@ -84,7 +84,7 @@ public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K, V> { private StatefulProcessorNodeBuilder() { } - public StatefulProcessorNodeBuilder<K, V> withProcessorParameters(final ProcessorParameters processorParameters) { + public StatefulProcessorNodeBuilder<K, V> withProcessorParameters(final ProcessorParameters<K, V> processorParameters) { this.processorSupplier = processorParameters; return this; } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java index 37265fa..20ce3ff 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java @@ -49,12 +49,12 @@ public class GraphGraceSearchUtilTest { final StatefulProcessorNode<String, Long> gracelessAncestor = new StatefulProcessorNode<>( "stateful", new ProcessorParameters<>( - () -> new Processor<Object, Object>() { + () -> new Processor<String, Long>() { @Override public void init(final ProcessorContext context) {} @Override - public void process(final Object key, final Object value) {} + public void process(final String key, final Long value) {} @Override public void close() {} @@ -141,12 +141,12 @@ public class GraphGraceSearchUtilTest { final StatefulProcessorNode<String, Long> statefulParent = new StatefulProcessorNode<>( "stateful", new ProcessorParameters<>( - () -> new Processor<Object, Object>() { + () -> new Processor<String, Long>() { @Override public void init(final ProcessorContext context) {} @Override - public void process(final Object key, final Object value) {} + public void process(final String key, final Long value) {} @Override public void close() {}