[GitHub] [kafka] vvcephei commented on a change in pull request #8504: KAFKA-9298: reuse mapped stream error in joins

2020-05-21 Thread GitBox


vvcephei commented on a change in pull request #8504:
URL: https://github.com/apache/kafka/pull/8504#discussion_r428764142



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##
@@ -77,6 +79,38 @@ public void shouldLogAndMeterOnSkippedRecordsWithNullValueWithBuiltInMetricsVers
         shouldLogAndMeterOnSkippedRecordsWithNullValue(StreamsConfig.METRICS_LATEST);
     }
 
+
+    @Test
+    public void shouldReuseRepartitionTopicWithGeneratedName() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
+        newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-one");
+        newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-to");
+        assertEquals(expectedTopologyWithGeneratedRepartitionTopic, builder.build(props).describe().toString());
+    }
+
+    @Test
+    public void shouldCreateRepartitionTopicsWithUserProvidedName() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
+        final StreamJoined<String, String, String> streamJoined = StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String());
+        newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("first-join")).to("out-one");
+        newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("second-join")).to("out-two");
+        final Topology topology = builder.build(props);
+        System.out.println(topology.describe().toString());
+        assertEquals(expectedTopologyWithUserNamedRepartitionTopics, topology.describe().toString());

Review comment:
   Thanks for the discussion, all.
   
   Coming back to this proposal, and considering the points you've raised, it 
seems like we should re-use the generated repartition node when the name is 
generated, and create two repartition nodes when they are named.
   
   The purpose of re-using the repartition node in this PR isn't exactly to optimize anything, just to avoid the exception that is currently thrown when we try to create the exact same repartition node twice. We could instead _always_ create two nodes, but that would be needlessly wasteful. Reusing the same-named node makes perfect sense.
   
   When the operations are named, on the other hand, there is no problem right 
now, since we are creating differently named nodes. Since there's no problem, 
we shouldn't "solve" it ;)
   
   It's true that this isn't the optimal physical plan, but anyone who cares enough to look into it can just add the repartition node first, as you suggested, @mjsax; we don't need to throw an exception to force them to fine-tune their program.
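   For illustration, a minimal sketch of that manual workaround (not part of this PR): it assumes a Streams version that has `KStream#repartition` from KIP-221, reuses the topic names from the test above, and the "pre-join" operator name is purely illustrative.
```java
import static java.time.Duration.ofMillis;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;

public class ExplicitRepartitionSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
        final KStream<String, String> stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String()));
        final KStream<String, String> stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String()));

        // Repartition once, up front, with an explicit name ...
        final KStream<String, String> mapped = stream1
            .map((k, v) -> new KeyValue<>(v, k))
            .repartition(Repartitioned.<String, String>as("pre-join")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));

        // ... so neither join needs to insert its own repartition topic.
        mapped.join(stream2, (v1, v2) -> v1 + v2, JoinWindows.of(ofMillis(100))).to("out-one");
        mapped.join(stream3, (v1, v2) -> v1 + v2, JoinWindows.of(ofMillis(100))).to("out-two");

        System.out.println(builder.build().describe());
    }
}
```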
   
   The other option is that they can enable topology optimization, which will 
also collapse the named repartition nodes in a well-defined way.
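   As a fragment (building on the `builder` constructed in the quoted test, with `OPTIMIZE` swapped in for `NO_OPTIMIZATION`), that option looks like this:
```java
// Enable the built-in optimization; the optimizer collapses the repartition
// topics when the topology is built with these props.
final Properties props = new Properties();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
// The optimized physical plan is only produced when props are passed to build():
final Topology topology = builder.build(props);
System.out.println(topology.describe());
```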
   
   Compatibility is a concern, and it seems like it's satisfied if we follow 
this path:
   1. You currently cannot reuse the same stream in two anonymous joins, so we 
can share the node without breaking any program
   2. You currently _can_ reuse the same stream in two _named_ joins, and we 
will create two (named) repartition topics. We have no choice but to maintain 
this, or we will break compatibility.
   3. Inserting a repartition node is a well-defined compatibility break, so people will know they have to reset.
   4. Enabling optimization is likewise a well-defined compatibility break, so people will know they have to reset.
   
   Have I missed some consideration?
   Thanks,
   -John





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[GitHub] [kafka] vvcephei commented on a change in pull request #8504: KAFKA-9298: reuse mapped stream error in joins

2020-04-24 Thread GitBox


vvcephei commented on a change in pull request #8504:
URL: https://github.com/apache/kafka/pull/8504#discussion_r414731001



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
##
@@ -989,16 +994,18 @@ private void to(final TopicNameExtractor<K, V> topicExtractor,
                 null,
                 optimizableRepartitionNodeBuilder);
 
-        final OptimizableRepartitionNode<K, V> optimizableRepartitionNode = optimizableRepartitionNodeBuilder.build();
-        builder.addGraphNode(streamsGraphNode, optimizableRepartitionNode);
+        if (repartitionNode == null || !name.equals(repartitionName)) {

Review comment:
   Since you made the mistake of asking my opinion, here it is :) :
   
   > bumping the index
   
   It's true that users can't currently reuse the KStream, so there's no 
compatibility issue there, but we can't bump the index for the _first_ 
repartition topic, or we would break every topology that uses generated 
repartition topic names already. So, either way, we have to cache something to 
tell us to do something different on the "first reuse" (i.e., the second use of 
the KStream).
   
   Since we have to do that anyway, maybe it's fine to just cache the 
repartition node itself instead of a flag that says "bump the index next time". 
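   A hedged sketch of that caching idea, mirroring the guard in the diff above (`repartitionNode == null || !name.equals(repartitionName)`); the class, field, and method names here are illustrative, not the actual KStreamImpl code:
```java
import java.util.function.Supplier;

// Illustrative-only cache for a built repartition node, keyed by its (generated) name.
final class RepartitionNodeCache<N> {
    private N cachedNode;
    private String cachedName;

    // Build the repartition node on first use; on a later use with the same
    // generated name, hand back the cached node instead of re-creating it.
    N getOrBuild(final String name, final Supplier<N> buildNode) {
        if (cachedNode == null || !name.equals(cachedName)) {
            cachedNode = buildNode.get();
            cachedName = name;
        }
        return cachedNode;
    }
}
```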
   
   > leaking optimizations into the DSL
   
   I'm on the fence about whether this is an "optimization" or "reasonable 
behavior". It sort of feels like the latter, and the only reason we needed to 
introduce the "repartition-collapsing" optimization is that we failed to 
introduce reasonable behavior from the beginning. Also, my read is that the DSL 
builder and the optimizer are not cleanly separated right now anyway, and if we 
ever want to build more optimizations, we'll most likely need to make another 
pass on both anyway. We're also starting to think about topology evolution (cc 
@cadonna ), which makes this a less scary prospect, as we can then implement a 
mechanism to _compatibly_ introduce new optimizations. In other words, I'm not 
taking a hard stance, but leaning in the direction of doing the more efficient 
thing rather than the more pure thing, since we're not currently super pure anyway.
   
   > Other repartition topics
   
   I think we'd better leave it alone for now, implement topology evolution, 
then migrate to a completely pure and consistent approach.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



