ableegoldman commented on a change in pull request #9582:
URL: https://github.com/apache/kafka/pull/9582#discussion_r522381803



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
##########
@@ -71,6 +78,21 @@ public Pattern topicPattern() {
         return consumedInternal.valueSerde();
     }
 
+    // We "merge" source nodes into a single node under the hood if a user 
tries to read in a source topic multiple times
+    public void merge(final StreamSourceNode<?, ?> other) {
+        final AutoOffsetReset resetPolicy = 
consumedInternal.offsetResetPolicy();
+        if (resetPolicy != null && 
!resetPolicy.equals(other.consumedInternal().offsetResetPolicy())) {
+            log.error("Tried to merge source nodes {} and {} which are 
subscribed to the same topic/pattern, but "
+                          + "the offset reset policies do not match", this, 
other);
+            throw new TopologyException("Can't configure different offset 
reset policies on the same input topic(s)");
+        }
+        for (final StreamsGraphNode otherChild : other.children()) {
+            // Move children from other to this, these calls take care of 
resetting the child's parents to this

Review comment:
       I'll remove it




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to