ableegoldman commented on code in PR #17892: URL: https://github.com/apache/kafka/pull/17892#discussion_r1851710543
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java:
##########
@@ -78,18 +80,30 @@ public FixedKeyProcessorSupplier<KIn, VIn, VOut> fixedKeyProcessorSupplier() {
     public void addProcessorTo(final InternalTopologyBuilder topologyBuilder, final String[] parentNodeNames) {
         if (processorSupplier != null) {
-            topologyBuilder.addProcessor(processorName, processorSupplier, parentNodeNames);
-            if (processorSupplier.stores() != null) {
-                for (final StoreBuilder<?> storeBuilder : processorSupplier.stores()) {
+            ApiUtils.checkSupplier(processorSupplier);
+
+            final ProcessorSupplier<KIn, VIn, KOut, VOut> wrapped =

Review Comment:
   This is how the DSL implementation will work: processors are wrapped when the DSL is built into a Topology, specifically in this method, where they are added to the InternalTopologyBuilder.

   Note that this PR is only a partial implementation. Not all DSL operators go through this class; some write their processors to the InternalTopologyBuilder directly. Follow-up PRs will convert those operators and enforce that any operators added in the future go through this method and are wrapped.
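
   To illustrate the "wrap at the single point of registration" idea, here is a minimal sketch. It deliberately does not use the real Kafka Streams types: `Wrapper`, `Builder`, `logging`, and the simplified `Supplier<Consumer<String>>` processor shape are all hypothetical stand-ins, and the actual code in this PR may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Simplified stand-ins for illustration only; these are NOT the Kafka Streams classes.
final class WrappingSketch {

    /** Hypothetical hook that may decorate any processor supplier. */
    interface Wrapper {
        Supplier<Consumer<String>> wrap(String processorName, Supplier<Consumer<String>> supplier);
    }

    /** Hypothetical stand-in for the topology builder: it only records what is registered. */
    static final class Builder {
        final List<String> names = new ArrayList<>();
        final List<Supplier<Consumer<String>>> suppliers = new ArrayList<>();

        void addProcessor(String name, Supplier<Consumer<String>> supplier) {
            names.add(name);
            suppliers.add(supplier);
        }
    }

    /** Single registration point: wrap first, then register the wrapped supplier. */
    static void addProcessorTo(Builder builder, Wrapper wrapper,
                               String name, Supplier<Consumer<String>> supplier) {
        builder.addProcessor(name, wrapper.wrap(name, supplier));
    }

    /** Example wrapper: records each input in a log before delegating to the real processor. */
    static Wrapper logging(List<String> log) {
        return (name, supplier) -> () -> {
            Consumer<String> delegate = supplier.get();
            return record -> {
                log.add(name + " <- " + record);
                delegate.accept(record);
            };
        };
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<String> seen = new ArrayList<>();
        Builder builder = new Builder();

        // The operator never touches the builder directly; it goes through addProcessorTo.
        addProcessorTo(builder, logging(log), "example-processor", () -> seen::add);

        builder.suppliers.get(0).get().accept("hello");
        System.out.println(log);  // [example-processor <- hello]
        System.out.println(seen); // [hello]
    }
}
```

   An operator that called `builder.addProcessor(...)` directly would bypass the wrapper entirely, which is the gap the comment above says follow-up PRs will close.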