cadonna commented on a change in pull request #9582: URL: https://github.com/apache/kafka/pull/9582#discussion_r522027942
##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
##########

```
@@ -314,6 +317,50 @@ public void buildAndOptimizeTopology(final Properties props) {
         internalTopologyBuilder.validateCopartition();
     }

+    private void mergeDuplicateSourceNodes() {
+        final Map<String, StreamSourceNode<?, ?>> topicsToSourceNodes = new HashMap<>();
+
+        // We don't really care about the order here, but since Pattern does not implement equals() we can't rely on
+        // a regular HashMap and containsKey(Pattern). But for our purposes it's sufficient to compare the compiled
+        // string and flags to determine if two pattern subscriptions can be merged into a single source node
+        final Map<Pattern, StreamSourceNode<?, ?>> patternsToSourceNodes =
+            new TreeMap<>(Comparator.comparing(Pattern::pattern).thenComparing(Pattern::flags));
```

Review comment:
   Just to be clear: this improves the situation, but it is not a complete solution, right? Assume we have a topic `topicA`. The patterns `topic*` and `topi*` both match `topicA`, but they are different when compared with this comparator. In that case a `TopologyException` would be thrown in the `InternalTopologyBuilder`, right?
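To illustrate the reviewer's point, here is a minimal self-contained sketch (the class name and topic strings are made up for illustration): two `Pattern` instances with the same compiled string and flags collapse to one key under this comparator, while two different pattern strings that happen to match the same topic remain distinct keys, so their overlap is not detected at this stage.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

public class PatternComparatorDemo {

    // Same comparator idea as in mergeDuplicateSourceNodes(): compare the compiled
    // string and the flags, since Pattern itself does not implement equals()
    private static Map<Pattern, String> newPatternMap() {
        final Map<Pattern, String> map =
            new TreeMap<>(Comparator.comparing(Pattern::pattern).thenComparing(Pattern::flags));
        map.put(Pattern.compile("topic.*"), "source-1");
        return map;
    }

    static boolean containsSameString() {
        // Identical pattern string and flags: treated as the same key, so the nodes can merge
        return newPatternMap().containsKey(Pattern.compile("topic.*"));
    }

    static boolean containsDifferentString() {
        // "topi.*" also matches "topicA", but it is a different key under this
        // comparator, so the overlap goes undetected here
        return newPatternMap().containsKey(Pattern.compile("topi.*"));
    }

    public static void main(final String[] args) {
        System.out.println(containsSameString() + " " + containsDifferentString()); // true false
    }
}
```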
##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
##########

```
@@ -71,6 +78,21 @@ public Pattern topicPattern() {
         return consumedInternal.valueSerde();
     }

+    // We "merge" source nodes into a single node under the hood if a user tries to read in a source topic multiple times
+    public void merge(final StreamSourceNode<?, ?> other) {
+        final AutoOffsetReset resetPolicy = consumedInternal.offsetResetPolicy();
+        if (resetPolicy != null && !resetPolicy.equals(other.consumedInternal().offsetResetPolicy())) {
+            log.error("Tried to merge source nodes {} and {} which are subscribed to the same topic/pattern, but " +
+                          "the offset reset policies do not match", this, other);
+            throw new TopologyException("Can't configure different offset reset policies on the same input topic(s)");
+        }
+        for (final StreamsGraphNode otherChild : other.children()) {
+            // Move children from other to this, these calls take care of resetting the child's parents to this
```

Review comment:
   Do we really need this comment and the comment on line 81? We get the same information when we navigate to the call and to the implementation of the methods, with the difference that comments can become outdated without us noticing it.
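For context on what the child-moving loop is doing, here is a simplified sketch (the `Node` class below is a stand-in for the real `StreamsGraphNode` and is not the actual implementation): removing a child from one parent and adding it to another re-points the child's parent set, which is the behavior the inline comment describes.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.Set;

public class SourceNodeMergeSketch {

    // Simplified stand-in for StreamsGraphNode with parent/child bookkeeping
    static class Node {
        final String name;
        final Set<Node> parents = new LinkedHashSet<>();
        final Set<Node> children = new LinkedHashSet<>();

        Node(final String name) { this.name = name; }

        void addChild(final Node child) {
            children.add(child);
            child.parents.add(this);
        }

        void removeChild(final Node child) {
            children.remove(child);
            child.parents.remove(this);
        }

        // Move all of other's children under this node; the add/remove calls
        // take care of re-pointing each child's parent set to this node
        void mergeChildrenFrom(final Node other) {
            for (final Node otherChild : new ArrayList<>(other.children)) {
                other.removeChild(otherChild);
                addChild(otherChild);
            }
        }
    }

    public static void main(final String[] args) {
        final Node kept = new Node("source-kept");
        final Node duplicate = new Node("source-duplicate");
        final Node child = new Node("map-1");
        duplicate.addChild(child);

        kept.mergeChildrenFrom(duplicate);
        System.out.println(kept.children.size());         // 1
        System.out.println(duplicate.children.size());    // 0
        System.out.println(child.parents.contains(kept)); // true
    }
}
```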
##########
File path: streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##########

```
@@ -895,6 +898,55 @@ public void shouldUseSpecifiedNameForAggregateOperationGivenTable() {
             STREAM_OPERATION_NAME);
     }

+    @Test
+    public void shouldAllowReadingFromSameTopic() {
+        builder.stream("topic");
+        builder.stream("topic");
+        builder.build();
+    }
+
+    @Test
+    public void shouldAllowSubscribingToSamePattern() {
+        builder.stream(Pattern.compile("some-regex"));
+        builder.stream(Pattern.compile("some-regex"));
+        builder.build();
+    }
+
+    @Test
+    public void shouldAllowReadingFromSameCollectionOfTopics() {
+        builder.stream(Collections.singletonList("topic"));
+        builder.stream(Collections.singletonList("topic"));
```

Review comment:
   Please use a collection with at least two topics to test the loop over the collections.

##########
File path: streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##########

```
@@ -895,6 +898,55 @@ public void shouldUseSpecifiedNameForAggregateOperationGivenTable() {
+    @Test
+    public void shouldNotAllowReadingFromOverlappingAndUnequalCollectionOfTopics() {
+        builder.stream(Collections.singletonList("topic"));
+        builder.stream(asList("topic", "anotherTopic"));
+        assertThrows(TopologyException.class, builder::build);
+    }
+
+    @Test
+    public void shouldThrowWhenSubscribedToATopicWithDifferentResetPolicies() {
+        builder.stream("topic", Consumed.with(AutoOffsetReset.EARLIEST));
+        builder.stream("topic", Consumed.with(AutoOffsetReset.LATEST));
+        assertThrows(TopologyException.class, builder::build);
+    }
+
+    @Test
+    public void shouldThrowWhenSubscribedToATopicWithSetAndUnsetResetPolicies() {
+        builder.stream("topic", Consumed.with(AutoOffsetReset.EARLIEST));
+        builder.stream("topic");
+        assertThrows(TopologyException.class, builder::build);
+    }
+
+    @Test
+    public void shouldThrowWhenSubscribedToAPatternWithDifferentResetPolicies() {
+        builder.stream(Pattern.compile("some-regex"), Consumed.with(AutoOffsetReset.EARLIEST));
+        builder.stream(Pattern.compile("some-regex"), Consumed.with(AutoOffsetReset.LATEST));
+        assertThrows(TopologyException.class, builder::build);
+    }
```

Review comment:
   Could you also add a test with two patterns with the same string, but one with a set reset policy and one with an unset reset policy, like you did for the non-pattern case? Just to make it clear that it should also throw in that case.

##########
File path: streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##########

```
@@ -895,6 +898,55 @@ public void shouldUseSpecifiedNameForAggregateOperationGivenTable() {
+    @Test
+    public void shouldThrowWhenSubscribedToATopicWithSetAndUnsetResetPolicies() {
+        builder.stream("topic", Consumed.with(AutoOffsetReset.EARLIEST));
+        builder.stream("topic");
+        assertThrows(TopologyException.class, builder::build);
+    }
```

Review comment:
   What should happen in this case? See also my comment in `merge()`.
   ```
   @Test
   public void shouldThrowWhenSubscribedToATopicWithSetAndUnsetResetPolicies() {
       builder.stream("topic");
       builder.stream("topic", Consumed.with(AutoOffsetReset.EARLIEST));
       assertThrows(TopologyException.class, builder::build);
   }
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
##########

```
@@ -71,6 +78,21 @@ public Pattern topicPattern() {
+    // We "merge" source nodes into a single node under the hood if a user tries to read in a source topic multiple times
+    public void merge(final StreamSourceNode<?, ?> other) {
+        final AutoOffsetReset resetPolicy = consumedInternal.offsetResetPolicy();
+        if (resetPolicy != null && !resetPolicy.equals(other.consumedInternal().offsetResetPolicy())) {
```

Review comment:
   What should happen if this reset policy is `null` and the other is not `null`? I guess we should also throw in that case, don't we?
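A sketch of the symmetric check the reviewer is suggesting (the enum and the exception type below are stand-ins for the real `AutoOffsetReset` and `TopologyException`, not the actual Kafka Streams code): `Objects.equals` makes the comparison null-safe, so a set policy on one node and an unset (`null`) policy on the other also fails the merge, whereas the original `resetPolicy != null && ...` guard silently skips validation when this node's policy is `null`.

```java
import java.util.Objects;

public class ResetPolicyCheck {

    // Stand-in for org.apache.kafka.streams.Topology.AutoOffsetReset
    enum AutoOffsetReset { EARLIEST, LATEST }

    // Throws whenever the two policies differ, including when exactly one is null (unset)
    static void validateMerge(final AutoOffsetReset mine, final AutoOffsetReset theirs) {
        if (!Objects.equals(mine, theirs)) {
            // Stand-in for TopologyException
            throw new IllegalStateException(
                "Can't configure different offset reset policies on the same input topic(s)");
        }
    }

    static boolean throwsFor(final AutoOffsetReset mine, final AutoOffsetReset theirs) {
        try {
            validateMerge(mine, theirs);
            return false;
        } catch (final IllegalStateException e) {
            return true;
        }
    }

    public static void main(final String[] args) {
        System.out.println(throwsFor(AutoOffsetReset.EARLIEST, AutoOffsetReset.EARLIEST)); // false
        System.out.println(throwsFor(null, AutoOffsetReset.EARLIEST)); // true: unset vs. set
    }
}
```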
##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
##########

```
@@ -314,6 +317,50 @@ public void buildAndOptimizeTopology(final Properties props) {
+    private void mergeDuplicateSourceNodes() {
+        final Map<String, StreamSourceNode<?, ?>> topicsToSourceNodes = new HashMap<>();
+
+        // We don't really care about the order here, but since Pattern does not implement equals() we can't rely on
+        // a regular HashMap and containsKey(Pattern). But for our purposes it's sufficient to compare the compiled
+        // string and flags to determine if two pattern subscriptions can be merged into a single source node
+        final Map<Pattern, StreamSourceNode<?, ?>> patternsToSourceNodes =
+            new TreeMap<>(Comparator.comparing(Pattern::pattern).thenComparing(Pattern::flags));
+
+        for (final StreamsGraphNode graphNode : root.children()) {
+            if (graphNode instanceof StreamSourceNode) {
+                final StreamSourceNode<?, ?> currentSourceNode = (StreamSourceNode<?, ?>) graphNode;
```

Review comment:
   We could avoid the `instanceof` and the casting if we introduced a `RootGraphNode` with a method `sourceNodes()`. Since a root can only have source nodes and state stores as children, we could make the topology code in general a bit more type-safe. As far as I can see, that would need some additional changes outside the scope of this PR, so feel free to not consider this comment for this PR; we can do another PR for that.
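A minimal sketch of the reviewer's `RootGraphNode` idea (all class names below are simplified, hypothetical stand-ins for the real graph-node types, not Kafka Streams code): if the root only ever holds source nodes, it can expose them as a typed collection, and callers iterate without `instanceof` checks or casts.

```java
import java.util.ArrayList;
import java.util.List;

public class RootNodeSketch {

    // Simplified stand-ins for StreamsGraphNode / StreamSourceNode
    static class GraphNode {}

    static class SourceNode extends GraphNode {
        final String topic;
        SourceNode(final String topic) { this.topic = topic; }
    }

    // Hypothetical RootGraphNode: since it only accepts source nodes, it can
    // return them as a typed collection instead of untyped children()
    static class RootGraphNode {
        private final List<SourceNode> sourceNodes = new ArrayList<>();

        void addSourceNode(final SourceNode node) { sourceNodes.add(node); }

        List<SourceNode> sourceNodes() { return sourceNodes; }
    }

    public static void main(final String[] args) {
        final RootGraphNode root = new RootGraphNode();
        root.addSourceNode(new SourceNode("topicA"));
        root.addSourceNode(new SourceNode("topicB"));
        for (final SourceNode source : root.sourceNodes()) {
            System.out.println(source.topic); // no instanceof or cast needed
        }
    }
}
```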
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########

```
@@ -410,18 +410,6 @@ public final void addSource(final Topology.AutoOffsetReset offsetReset,
             }
         }

-        for (final Pattern otherPattern : earliestResetPatterns) {
-            if (topicPattern.pattern().contains(otherPattern.pattern()) || otherPattern.pattern().contains(topicPattern.pattern())) {
-                throw new TopologyException("Pattern " + topicPattern + " will overlap with another pattern " + otherPattern + " already been registered by another source");
-            }
-        }
-
-        for (final Pattern otherPattern : latestResetPatterns) {
-            if (topicPattern.pattern().contains(otherPattern.pattern()) || otherPattern.pattern().contains(topicPattern.pattern())) {
-                throw new TopologyException("Pattern " + topicPattern + " will overlap with another pattern " + otherPattern + " already been registered by another source");
-            }
-        }
-
```

Review comment:
   I agree on the first part. Regarding the second part, I had similar thoughts when I wrote my comment in `mergeDuplicateSourceNodes()`. But I might also be missing something here.

##########
File path: streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##########

```
@@ -895,6 +898,55 @@ public void shouldUseSpecifiedNameForAggregateOperationGivenTable() {
+    @Test
+    public void shouldAllowReadingFromSameTopic() {
+        builder.stream("topic");
+        builder.stream("topic");
+        builder.build();
+    }
```

Review comment:
   Could you please add a try-catch clause to better document the test? For example:
   ```suggestion
       public void shouldAllowReadingFromSameTopic() {
           builder.stream("topic");
           builder.stream("topic");
           try {
               builder.build();
           } catch (final TopologyException topologyException) {
               fail("TopologyException not expected");
           }
       }
   ```
   This also applies to the other tests.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org