bbejeck commented on code in PR #18722: URL: https://github.com/apache/kafka/pull/18722#discussion_r1933030988
########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java: ########## @@ -91,20 +91,20 @@ Set<String> ensureCopartitionWith(final Collection<? extends AbstractStream<K, ? return allSourceNodes; } - static <T2, T1, R> ValueJoiner<T2, T1, R> reverseJoiner(final ValueJoiner<T1, T2, R> joiner) { + static <VRight, VLeft, VOut> ValueJoiner<VRight, VLeft, VOut> reverseJoiner(final ValueJoiner<VLeft, VRight, VOut> joiner) { return (value2, value1) -> joiner.apply(value1, value2); } - static <K, T2, T1, R> ValueJoinerWithKey<K, T2, T1, R> reverseJoinerWithKey(final ValueJoinerWithKey<K, T1, T2, R> joiner) { + static <K, VRight, VLeft, VOut> ValueJoinerWithKey<K, VRight, VLeft, VOut> reverseJoinerWithKey(final ValueJoinerWithKey<K, VLeft, VRight, VOut> joiner) { return (key, value2, value1) -> joiner.apply(key, value1, value2); } - static <K, V, VR> ValueMapperWithKey<K, V, VR> withKey(final ValueMapper<V, VR> valueMapper) { + static <K, V, VOut> ValueMapperWithKey<K, V, VOut> withKey(final ValueMapper<V, VOut> valueMapper) { Objects.requireNonNull(valueMapper, "valueMapper can't be null"); return (readOnlyKey, value) -> valueMapper.apply(value); } - static <K, V1, V2, VR> ValueJoinerWithKey<K, V1, V2, VR> toValueJoinerWithKey(final ValueJoiner<V1, V2, VR> valueJoiner) { + static <K, VLeft, VRight, VOut> ValueJoinerWithKey<K, VLeft, VRight, VOut> toValueJoinerWithKey(final ValueJoiner<VLeft, VRight, VOut> valueJoiner) { Review Comment: All the other methods have generics of `? super` or `? extends`; why not here? Asking more for my own education. ########## streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java: ########## @@ -3062,7 +3062,7 @@ <GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalTable, * @see #map(KeyValueMapper) */ <KOut, VOut> KStream<KOut, VOut> process( - final ProcessorSupplier<? super K, ? super V, KOut, VOut> processorSupplier, + final ProcessorSupplier<? super K, ? super V, ? 
extends KOut, ? extends VOut> processorSupplier, Review Comment: I think given that we have current usage of `? super/extends X` in the code base, we don't need a KIP. ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java: ########## @@ -1103,26 +1108,28 @@ public <KG, VG, VR> KStream<K, VR> leftJoin(final GlobalKTable<KG, VG> globalTab return globalTableJoin(globalTable, keySelector, joiner, true, named); } - private <KG, VG, VR> KStream<K, VR> globalTableJoin(final GlobalKTable<KG, VG> globalTable, - final KeyValueMapper<? super K, ? super V, ? extends KG> keySelector, - final ValueJoinerWithKey<? super K, ? super V, ? super VG, ? extends VR> joiner, - final boolean leftJoin, - final Named named) { + private <KGlobalTable, VGlobalTable, VOut> KStream<K, VOut> globalTableJoin( Review Comment: nit: would `GlobalTableK` be more descriptive than `KGlobalTable`? Same for `VGlobalTable`. ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java: ########## @@ -153,9 +153,9 @@ public KStream<K, V> filter(final Predicate<? super K, ? super V> predicate, Objects.requireNonNull(named, "named can't be null"); final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FILTER_NAME); - final ProcessorParameters<? super K, ? super V, ?, ?> processorParameters = + final ProcessorParameters<K, V, K, V> processorParameters = Review Comment: why the changes to the generic types? For example, from `? super K` to `K` ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java: ########## @@ -248,9 +248,9 @@ public <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? Objects.requireNonNull(named, "named can't be null"); final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, MAP_NAME); - final ProcessorParameters<? super K, ? 
super V, ?, ?> processorParameters = + final ProcessorParameters<K, V, KR, VR> processorParameters = Review Comment: And here from `?` to `KR` ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java: ########## @@ -906,13 +910,14 @@ private KStreamImpl<K, V> repartitionForJoin(final String repartitionName, builder); } - static <K1, V1, RN extends BaseRepartitionNode<K1, V1>> String createRepartitionedSource(final InternalStreamsBuilder builder, - final Serde<K1> keySerde, - final Serde<V1> valueSerde, - final String repartitionTopicNamePrefix, - final StreamPartitioner<K1, V1> streamPartitioner, - final BaseRepartitionNodeBuilder<K1, V1, RN> baseRepartitionNodeBuilder) { - + static <KStream, VStream, RepartitionNode extends BaseRepartitionNode<KStream, VStream>> String createRepartitionedSource( + final InternalStreamsBuilder builder, Review Comment: why `KStream` and `VStream`? Are there two streams involved here? It's been a while since I've looked at this part of the code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org