cadonna commented on a change in pull request #9582:
URL: https://github.com/apache/kafka/pull/9582#discussion_r522027942



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
##########
@@ -314,6 +317,50 @@ public void buildAndOptimizeTopology(final Properties props) {
         internalTopologyBuilder.validateCopartition();
     }
 
+    private void mergeDuplicateSourceNodes() {
+        final Map<String, StreamSourceNode<?, ?>> topicsToSourceNodes = new HashMap<>();
+
+        // We don't really care about the order here, but since Pattern does not implement equals() we can't rely on
+        // a regular HashMap and containsKey(Pattern). But for our purposes it's sufficient to compare the compiled
+        // string and flags to determine if two pattern subscriptions can be merged into a single source node
+        final Map<Pattern, StreamSourceNode<?, ?>> patternsToSourceNodes =
+            new TreeMap<>(Comparator.comparing(Pattern::pattern).thenComparing(Pattern::flags));

Review comment:
       Just to be clear. This improves the situation but it is not a complete 
solution, right? Assume we have a topic `topicA`. Patterns `topic*` and `topi*` 
both match `topicA`, but they are different when compared with this comparator.
In that case a `TopologyException` would be thrown in the 
`InternalTopologyBuilder`, right?
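   To illustrate with a minimal sketch (not code from this PR; it only mirrors the map setup in `mergeDuplicateSourceNodes()` and assumes the usual `java.util` / `java.util.regex.Pattern` imports):
   ```java
   final Map<Pattern, String> patternsToSourceNodes =
       new TreeMap<>(Comparator.comparing(Pattern::pattern).thenComparing(Pattern::flags));

   patternsToSourceNodes.put(Pattern.compile("topic*"), "source-1");
   // "topi*" has a different compiled string, so it is a distinct key for this comparator
   // and would stay a separate source node instead of being merged with "topic*".
   patternsToSourceNodes.put(Pattern.compile("topi*"), "source-2");

   System.out.println(patternsToSourceNodes.size()); // prints 2
   ```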

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
##########
@@ -71,6 +78,21 @@ public Pattern topicPattern() {
         return consumedInternal.valueSerde();
     }
 
+    // We "merge" source nodes into a single node under the hood if a user tries to read in a source topic multiple times
+    public void merge(final StreamSourceNode<?, ?> other) {
+        final AutoOffsetReset resetPolicy = consumedInternal.offsetResetPolicy();
+        if (resetPolicy != null && !resetPolicy.equals(other.consumedInternal().offsetResetPolicy())) {
+            log.error("Tried to merge source nodes {} and {} which are subscribed to the same topic/pattern, but "
+                          + "the offset reset policies do not match", this, other);
+            throw new TopologyException("Can't configure different offset reset policies on the same input topic(s)");
+        }
+        for (final StreamsGraphNode otherChild : other.children()) {
+            // Move children from other to this, these calls take care of resetting the child's parents to this

Review comment:
       Do we really need this comment and the comment on line 81? We get the same information when we navigate to the call and to the implementation of the methods, with the difference that comments can become outdated without us noticing it.

##########
File path: streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##########
@@ -895,6 +898,55 @@ public void shouldUseSpecifiedNameForAggregateOperationGivenTable() {
             STREAM_OPERATION_NAME);
     }
 
+    @Test
+    public void shouldAllowReadingFromSameTopic() {
+        builder.stream("topic");
+        builder.stream("topic");
+        builder.build();
+    }
+
+    @Test
+    public void shouldAllowSubscribingToSamePattern() {
+        builder.stream(Pattern.compile("some-regex"));
+        builder.stream(Pattern.compile("some-regex"));
+        builder.build();
+    }
+
+    @Test
+    public void shouldAllowReadingFromSameCollectionOfTopics() {
+        builder.stream(Collections.singletonList("topic"));
+        builder.stream(Collections.singletonList("topic"));

Review comment:
       Please use a collection with at least two topics to test the loop over 
the collections.
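   For example, something along these lines (purely illustrative; `asList` is the statically imported `Arrays.asList` already used elsewhere in this test class):
   ```java
       @Test
       public void shouldAllowReadingFromSameCollectionOfTopics() {
           builder.stream(asList("topic", "anotherTopic"));
           builder.stream(asList("topic", "anotherTopic"));
           builder.build();
       }
   ```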

##########
File path: streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##########
@@ -895,6 +898,55 @@ public void shouldUseSpecifiedNameForAggregateOperationGivenTable() {
             STREAM_OPERATION_NAME);
     }
 
+    @Test
+    public void shouldAllowReadingFromSameTopic() {
+        builder.stream("topic");
+        builder.stream("topic");
+        builder.build();
+    }
+
+    @Test
+    public void shouldAllowSubscribingToSamePattern() {
+        builder.stream(Pattern.compile("some-regex"));
+        builder.stream(Pattern.compile("some-regex"));
+        builder.build();
+    }
+
+    @Test
+    public void shouldAllowReadingFromSameCollectionOfTopics() {
+        builder.stream(Collections.singletonList("topic"));
+        builder.stream(Collections.singletonList("topic"));
+        builder.build();
+    }
+
+    @Test
+    public void shouldNotAllowReadingFromOverlappingAndUnequalCollectionOfTopics() {
+        builder.stream(Collections.singletonList("topic"));
+        builder.stream(asList("topic", "anotherTopic"));
+        assertThrows(TopologyException.class, builder::build);
+    }
+
+    @Test
+    public void shouldThrowWhenSubscribedToATopicWithDifferentResetPolicies() {
+        builder.stream("topic", Consumed.with(AutoOffsetReset.EARLIEST));
+        builder.stream("topic", Consumed.with(AutoOffsetReset.LATEST));
+        assertThrows(TopologyException.class, builder::build);
+    }
+
+    @Test
+    public void shouldThrowWhenSubscribedToATopicWithSetAndUnsetResetPolicies() {
+        builder.stream("topic", Consumed.with(AutoOffsetReset.EARLIEST));
+        builder.stream("topic");
+        assertThrows(TopologyException.class, builder::build);
+    }
+
+    @Test
+    public void shouldThrowWhenSubscribedToAPatternWithDifferentResetPolicies() {
+        builder.stream(Pattern.compile("some-regex"), Consumed.with(AutoOffsetReset.EARLIEST));
+        builder.stream(Pattern.compile("some-regex"), Consumed.with(AutoOffsetReset.LATEST));
+        assertThrows(TopologyException.class, builder::build);
+    }
+

Review comment:
       Could you also add a test with two patterns with the same string, but one with a set reset policy and one with an unset reset policy, like you did for the non-pattern case? Just to make it clear that it should also throw in that case.
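   For example (hypothetical test name, mirroring the non-pattern case):
   ```java
       @Test
       public void shouldThrowWhenSubscribedToAPatternWithSetAndUnsetResetPolicies() {
           builder.stream(Pattern.compile("some-regex"), Consumed.with(AutoOffsetReset.EARLIEST));
           builder.stream(Pattern.compile("some-regex"));
           assertThrows(TopologyException.class, builder::build);
       }
   ```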

##########
File path: streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##########
@@ -895,6 +898,55 @@ public void shouldUseSpecifiedNameForAggregateOperationGivenTable() {
             STREAM_OPERATION_NAME);
     }
 
+    @Test
+    public void shouldAllowReadingFromSameTopic() {
+        builder.stream("topic");
+        builder.stream("topic");
+        builder.build();
+    }
+
+    @Test
+    public void shouldAllowSubscribingToSamePattern() {
+        builder.stream(Pattern.compile("some-regex"));
+        builder.stream(Pattern.compile("some-regex"));
+        builder.build();
+    }
+
+    @Test
+    public void shouldAllowReadingFromSameCollectionOfTopics() {
+        builder.stream(Collections.singletonList("topic"));
+        builder.stream(Collections.singletonList("topic"));
+        builder.build();
+    }
+
+    @Test
+    public void shouldNotAllowReadingFromOverlappingAndUnequalCollectionOfTopics() {
+        builder.stream(Collections.singletonList("topic"));
+        builder.stream(asList("topic", "anotherTopic"));
+        assertThrows(TopologyException.class, builder::build);
+    }
+
+    @Test
+    public void shouldThrowWhenSubscribedToATopicWithDifferentResetPolicies() {
+        builder.stream("topic", Consumed.with(AutoOffsetReset.EARLIEST));
+        builder.stream("topic", Consumed.with(AutoOffsetReset.LATEST));
+        assertThrows(TopologyException.class, builder::build);
+    }
+
+    @Test
+    public void shouldThrowWhenSubscribedToATopicWithSetAndUnsetResetPolicies() {
+        builder.stream("topic", Consumed.with(AutoOffsetReset.EARLIEST));
+        builder.stream("topic");
+        assertThrows(TopologyException.class, builder::build);
+    }

Review comment:
       What should happen in this case? See also my comment in `merge()`.
   ```
       @Test
       public void shouldThrowWhenSubscribedToATopicWithSetAndUnsetResetPolicies() {
           builder.stream("topic");
           builder.stream("topic", Consumed.with(AutoOffsetReset.EARLIEST));
           assertThrows(TopologyException.class, builder::build);
       }
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
##########
@@ -71,6 +78,21 @@ public Pattern topicPattern() {
         return consumedInternal.valueSerde();
     }
 
+    // We "merge" source nodes into a single node under the hood if a user tries to read in a source topic multiple times
+    public void merge(final StreamSourceNode<?, ?> other) {
+        final AutoOffsetReset resetPolicy = consumedInternal.offsetResetPolicy();
+        if (resetPolicy != null && !resetPolicy.equals(other.consumedInternal().offsetResetPolicy())) {

Review comment:
       What should happen if this reset policy is `null` and the other is not `null`? I guess we should also throw in that case, shouldn't we?
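   Just as an illustration of one way to cover it (not necessarily how it should be solved here; `Objects` is `java.util.Objects`):
   ```java
           final AutoOffsetReset resetPolicy = consumedInternal.offsetResetPolicy();
           final AutoOffsetReset otherResetPolicy = other.consumedInternal().offsetResetPolicy();
           // Objects.equals() treats (null, null) as equal and (null, non-null) as different,
           // so this would also throw when only one of the two reset policies is set.
           if (!Objects.equals(resetPolicy, otherResetPolicy)) {
               throw new TopologyException("Can't configure different offset reset policies on the same input topic(s)");
           }
   ```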

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
##########
@@ -314,6 +317,50 @@ public void buildAndOptimizeTopology(final Properties props) {
         internalTopologyBuilder.validateCopartition();
     }
 
+    private void mergeDuplicateSourceNodes() {
+        final Map<String, StreamSourceNode<?, ?>> topicsToSourceNodes = new HashMap<>();
+
+        // We don't really care about the order here, but since Pattern does not implement equals() we can't rely on
+        // a regular HashMap and containsKey(Pattern). But for our purposes it's sufficient to compare the compiled
+        // string and flags to determine if two pattern subscriptions can be merged into a single source node
+        final Map<Pattern, StreamSourceNode<?, ?>> patternsToSourceNodes =
+            new TreeMap<>(Comparator.comparing(Pattern::pattern).thenComparing(Pattern::flags));
+
+        for (final StreamsGraphNode graphNode : root.children()) {
+            if (graphNode instanceof StreamSourceNode) {
+                final StreamSourceNode<?, ?> currentSourceNode = (StreamSourceNode<?, ?>) graphNode;

Review comment:
       We could avoid the `instanceof` and the casting if we introduce a 
`RootGraphNode` with a method `sourceNodes()`. Since a root can only have 
source nodes and state stores as children, we could make the topology code in 
general a bit more type safe. As far as I can see that would need some 
additional changes outside the scope of this PR. So, feel free to not consider 
this comment for this PR and we can do another PR for that.
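   Roughly what I have in mind, as a hypothetical sketch only (constructor and `writeToTopology()` omitted; `addChild()` is assumed to be inherited from `StreamsGraphNode`):
   ```java
   public class RootGraphNode extends StreamsGraphNode {

       private final Collection<StreamSourceNode<?, ?>> sourceNodes = new LinkedHashSet<>();

       public void addSourceNode(final StreamSourceNode<?, ?> sourceNode) {
           addChild(sourceNode);
           sourceNodes.add(sourceNode);
       }

       public Collection<StreamSourceNode<?, ?>> sourceNodes() {
           return Collections.unmodifiableCollection(sourceNodes);
       }
   }
   ```
   With something like that, `mergeDuplicateSourceNodes()` could iterate over `root.sourceNodes()` directly instead of filtering and casting the children.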

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -410,18 +410,6 @@ public final void addSource(final Topology.AutoOffsetReset offsetReset,
             }
         }
 
-        for (final Pattern otherPattern : earliestResetPatterns) {
-            if (topicPattern.pattern().contains(otherPattern.pattern()) || otherPattern.pattern().contains(topicPattern.pattern())) {
-                throw new TopologyException("Pattern " + topicPattern + " will overlap with another pattern " + otherPattern + " already been registered by another source");
-            }
-        }
-
-        for (final Pattern otherPattern : latestResetPatterns) {
-            if (topicPattern.pattern().contains(otherPattern.pattern()) || otherPattern.pattern().contains(topicPattern.pattern())) {
-                throw new TopologyException("Pattern " + topicPattern + " will overlap with another pattern " + otherPattern + " already been registered by another source");
-            }
-        }
-

Review comment:
       I agree on the first part.
   Regarding the second part, I had similar thoughts when I wrote my comment in `mergeDuplicateSourceNodes()`.
   But I might also be missing something here.

##########
File path: streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##########
@@ -895,6 +898,55 @@ public void shouldUseSpecifiedNameForAggregateOperationGivenTable() {
             STREAM_OPERATION_NAME);
     }
 
+    @Test
+    public void shouldAllowReadingFromSameTopic() {
+        builder.stream("topic");
+        builder.stream("topic");
+        builder.build();
+    }

Review comment:
       Could you please add a try-catch clause to better document the test?
   For example:
   ```suggestion
       public void shouldAllowReadingFromSameTopic() {
           builder.stream("topic");
           builder.stream("topic");
           
           try {
               builder.build();
           } catch (final TopologyException topologyException) {
               fail("TopologyException not expected");
           }
       }
   ```
   This applies also to the other tests.



