bbejeck commented on code in PR #18722:
URL: https://github.com/apache/kafka/pull/18722#discussion_r1933030988


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java:
##########
@@ -91,20 +91,20 @@ Set<String> ensureCopartitionWith(final Collection<? extends AbstractStream<K, ?
         return allSourceNodes;
     }
 
-    static <T2, T1, R> ValueJoiner<T2, T1, R> reverseJoiner(final ValueJoiner<T1, T2, R> joiner) {
+    static <VRight, VLeft, VOut> ValueJoiner<VRight, VLeft, VOut> reverseJoiner(final ValueJoiner<VLeft, VRight, VOut> joiner) {
         return (value2, value1) -> joiner.apply(value1, value2);
     }
 
-    static <K, T2, T1, R> ValueJoinerWithKey<K, T2, T1, R> reverseJoinerWithKey(final ValueJoinerWithKey<K, T1, T2, R> joiner) {
+    static <K, VRight, VLeft, VOut> ValueJoinerWithKey<K, VRight, VLeft, VOut> reverseJoinerWithKey(final ValueJoinerWithKey<K, VLeft, VRight, VOut> joiner) {
         return (key, value2, value1) -> joiner.apply(key, value1, value2);
     }
 
-    static <K, V, VR> ValueMapperWithKey<K, V, VR> withKey(final ValueMapper<V, VR> valueMapper) {
+    static <K, V, VOut> ValueMapperWithKey<K, V, VOut> withKey(final ValueMapper<V, VOut> valueMapper) {
         Objects.requireNonNull(valueMapper, "valueMapper can't be null");
         return (readOnlyKey, value) -> valueMapper.apply(value);
     }
 
-    static <K, V1, V2, VR> ValueJoinerWithKey<K, V1, V2, VR> toValueJoinerWithKey(final ValueJoiner<V1, V2, VR> valueJoiner) {
+    static <K, VLeft, VRight, VOut> ValueJoinerWithKey<K, VLeft, VRight, VOut> toValueJoinerWithKey(final ValueJoiner<VLeft, VRight, VOut> valueJoiner) {

Review Comment:
   All the other methods use `? super` or `? extends` wildcards in their 
generics. Why not here? Asking mostly for my own education.
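
   For reference, a minimal sketch of what wildcards would change on an adapter 
like this one. `Joiner` and `JoinerWithKey` are hypothetical stand-ins written 
for this example, not Kafka's `ValueJoiner` interfaces, and `strict` / 
`flexible` are illustrative names only:

```java
import java.util.Objects;

public class WildcardJoinerSketch {
    // Hypothetical stand-ins for ValueJoiner / ValueJoinerWithKey (not the Kafka types).
    public interface Joiner<L, R, O> { O apply(L left, R right); }
    public interface JoinerWithKey<K, L, R, O> { O apply(K key, L left, R right); }

    // Invariant parameter: the argument's type arguments must match L, R, O exactly.
    public static <K, L, R, O> JoinerWithKey<K, L, R, O> strict(final Joiner<L, R, O> joiner) {
        Objects.requireNonNull(joiner, "joiner can't be null");
        return (key, left, right) -> joiner.apply(left, right);
    }

    // Wildcard parameter: also accepts joiners that consume supertypes of L and R
    // and produce a subtype of O.
    public static <K, L, R, O> JoinerWithKey<K, L, R, O> flexible(
            final Joiner<? super L, ? super R, ? extends O> joiner) {
        Objects.requireNonNull(joiner, "joiner can't be null");
        return (key, left, right) -> joiner.apply(left, right);
    }

    public static void main(final String[] args) {
        // A general-purpose joiner that works on any two objects.
        final Joiner<Object, Object, String> general = (l, r) -> l + "|" + r;

        // Compiles only with the wildcard version. strict(general) would be rejected
        // for this target type, because L would have to be both Integer and Object.
        final JoinerWithKey<String, Integer, Long, CharSequence> adapted = flexible(general);

        System.out.println(adapted.apply("k", 1, 2L)); // prints 1|2
    }
}
```

   Whether an internal helper needs this flexibility depends on how its callers 
are typed; the sketch only shows the difference between the two signatures.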



##########
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##########
@@ -3062,7 +3062,7 @@ <GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalTable,
      * @see #map(KeyValueMapper)
      */
     <KOut, VOut> KStream<KOut, VOut> process(
-        final ProcessorSupplier<? super K, ? super V, KOut, VOut> processorSupplier,
+        final ProcessorSupplier<? super K, ? super V, ? extends KOut, ? extends VOut> processorSupplier,

Review Comment:
   I think that, given the existing usage of `? super X` and `? extends X` in 
the code base, we don't need a KIP.
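
   A rough sketch of what the added `? extends` on the output types changes for 
callers. `Supplier2`, `processInvariant`, and `processWithWildcards` are 
hypothetical names made up for this example and only model the output type 
parameter; they are not the Kafka `ProcessorSupplier` API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class OutputWildcardSketch {
    // Hypothetical stand-in for a supplier of processing logic (not Kafka's ProcessorSupplier).
    public interface Supplier2<In, Out> { Function<In, Out> get(); }

    // Shape of the old signature: the output type is invariant.
    public static <In, Out> List<Out> processInvariant(
            final List<In> input, final Supplier2<? super In, Out> supplier) {
        final Function<? super In, Out> fn = supplier.get();
        final List<Out> out = new ArrayList<>();
        for (final In value : input) {
            out.add(fn.apply(value));
        }
        return out;
    }

    // Shape of the new signature: the supplier may produce any subtype of Out.
    public static <In, Out> List<Out> processWithWildcards(
            final List<In> input, final Supplier2<? super In, ? extends Out> supplier) {
        final Function<? super In, ? extends Out> fn = supplier.get();
        final List<Out> out = new ArrayList<>();
        for (final In value : input) {
            out.add(fn.apply(value));
        }
        return out;
    }

    public static void main(final String[] args) {
        final List<String> words = List.of("a", "bcd");
        final Supplier2<Object, Integer> lengthOf = () -> o -> o.toString().length();

        // Calls written against the invariant shape keep compiling with the wildcard shape.
        final List<Integer> exactOld = processInvariant(words, lengthOf);
        final List<Integer> exactNew = processWithWildcards(words, lengthOf);

        // Newly accepted: an Integer-producing supplier where Number output is requested.
        // processInvariant(words, lengthOf) would not compile for this target type.
        final List<Number> widened = processWithWildcards(words, lengthOf);

        System.out.println(exactOld + " " + exactNew + " " + widened); // prints [1, 3] [1, 3] [1, 3]
    }
}
```

   In this sketch the wildcard version accepts every call the invariant version 
accepts, plus some that were previously rejected.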



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -1103,26 +1108,28 @@ public <KG, VG, VR> KStream<K, VR> leftJoin(final GlobalKTable<KG, VG> globalTab
         return globalTableJoin(globalTable, keySelector, joiner, true, named);
     }
 
-    private <KG, VG, VR> KStream<K, VR> globalTableJoin(final GlobalKTable<KG, VG> globalTable,
-                                                        final KeyValueMapper<? super K, ? super V, ? extends KG> keySelector,
-                                                        final ValueJoinerWithKey<? super K, ? super V, ? super VG, ? extends VR> joiner,
-                                                        final boolean leftJoin,
-                                                        final Named named) {
+    private <KGlobalTable, VGlobalTable, VOut> KStream<K, VOut> globalTableJoin(

Review Comment:
   nit: would `GlobalTableK` be more descriptive than `KGlobalTable`? Same for 
`VGlobalTable`.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -153,9 +153,9 @@ public KStream<K, V> filter(final Predicate<? super K, ? super V> predicate,
         Objects.requireNonNull(named, "named can't be null");
 
         final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FILTER_NAME);
-        final ProcessorParameters<? super K, ? super V, ?, ?> processorParameters =
+        final ProcessorParameters<K, V, K, V> processorParameters =

Review Comment:
   Why the changes to the generic types? For example, from `? super K` to `K`.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -248,9 +248,9 @@ public <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ?
         Objects.requireNonNull(named, "named can't be null");
 
         final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, MAP_NAME);
-        final ProcessorParameters<? super K, ? super V, ?, ?> processorParameters =
+        final ProcessorParameters<K, V, KR, VR> processorParameters =

Review Comment:
   And here, from `?` to `KR`.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -906,13 +910,14 @@ private KStreamImpl<K, V> repartitionForJoin(final String repartitionName,
             builder);
     }
 
-    static <K1, V1, RN extends BaseRepartitionNode<K1, V1>> String createRepartitionedSource(final InternalStreamsBuilder builder,
-                                                                                             final Serde<K1> keySerde,
-                                                                                             final Serde<V1> valueSerde,
-                                                                                             final String repartitionTopicNamePrefix,
-                                                                                             final StreamPartitioner<K1, V1> streamPartitioner,
-                                                                                             final BaseRepartitionNodeBuilder<K1, V1, RN> baseRepartitionNodeBuilder) {
-
+    static <KStream, VStream, RepartitionNode extends BaseRepartitionNode<KStream, VStream>> String createRepartitionedSource(
+        final InternalStreamsBuilder builder,

Review Comment:
   Why `KStream` and `VStream`? Are there two streams involved here? It's been 
a while since I've looked at this part of the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
