[ https://issues.apache.org/jira/browse/KAFKA-7055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511595#comment-16511595 ]
Nikki Thean commented on KAFKA-7055:
------------------------------------

[~mjsax] [~guozhang] I may have screwed up the GitHub Bot by closing the PR and opening a new one :( Sorry. The PR is here: https://github.com/apache/kafka/pull/5215

> Kafka Streams Processor API allows you to add sinks and processors without parent
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-7055
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7055
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Nikki Thean
>            Assignee: Nikki Thean
>            Priority: Minor
>              Labels: easyfix
>
> The Kafka Streams Processor API allows you to define a Topology and connect
> sources, processors, and sinks. From reading through the code, it seems that
> you cannot forward a message to a downstream node unless it is explicitly
> connected as a child to the upstream node from which you are forwarding the
> message. Here is an example where you forward using the name of the
> downstream node rather than the child index
> ([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117]).
> However, I've been able to connect processors and sinks to the topology
> without including parent names, i.e. with an empty vararg, using this method:
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Topology.java#L423].
> As any attempt to forward a message to those nodes will throw a
> StreamsException, I suggest throwing an exception if a processor or sink is
> added without at least one upstream node. There is a method in
> `InternalTopologyBuilder` that allows you to connect processors by name after
> you add them to the topology, but it is not part of the external Processor
> API.
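The check suggested above could look roughly like the following. This is only a minimal sketch: `TopologySketch`, its methods, and the exception types are hypothetical stand-ins, not the real `Topology` or `InternalTopologyBuilder` code, and the real forward path throws a StreamsException rather than the IllegalStateException used here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a topology builder; this is NOT the Kafka Streams API.
final class TopologySketch {
    // Maps each node name to the names of its directly connected children.
    private final Map<String, List<String>> children = new LinkedHashMap<>();

    // Sources are the only nodes allowed to have no parent.
    TopologySketch addSource(String name) {
        requireUnused(name);
        children.put(name, new ArrayList<>());
        return this;
    }

    // Takes trailing parent names like an addProcessor/addSink style call,
    // but rejects an empty vararg up front instead of accepting it silently.
    TopologySketch addNode(String name, String... parentNames) {
        requireUnused(name);
        if (parentNames == null || parentNames.length == 0) {
            throw new IllegalArgumentException(
                "Node " + name + " must have at least one parent");
        }
        // Validate every parent before mutating anything.
        for (String parent : parentNames) {
            if (!children.containsKey(parent)) {
                throw new IllegalArgumentException(
                    "Parent " + parent + " of node " + name + " has not been added yet");
            }
        }
        children.put(name, new ArrayList<>());
        for (String parent : parentNames) {
            children.get(parent).add(name);
        }
        return this;
    }

    // Forwarding by name only works for a directly connected child.
    void forward(String from, String childName) {
        List<String> connected = children.get(from);
        if (connected == null || !connected.contains(childName)) {
            throw new IllegalStateException(
                "Node " + childName + " is not a child of " + from);
        }
    }

    private void requireUnused(String name) {
        if (children.containsKey(name)) {
            throw new IllegalArgumentException("Node " + name + " is already added");
        }
    }

    public static void main(String[] args) {
        TopologySketch topology = new TopologySketch()
            .addSource("source")
            .addNode("processor", "source");
        topology.forward("source", "processor"); // connected, so no exception

        try {
            topology.addNode("orphan"); // empty vararg: no parent names given
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // Node orphan must have at least one parent
        }
    }
}
```

With the check in place, the mistake surfaces when the topology is built rather than later, when a record is first forwarded to the unreachable node.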
> In addition (or alternatively), I suggest making [the error message for when
> users try to forward messages to a node that is not
> connected|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L119]
> more descriptive, like this one for when a user attempts to access a state
> store that is not connected to the processor:
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L75-L81]

--
This message was sent by Atlassian JIRA (v7.6.3#76005)