[https://issues.apache.org/jira/browse/KAFKA-9298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17085398#comment-17085398]
ASF GitHub Bot commented on KAFKA-9298:
---------------------------------------
bbejeck commented on pull request #8504: KAFKA-9298: reuse mapped stream error in joins
URL: https://github.com/apache/kafka/pull/8504
When performing a join with a stream that needs repartitioning, Kafka Streams automatically creates a repartition topic. If the user does not use `StreamJoined` to name the repartition topic, Kafka Streams uses the generated name of the KStream instance (for example, `KSTREAM-MAP-0000000003`) for the repartition topic name.
If the KStream instance requiring the repartition participates in another join, the second repartition topic is created using the same generated name. This name reuse is what causes the `TopologyException` ("Invalid topology"). The error occurs because the `InternalTopologyBuilder` has already registered that repartition topic for an earlier source node.
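To make the failure mode concrete, here is a toy model of the duplicate-source check, written without any Kafka dependency. `ToySourceRegistry`, `addSource`, and `repartitionTopicFor` are hypothetical stand-ins, not the real `InternalTopologyBuilder` API; the model assumes only what the error message quoted in the issue shows, namely that registering the same repartition topic as a source twice is rejected.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical toy model; NOT the real Kafka Streams InternalTopologyBuilder.
final class ToySourceRegistry {
    // Topics that have already been registered as sources.
    private final Set<String> sourceTopics = new HashSet<>();

    // Mirrors the duplicate check from the stack trace. The real code throws
    // TopologyException; IllegalStateException is used here to stay dependency-free.
    void addSource(final String topic) {
        if (!sourceTopics.add(topic)) {
            throw new IllegalStateException(
                "Invalid topology: Topic " + topic + " has already been registered by another source.");
        }
    }

    // Assumption: a generated repartition topic is the operator name plus "-repartition".
    static String repartitionTopicFor(final String operatorName) {
        return operatorName + "-repartition";
    }

    public static void main(final String[] args) {
        final ToySourceRegistry registry = new ToySourceRegistry();
        final String mapName = "KSTREAM-MAP-0000000003";
        // First join on the mapped stream: the repartition topic registers fine.
        registry.addSource(repartitionTopicFor(mapName));
        try {
            // Second join on the same mapped stream derives the same name and is rejected.
            registry.addSource(repartitionTopicFor(mapName));
        } catch (final IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running `main` prints the same "has already been registered by another source" message that appears in the issue, which is the whole bug in miniature: two joins, one derived name.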
For example, the following topology fails: Kafka Streams attempts to create two repartition topics (which is correct behavior), but it uses the _**same name**_ for both, which causes the error.
```java
// Both joins derive their repartition topic from the map operator's generated name.
KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
newStream.join(stream2, (value1, value2) -> value1 + value2,
    JoinWindows.of(ofMillis(100))).to("out-one");
newStream.join(stream3, (value1, value2) -> value1 + value2,
    JoinWindows.of(ofMillis(100))).to("out-two");
```
However, the following topology, which is identical except that the user provides repartition topic names, works fine. Note the use of `StreamJoined.withName` here:
```java
KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
final StreamJoined<String, String, String> streamJoined =
    StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String());
// Each join gets its own user-provided name, so the repartition topics do not collide.
newStream.join(stream2, (value1, value2) -> value1 + value2,
    JoinWindows.of(ofMillis(100)),
    streamJoined.withName("first-join")).to("out-one");
newStream.join(stream3, (value1, value2) -> value1 + value2,
    JoinWindows.of(ofMillis(100)),
    streamJoined.withName("second-join")).to("out-two");
```
This bug has been present for some time: I reproduced it on `2.0`, before we added the optimization layer.
Ideally, the fix would be to generate a new repartition topic name each time to avoid such issues. But IMHO that ship has already sailed, because introducing a new naming scheme would cause compatibility issues for existing topologies. So generating new names is out, at least for now.
The proposed fix is:
1. For KStream instances needing repartitioning _**and using generated names**_, reuse the repartition topic node in any additional joins.
2. For KStream instances needing repartitioning _**and using user-provided names**_, always create a new repartition topic node for each join, as each one will have a unique name.
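The two rules above can be sketched as a small decision function. This is a toy model, not the code from the pull request: `RepartitionNodeCache`, `RepartitionNode`, `nodeForJoin`, and the `<name>-repartition` naming for user-provided names are all hypothetical, chosen only to show that a stream relying on generated names hands every join the same node (so its topic is registered once), while each user-provided name yields a node of its own.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of the proposed rules; NOT the actual Kafka Streams code.
final class RepartitionNodeCache {

    // Hypothetical stand-in for a repartition node in the logical topology graph.
    static final class RepartitionNode {
        final String topic;

        RepartitionNode(final String topic) {
            this.topic = topic;
        }
    }

    // Nodes for streams that rely on generated names, keyed by the stream's generated name.
    private final Map<String, RepartitionNode> generatedNameNodes = new HashMap<>();

    /**
     * Returns the repartition node a join should use.
     *
     * @param streamName   generated name of the KStream that needs repartitioning
     * @param userJoinName name supplied via StreamJoined.withName, or null if none was given
     */
    RepartitionNode nodeForJoin(final String streamName, final String userJoinName) {
        if (userJoinName == null) {
            // Rule 1: generated names -> reuse one node, so the topic is registered only once.
            return generatedNameNodes.computeIfAbsent(
                streamName, name -> new RepartitionNode(name + "-repartition"));
        }
        // Rule 2: user-provided names are unique per join -> always create a new node.
        // Assumption: "<name>-repartition" is an illustrative naming scheme only.
        return new RepartitionNode(userJoinName + "-repartition");
    }

    public static void main(final String[] args) {
        final RepartitionNodeCache cache = new RepartitionNodeCache();
        final String mapName = "KSTREAM-MAP-0000000003";

        final RepartitionNode first = cache.nodeForJoin(mapName, null);
        final RepartitionNode second = cache.nodeForJoin(mapName, null);
        System.out.println(first == second);

        final RepartitionNode named1 = cache.nodeForJoin(mapName, "first-join");
        final RepartitionNode named2 = cache.nodeForJoin(mapName, "second-join");
        System.out.println(named1.topic + " / " + named2.topic);
    }
}
```

Running `main` prints `true` followed by `first-join-repartition / second-join-repartition`. Caching by the stream's generated name keeps the existing topic name unchanged, which respects the compatibility constraint that rules out generating new names.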
I've added tests confirming the expected behavior.
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
----------------------------------------------------------------
> Reuse of a mapped stream causes an Invalid Topology
> ---------------------------------------------------
>
> Key: KAFKA-9298
> URL: https://issues.apache.org/jira/browse/KAFKA-9298
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Walker Carlson
> Assignee: Bill Bejeck
> Priority: Minor
> Labels: join, streams
>
> Can be reproduced in KStreamKStreamJoinTest.java:
> {code:java}
> @Test
> public void optimizerIsEager() {
>     final StreamsBuilder builder = new StreamsBuilder();
>     final KStream<String, String> stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
>     final KStream<String, String> stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String()));
>     final KStream<String, String> stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String()));
>     final KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
>     newStream.join(stream2, (value1, value2) -> value1 + value2,
>         JoinWindows.of(ofMillis(100)), StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()));
>     newStream.join(stream3, (value1, value2) -> value1 + value2,
>         JoinWindows.of(ofMillis(100)), StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()));
>     System.err.println(builder.build().describe().toString());
> }
> {code}
> *results in*
> Invalid topology: Topic KSTREAM-MAP-0000000003-repartition has already been registered by another source.
> org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topic KSTREAM-MAP-0000000003-repartition has already been registered by another source.
> at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.validateTopicNotAlreadyRegistered(InternalTopologyBuilder.java:578)
> at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.addSource(InternalTopologyBuilder.java:378)
> at org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.writeToTopology(OptimizableRepartitionNode.java:100)
> at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:303)
> at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:562)
> at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:551)
> at org.apache.kafka.streams.kstream.internals.KStreamKStreamJoinTest.optimizerIsEager(KStreamKStreamJoinTest.java:136)