[
https://issues.apache.org/jira/browse/KAFKA-7055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511195#comment-16511195
]
Nikki Thean commented on KAFKA-7055:
------------------------------------
If project reviewers feel this is a reasonable approach to take, I'm happy to
submit my own pull request and link it to the ticket!
> Kafka Streams Processor API allows you to add sinks and processors without
> parent
> ---------------------------------------------------------------------------------
>
> Key: KAFKA-7055
> URL: https://issues.apache.org/jira/browse/KAFKA-7055
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.0.0
> Reporter: Nikki Thean
> Assignee: Nikki Thean
> Priority: Minor
> Labels: easyfix
>
> The Kafka Streams Processor API allows you to define a Topology and connect
> sources, processors, and sinks. From reading through the code, it seems that
> you cannot forward a message to a downstream node unless it is explicitly
> connected to the upstream node (from which you are forwarding the message) as
> a child.
> ([example|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117]]
> where you forward using name of downstream node rather than child index)
> However, I've been able to connect processors and sinks to the topology
> without including parent names, i.e with empty vararg (using [this
> method|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Topology.java#L423]).]
> As any attempt to forward a message to those nodes will throw a
> StreamsException, I suggest throwing an exception if a processor or sink is
> added without at least one upstream node. There is a method in
> `InternalTopologyBuilder` that allows you to connect processors by name after
> you add them to the topology, but it is not part of the external Processor
> API.
> In addition (or alternatively), I suggest making [the error message for when
> users try to forward messages to a node that is not
> connected|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L119]
> more descriptive, like [this one for when a user attempts to access a state
> store that is not connected to the
> processor|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L75-L81].]
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)