vvcephei commented on a change in pull request #8504: URL: https://github.com/apache/kafka/pull/8504#discussion_r428764142
########## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java ########## @@ -77,6 +79,38 @@ public void shouldLogAndMeterOnSkippedRecordsWithNullValueWithBuiltInMetricsVers shouldLogAndMeterOnSkippedRecordsWithNullValue(StreamsConfig.METRICS_LATEST); } + + @Test + public void shouldReuseRepartitionTopicWithGeneratedName() { + final StreamsBuilder builder = new StreamsBuilder(); + final Properties props = new Properties(); + props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION); + final KStream<String, String> stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); + final KStream<String, String> stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String())); + final KStream<String, String> stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String())); + final KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k)); + newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-one"); + newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-to"); + assertEquals(expectedTopologyWithGeneratedRepartitionTopic, builder.build(props).describe().toString()); + } + + @Test + public void shouldCreateRepartitionTopicsWithUserProvidedName() { + final StreamsBuilder builder = new StreamsBuilder(); + final Properties props = new Properties(); + props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION); + final KStream<String, String> stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); + final KStream<String, String> stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String())); + final KStream<String, String> stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String())); + final KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k)); + final StreamJoined<String, String, String> streamJoined = StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()); + newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("first-join")).to("out-one"); + newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("second-join")).to("out-two"); + final Topology topology = builder.build(props); + System.out.println(topology.describe().toString()); + assertEquals(expectedTopologyWithUserNamedRepartitionTopics, topology.describe().toString()); Review comment: Thanks for the discussion, all. Coming back to this proposal, and considering the points you've raised, it seems like we should re-use the generated repartition node when the name is generated, and create two repartition nodes when they are named. The purpose of re-using the repartition node in this PR isn't exactly to optimize anything, just to avoid throwing the exception that happens when we currently try to create the exact same repartition node twice. We could instead _always_ create two nodes, but this is needlessly wasteful. Reusing the same-named node makes perfect sense. When the operations are named, on the other hand, there is no problem right now, since we are creating differently named nodes. Since there's no problem, we shouldn't "solve" it ;) It's true that this isn't the most optimal physical plan, but for anyone who cares enough to look into it, they can just add the repartition node first, as you suggested @mjsax; we don't need to throw an exception to force them to fine-tune their program. The other option is that they can enable topology optimization, which will also collapse the named repartition nodes in a well-defined way. Compatibility is a concern, and it seems like it's satisfied if we follow this path: 1. You currently cannot reuse the same stream in two anonymous joins, so we can share the node without breaking any program 2. You currently _can_ reuse the same stream in two _named_ joins, and we will create two (named) repartition topics. We have no choice but to maintain this, or we will break compatibility. 3. Inserting a repartition node is well defined to break compatibility, so people will know they have to reset. 4. Adding Optimization is well defined to break compatibility, so people will know they have to reset. Have I missed some consideration? Thanks, -John ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org