[
https://issues.apache.org/jira/browse/KAFKA-7055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511476#comment-16511476
]
ASF GitHub Bot commented on KAFKA-7055:
---------------------------------------
nixsticks closed pull request #5214: KAFKA-7055: Update InternalTopologyBuilder
to throw TopologyException…
URL: https://github.com/apache/kafka/pull/5214
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 36a2edc6766..cf50c1c4bed 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -442,6 +442,10 @@ public final void addSource(final Topology.AutoOffsetReset
offsetReset,
final String... predecessorNames) {
Objects.requireNonNull(name, "name must not be null");
Objects.requireNonNull(topic, "topic must not be null");
+ if (predecessorNames.length == 0) {
+ throw new TopologyException("Sink " + name + " must have at least
one parent");
+ }
+
addSink(name, new StaticTopicNameExtractor<K, V>(topic),
keySerializer, valSerializer, partitioner, predecessorNames);
nodeToSinkTopic.put(name, topic);
}
@@ -457,6 +461,9 @@ public final void addSource(final Topology.AutoOffsetReset
offsetReset,
if (nodeFactories.containsKey(name)) {
throw new TopologyException("Processor " + name + " is already
added.");
}
+ if (predecessorNames.length == 0) {
+ throw new TopologyException("Sink " + name + " must have at least
one parent");
+ }
for (final String predecessor : predecessorNames) {
Objects.requireNonNull(predecessor, "predecessor name can't be
null");
@@ -484,6 +491,9 @@ public final void addProcessor(final String name,
if (nodeFactories.containsKey(name)) {
throw new TopologyException("Processor " + name + " is already
added.");
}
+ if (predecessorNames.length == 0) {
+ throw new TopologyException("Processor " + name + " must have at
least one parent");
+ }
for (final String predecessor : predecessorNames) {
Objects.requireNonNull(predecessor, "predecessor name must not be
null");
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index a539a1bcda0..f1ee81ff367 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -116,7 +116,8 @@ public StateStore getStateStore(final String name) {
if (sendTo != null) {
final ProcessorNode child = currentNode().getChild(sendTo);
if (child == null) {
- throw new StreamsException("Unknown processor name: " +
sendTo);
+ throw new StreamsException("Unknown downstream node: " +
sendTo + " either does not exist or is not" +
+ " connected to this processor.");
}
forward(child, key, value);
} else {
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 8b478852c41..63d5b18a3ae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -174,6 +174,15 @@ public void shouldNotAllowToAddProcessorWithSameName() {
} catch (final TopologyException expected) { }
}
+ @Test
+ public void shouldNotAllowToAddProcessorWithoutAtLeastOneParent() {
+ topology.addSource("source", "topic-1");
+ try {
+ topology.addProcessor("processor", new MockProcessorSupplier());
+ fail("Should throw TopologyException for processor without at
least one parent node");
+ } catch (final TopologyException expected) { }
+ }
+
@Test(expected = TopologyException.class)
public void shouldFailOnUnknownSource() {
topology.addProcessor("processor", new MockProcessorSupplier(),
"source");
@@ -194,6 +203,16 @@ public void shouldNotAllowToAddSinkWithSameName() {
} catch (final TopologyException expected) { }
}
+ @Test
+ public void shouldNotAllowToAddSinkWithoutAtLeastOneParent() {
+ topology.addSource("source", "topic-1");
+ topology.addProcessor("processor", new MockProcessorSupplier(),
"source");
+ try {
+ topology.addSink("sink", "topic-2");
+ fail("Should throw TopologyException for sink without at least one
parent node");
+ } catch (final TopologyException expected) { }
+ }
+
@Test(expected = TopologyException.class)
public void shouldFailWithUnknownParent() {
topology.addSink("sink", "topic-2", "source");
@@ -236,7 +255,8 @@ public void shouldNotAllowToAddStateStoreToSource() {
public void shouldNotAllowToAddStateStoreToSink() {
mockStoreBuilder();
EasyMock.replay(storeBuilder);
- topology.addSink("sink-1", "topic-1");
+ topology.addSource("source-1", "topic-1");
+ topology.addSink("sink-1", "topic-1", "source-1");
try {
topology.addStateStore(storeBuilder, "sink-1");
fail("Should have thrown TopologyException for adding store to
sink node");
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 1da04255b11..29227cc6742 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -275,7 +275,8 @@ public void testAddStateStoreWithSource() {
@Test
public void testAddStateStoreWithSink() {
- builder.addSink("sink-1", "topic-1", null, null, null);
+ builder.addSource(null, "source-1", null, null, null, "topic-1");
+ builder.addSink("sink-1", "topic-1", null, null, null, "source-1");
try {
builder.addStateStore(storeBuilder, "sink-1");
fail("Should throw TopologyException with store cannot be added to
sink");
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 3412c629a56..513d1c01b05 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -597,7 +597,7 @@ public void shouldOnlyShutdownOnce() {
@Test
public void
shouldNotNullPointerWhenStandbyTasksAssignedAndNoStateStoresForTopology() {
internalTopologyBuilder.addSource(null, "name", null, null, null,
"topic");
- internalTopologyBuilder.addSink("out", "output", null, null, null);
+ internalTopologyBuilder.addSink("out", "output", null, null, null,
"name");
final StreamThread thread = createStreamThread(clientId, config,
false);
@@ -690,7 +690,7 @@ public boolean conditionMet() {
@Test
public void
shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedAtBeginTransactionWhenTaskIsResumed()
{
internalTopologyBuilder.addSource(null, "name", null, null, null,
topic1);
- internalTopologyBuilder.addSink("out", "output", null, null, null);
+ internalTopologyBuilder.addSink("out", "output", null, null, null,
"name");
final StreamThread thread = createStreamThread(clientId, new
StreamsConfig(configProps(true)), true);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Kafka Streams Processor API allows you to add sinks and processors without
> parent
> ---------------------------------------------------------------------------------
>
> Key: KAFKA-7055
> URL: https://issues.apache.org/jira/browse/KAFKA-7055
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.0.0
> Reporter: Nikki Thean
> Assignee: Nikki Thean
> Priority: Minor
> Labels: easyfix
>
> The Kafka Streams Processor API allows you to define a Topology and connect
> sources, processors, and sinks. From reading through the code, it seems that
> you cannot forward a message to a downstream node unless it is explicitly
> connected to the upstream node (from which you are forwarding the message) as
> a child. Here is an example where you forward using the name of the downstream
> node rather than the child index
> ([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117]).
> However, I've been able to connect processors and sinks to the topology
> without including parent names, i.e., with an empty vararg, using this method:
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Topology.java#L423].
> As any attempt to forward a message to those nodes will throw a
> StreamsException, I suggest throwing an exception if a processor or sink is
> added without at least one upstream node. There is a method in
> `InternalTopologyBuilder` that allows you to connect processors by name after
> you add them to the topology, but it is not part of the external Processor
> API.
> In addition (or alternatively), I suggest making [the error message for when
> users try to forward messages to a node that is not
> connected|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L119]
> more descriptive, like this one for when a user attempts to access a state
> store that is not connected to the processor:
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L75-L81]
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)