vvcephei commented on a change in pull request #8504:
URL: https://github.com/apache/kafka/pull/8504#discussion_r428764142



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -77,6 +79,38 @@ public void 
shouldLogAndMeterOnSkippedRecordsWithNullValueWithBuiltInMetricsVers
         
shouldLogAndMeterOnSkippedRecordsWithNullValue(StreamsConfig.METRICS_LATEST);
     }
 
+
+    @Test
+    public void shouldReuseRepartitionTopicWithGeneratedName() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, 
StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> stream1 = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> stream2 = builder.stream("topic2", 
Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> stream3 = builder.stream("topic3", 
Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> newStream = stream1.map((k, v) -> new 
KeyValue<>(v, k));
+        newStream.join(stream2, (value1, value2) -> value1 + value2, 
JoinWindows.of(ofMillis(100))).to("out-one");
+        newStream.join(stream3, (value1, value2) -> value1 + value2, 
JoinWindows.of(ofMillis(100))).to("out-to");
+        assertEquals(expectedTopologyWithGeneratedRepartitionTopic, 
builder.build(props).describe().toString());
+    }
+
+    @Test
+    public void shouldCreateRepartitionTopicsWithUserProvidedName() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, 
StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> stream1 = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> stream2 = builder.stream("topic2", 
Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> stream3 = builder.stream("topic3", 
Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> newStream = stream1.map((k, v) -> new 
KeyValue<>(v, k));
+        final StreamJoined<String, String, String> streamJoined = 
StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String());
+        newStream.join(stream2, (value1, value2) -> value1 + value2, 
JoinWindows.of(ofMillis(100)), 
streamJoined.withName("first-join")).to("out-one");
+        newStream.join(stream3, (value1, value2) -> value1 + value2, 
JoinWindows.of(ofMillis(100)), 
streamJoined.withName("second-join")).to("out-two");
+        final Topology topology =  builder.build(props);
+        System.out.println(topology.describe().toString());
+        assertEquals(expectedTopologyWithUserNamedRepartitionTopics, 
topology.describe().toString());

Review comment:
       Thanks for the discussion, all.
   
   Coming back to this proposal, and considering the points you've raised, it 
seems like we should re-use the generated repartition node when the name is 
generated, and create two repartition nodes when they are named.
   
   The purpose of re-using the repartition node in this PR isn't exactly to 
optimize anything, just to avoid throwing the exception that happens when we 
currently try to create the exact same repartition node twice. We could instead 
_always_ create two nodes, but this is needlessly wasteful. Reusing the 
same-named node makes perfect sense.
   
   When the operations are named, on the other hand, there is no problem right 
now, since we are creating differently named nodes. Since there's no problem, 
we shouldn't "solve" it ;)
   
   It's true that this isn't the most optimal physical plan, but for anyone who 
cares enough to look into it, they can just add the repartition node first, as 
you suggested @mjsax; we don't need to throw an exception to force them to 
fine-tune their program.
   
   The other option is that they can enable topology optimization, which will 
also collapse the named repartition nodes in a well-defined way.
   
   Compatibility is a concern, and it seems like it's satisfied if we follow 
this path:
   1. You currently cannot reuse the same stream in two anonymous joins, so we 
can share the node without breaking any program
   2. You currently _can_ reuse the same stream in two _named_ joins, and we 
will create two (named) repartition topics. We have no choice but to maintain 
this, or we will break compatibility.
   3. Inserting a repartition node is well defined to break compatibility, so 
people will know they have to reset.
   4. Adding Optimization is well defined to break compatibility, so people 
will know they have to reset.
   
   Have I missed some consideration?
   Thanks,
   -John




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to