ableegoldman commented on code in PR #17892:
URL: https://github.com/apache/kafka/pull/17892#discussion_r1851710543


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java:
##########
@@ -78,18 +80,30 @@ public FixedKeyProcessorSupplier<KIn, VIn, VOut> 
fixedKeyProcessorSupplier() {
 
     public void addProcessorTo(final InternalTopologyBuilder topologyBuilder, 
final String[] parentNodeNames) {
         if (processorSupplier != null) {
-            topologyBuilder.addProcessor(processorName, processorSupplier, 
parentNodeNames);
-            if (processorSupplier.stores() != null) {
-                for (final StoreBuilder<?> storeBuilder : 
processorSupplier.stores()) {
+            ApiUtils.checkSupplier(processorSupplier);
+
+            final ProcessorSupplier<KIn, VIn, KOut, VOut> wrapped =

Review Comment:
   this is how the DSL implementation will work -- processors are wrapped when 
building the DSL into a Topology, specifically in this method where they are 
added to the InternalTopologyBuilder.
   
   Note this PR is only a partial implementation since not all DSL operators go 
through this class, some just write their processors to the 
InternalTopologyBuilder directly. Followup PRs will tackle converting these 
operators and enforcing that any new ones added in the future will have to go 
through this method and be wrapped



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to