[ https://issues.apache.org/jira/browse/KAFKA-7055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nikki Thean updated KAFKA-7055:
-------------------------------
    Description: 
The Kafka Streams Processor API allows you to define a Topology and connect 
sources, processors, and sinks. From reading through the code, it seems that 
you cannot forward a message to a downstream node unless that node is 
explicitly connected as a child of the upstream node from which you are 
forwarding the message. Here is an 
[example|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117]
where you forward using the name of the downstream node rather than a child index.
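To make the forwarding rule concrete, here is a minimal, dependency-free Java sketch. `SketchNode` and its methods are hypothetical stand-ins, not Kafka's internal classes: the real `ProcessorContextImpl` throws a `StreamsException`, whereas this sketch throws `IllegalStateException` so it compiles without Kafka on the classpath. It assumes, as described above, that a record can only be forwarded by name to a node registered as a child of the current node.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified processor node; NOT Kafka's ProcessorNode.
class SketchNode {
    final String name;
    // Child nodes keyed by name, in the order they were connected.
    final Map<String, SketchNode> children = new LinkedHashMap<>();
    // Records delivered to this node, so callers can see where data ended up.
    final List<String> received = new ArrayList<>();

    SketchNode(String name) {
        this.name = name;
    }

    // Register `child` as a downstream node of this node.
    void addChild(SketchNode child) {
        children.put(child.name, child);
    }

    // Forward a record to the child with the given name. Only nodes that
    // were explicitly connected as children can be targeted; any other
    // name fails (the real code throws a StreamsException here).
    void forward(String record, String childName) {
        SketchNode child = children.get(childName);
        if (child == null) {
            throw new IllegalStateException(
                "Unknown downstream node '" + childName + "' for node '" + name + "'");
        }
        child.received.add(record);
    }
}
```

In this model, a node that was added without naming `proc` as a parent never appears in `proc`'s `children` map, so `proc.forward(record, thatNodeName)` always fails.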

However, I've been able to add processors and sinks to the topology without 
specifying any parent names, i.e., with an empty vararg, using 
[this method|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Topology.java#L423].

Because any attempt to forward a message to such a node throws a 
StreamsException, I suggest throwing an exception when a processor or sink is 
added without at least one upstream (parent) node. `InternalTopologyBuilder` 
has a method for connecting processors by name after they have been added to 
the topology, but it is not part of the public Processor API.
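A minimal sketch of the proposed check, written as a hypothetical, self-contained builder rather than as a patch to Kafka's `Topology` or `InternalTopologyBuilder`. It illustrates that Java varargs accept zero arguments, so a call like `addProcessor("proc")` compiles, and the only place to catch it is a runtime check when the node is added. The method names loosely echo `Topology#addProcessor` and `Topology#addSink`, but the signatures are simplified (no processor suppliers or topics) and the choice of `IllegalArgumentException` is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical mini topology builder; NOT Kafka's Topology class.
class SketchTopologyBuilder {
    // Node name -> names of its parent (upstream) nodes.
    private final Map<String, List<String>> parentsByNode = new LinkedHashMap<>();

    // Sources are the only nodes allowed to have no parents.
    SketchTopologyBuilder addSource(String name) {
        parentsByNode.put(name, new ArrayList<>());
        return this;
    }

    SketchTopologyBuilder addProcessor(String name, String... parentNames) {
        return addChildNode(name, parentNames);
    }

    SketchTopologyBuilder addSink(String name, String... parentNames) {
        return addChildNode(name, parentNames);
    }

    // The proposed check: fail fast when a processor or sink has no parent,
    // instead of failing later when something tries to forward to it.
    private SketchTopologyBuilder addChildNode(String name, String... parentNames) {
        if (parentNames == null || parentNames.length == 0) {
            throw new IllegalArgumentException(
                "Node '" + name + "' must be added with at least one parent node");
        }
        parentsByNode.put(name, new ArrayList<>(Arrays.asList(parentNames)));
        return this;
    }

    // Returns the parents recorded for `name`, or null if it was never added.
    List<String> parentsOf(String name) {
        return parentsByNode.get(name);
    }
}
```

Failing at build time means the mistake surfaces where the topology is defined, rather than as a forwarding error once records start flowing.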

In addition (or alternatively), I suggest making 
[the error message shown when users try to forward messages to a node that is not connected|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L119]
more descriptive, like 
[the one shown when a user attempts to access a state store that is not connected to the processor|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L75-L81].
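One possible shape for a more descriptive message, sketched as a hypothetical helper. The wording is illustrative only and is not Kafka's actual text; the idea, borrowed from the state-store message, is to name both nodes, state what is missing, and point at the fix.

```java
import java.util.List;

// Hypothetical helper; the message wording is an illustration, not Kafka's.
final class ForwardErrorMessages {
    private ForwardErrorMessages() {
    }

    // Build an error message for forwarding from `currentNode` to
    // `childName` when `childName` is not one of its connected children.
    static String unconnectedChild(String currentNode, String childName,
                                   List<String> connectedChildren) {
        return "Processor " + currentNode + " cannot forward to node " + childName
            + " because " + childName + " is not connected to it as a child."
            + " Connected children: " + connectedChildren + "."
            + " Make sure " + childName + " is added to the topology with "
            + currentNode + " among its parent names.";
    }
}
```

Listing the connected children makes a typo in the target name easy to spot, while the last sentence covers the missing-parent case this issue describes.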

  was:
The Kafka Streams Processor API allows you to define a Topology and connect 
sources, processors, and sinks. From reading through the code, it seems that 
you cannot forward a message to a downstream node unless it is explicitly 
connected to the upstream node (from which you are forwarding the message) as a 
child. ([example|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117]
where you forward using name of downstream node rather than child index)

However, I've been able to connect processors and sinks to the topology without 
including parent names, i.e., with empty vararg (using 
[this method|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Topology.java#L423]).

As any attempt to forward a message to those nodes will throw a 
StreamsException, I suggest throwing an exception if a processor or sink is 
added without at least one upstream node. There is a method in 
`InternalTopologyBuilder` that allows you to connect processors by name after 
you add them to the topology, but it is not part of the external Processor API.

In addition (or alternatively), I suggest making 
[the error message for when users try to forward messages to a node that is not connected|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L119]
more descriptive, like 
[this one for when a user attempts to access a state store that is not connected to the processor|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L75-L81].


> Kafka Streams Processor API allows you to add sinks and processors without parent
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-7055
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7055
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Nikki Thean
>            Assignee: Nikki Thean
>            Priority: Minor
>              Labels: easyfix



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
