mjsax commented on code in PR #20910:
URL: https://github.com/apache/kafka/pull/20910#discussion_r2536821665


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##########
@@ -434,6 +456,7 @@ private <VR> KTable<K, VR> doTransformValues(final ValueTransformerWithKeySuppli
                                                  final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal,
                                                  final NamedInternal namedInternal,
                                                  final String... stateStoreNames) {
+        Objects.requireNonNull(transformerSupplier, "transformerSupplier");

Review Comment:
   Because we add the adaptor below, the existing null check (inside the
`KTableProcessValues` constructor) no longer fires, so I am adding the check
here to keep the tests from failing.
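
   A minimal sketch of why the null check has to move, assuming the adaptor
wraps the supplier lazily (for example in a lambda). `Downstream`, `adapt`,
`buildWithoutCheck`, and `buildWithCheck` are hypothetical stand-ins for
illustration, not the actual Kafka Streams classes:

```java
import java.util.Objects;
import java.util.function.Supplier;

public class NullCheckSketch {
    /** Hypothetical stand-in for a downstream class that validates its constructor argument. */
    static final class Downstream {
        final Supplier<String> supplier;

        Downstream(final Supplier<String> supplier) {
            // This check only ever sees the wrapper, never the user's supplier.
            this.supplier = Objects.requireNonNull(supplier, "supplier");
        }
    }

    /** Hypothetical lazy adaptor: the returned wrapper is non-null even for a null input. */
    static Supplier<String> adapt(final Supplier<Integer> userSupplier) {
        return () -> String.valueOf(userSupplier.get());
    }

    /** Without an explicit check, a null supplier is only detected when the wrapper is used. */
    static Downstream buildWithoutCheck(final Supplier<Integer> userSupplier) {
        return new Downstream(adapt(userSupplier));
    }

    /** With the explicit check, a null supplier fails fast, before wrapping. */
    static Downstream buildWithCheck(final Supplier<Integer> userSupplier) {
        Objects.requireNonNull(userSupplier, "userSupplier");
        return new Downstream(adapt(userSupplier));
    }
}
```

   In this sketch, `buildWithoutCheck(null)` constructs successfully and only
fails later when the wrapper is invoked, while `buildWithCheck(null)` throws
immediately.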



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##########
@@ -463,9 +486,11 @@ private <VR> KTable<K, VR> doTransformValues(final ValueTransformerWithKeySuppli
 
         final String name = namedInternal.orElseGenerateWithPrefix(builder, TRANSFORMVALUES_NAME);
 
-        final KTableProcessorSupplier<K, V, K, VR> processorSupplier = new KTableTransformValues<>(
+        final FixedKeyProcessorSupplier<? super K, ? super V, ? extends VR> fixedKeyProcessorSupplier =
+            createValueTransformerWithKeySupplierToFixedProcessorSupplierAdaptor(transformerSupplier);

Review Comment:
   To keep the refactoring internal, we put an adaptor around the user-provided
"transformer supplier".
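
   A rough sketch of the adaptor idea, under the assumption that it simply
delegates each record to the wrapped transformer. `SimpleTransformer`,
`SimpleFixedKeyProcessor`, `adapt`, and `demo` are simplified hypothetical
stand-ins, not the real Kafka Streams interfaces or the helper added in this
PR:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

public class AdaptorSketch {
    /** Hypothetical stand-in for the user-facing transformer (not the Kafka interface). */
    interface SimpleTransformer<K, V, VR> {
        VR transform(K readOnlyKey, V value);
    }

    /** Hypothetical stand-in for the internal fixed-key processor (not the Kafka interface). */
    interface SimpleFixedKeyProcessor<K, V, VR> {
        void process(K key, V value, BiConsumer<K, VR> forward);
    }

    /** Wraps the user's supplier so internal code only sees the processor-style type. */
    static <K, V, VR> Supplier<SimpleFixedKeyProcessor<K, V, VR>> adapt(
            final Supplier<SimpleTransformer<K, V, VR>> transformerSupplier) {
        Objects.requireNonNull(transformerSupplier, "transformerSupplier");
        return () -> {
            // One transformer instance per processor instance.
            final SimpleTransformer<K, V, VR> transformer = transformerSupplier.get();
            // The key is passed through unchanged; only the value is transformed.
            return (key, value, forward) -> forward.accept(key, transformer.transform(key, value));
        };
    }

    /** Runs two records through an adapted transformer and collects the forwarded values. */
    static List<String> demo() {
        final List<String> out = new ArrayList<>();
        final Supplier<SimpleTransformer<String, Integer, String>> userSupplier =
            () -> (key, value) -> key + "=" + (value * 2);
        final SimpleFixedKeyProcessor<String, Integer, String> processor = adapt(userSupplier).get();
        processor.process("a", 1, (k, v) -> out.add(v));
        processor.process("b", 21, (k, v) -> out.add(v));
        return out;
    }
}
```

   The public method keeps accepting the user's supplier type, and only the
internal wiring works with the adapted type.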



