[jira] [Commented] (KAFKA-5253) TopologyTestDriver must handle streams created with patterns
[ https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16445704#comment-16445704 ] ASF GitHub Bot commented on KAFKA-5253: --- mjsax closed pull request #4793: KAFKA-5253: Fixed KStreamTestDriver to handle streams created with patterns URL: https://github.com/apache/kafka/pull/4793 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 535f03524c0..b1d60a9ad88 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -1880,4 +1880,8 @@ public void updateSubscribedTopics(final Set topics, final String logPre subscriptionUpdates.updateTopics(topics); updateSubscriptions(subscriptionUpdates, logPrefix); } + +public synchronized Set getSourceTopicNames() { +return sourceTopicNames; +} } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 2009806fd18..ef65bb32b36 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -53,6 +53,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.core.IsInstanceOf.instanceOf; @@ -542,4 +543,49 @@ public void shouldMergeMultipleStreams() { assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee", "F:ff", "G:gg", "H:hh"), processorSupplier.processed); } + +@Test +public void shouldProcessFromSourceThatMatchPattern() { +final KStreampattern2Source = builder.stream(Pattern.compile("topic-\\d")); + +final MockProcessorSupplier processorSupplier = new MockProcessorSupplier<>(); +pattern2Source.process(processorSupplier); + +driver.setUp(builder); +driver.setTime(0L); + +driver.process("topic-3", "A", "aa"); +driver.process("topic-4", "B", "bb"); +driver.process("topic-5", "C", "cc"); +driver.process("topic-6", "D", "dd"); +driver.process("topic-7", "E", "ee"); + +assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee"), +processorSupplier.processed); +} + +@Test +public void shouldProcessFromSourcesThatMatchMultiplePattern() { +final String topic3 = "topic-without-pattern"; + +final KStream pattern2Source1 = builder.stream(Pattern.compile("topic-\\d")); +final KStream pattern2Source2 = builder.stream(Pattern.compile("topic-[A-Z]")); +final KStream source3 = builder.stream(topic3); +final KStream merged = pattern2Source1.merge(pattern2Source2).merge(source3); + +final MockProcessorSupplier processorSupplier = new MockProcessorSupplier<>(); +merged.process(processorSupplier); + +driver.setUp(builder); +driver.setTime(0L); + +driver.process("topic-3", "A", "aa"); +driver.process("topic-4", "B", "bb"); +driver.process("topic-A", "C", "cc"); +driver.process("topic-Z", "D", "dd"); +driver.process(topic3, "E", "ee"); + +assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee"), +processorSupplier.processed); +} } diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java index aebb84975e2..eb137dbabeb 100644 --- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java @@ -42,6 +42,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.regex.Pattern; public class KStreamTestDriver extends ExternalResource { @@ -184,9 +185,15 @@ public void process(final String topicName, final Object key, final Object value private ProcessorNode sourceNodeByTopicName(final String topicName) { ProcessorNode topicNode = topology.source(topicName); - -if
[jira] [Commented] (KAFKA-5253) TopologyTestDriver must handle streams created with patterns
[ https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16427284#comment-16427284 ] Jagadesh Adireddi commented on KAFKA-5253: -- Hi [~mjsax], Made changes to `{{TopologyTestDriver`}} class for supporting patterns. Kindly review and let me know if any changes needed. > TopologyTestDriver must handle streams created with patterns > > > Key: KAFKA-5253 > URL: https://issues.apache.org/jira/browse/KAFKA-5253 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 1.1.0 >Reporter: Wim Van Leuven >Assignee: Jagadesh Adireddi >Priority: Major > Labels: beginner, needs-kip, newbie > > *Context* > -KStreamTestDriver- TopologyTestDriver (added via KIP-247) is being used to > unit test topologies while developing KStreams apps. > One such topology uses a Pattern to consume from multiple topics at once. > *Problem* > The unit test of the topology fails because -KStreamTestDriver- > TopologyTestDriver fails to deal with Patterns properly. > *Example* > Underneath is a unit test explaining what I understand should happen, but is > failing. > **Note: the example below is outdate as it used the old KStreamTestDriver. > The overall test layout can be adopted for TopologyTestDriver though, thus, > we just leave it in the description.** > Explicitly adding a source topic matching the topic pattern, generates an > exception as the topology builder explicitly checks overlapping topic names > and patterns, in any order of adding pattern and topic. So, it is intended > behaviour. > {code:java} > @Test > public void shouldProcessFromSourcesThatDoMatchThePattern() { > // -- setup stream pattern > final KStreamsource = > builder.stream(Pattern.compile("topic-source-\\d")); > source.to("topic-sink"); > // -- setup processor to capture results > final MockProcessorSupplier processorSupplier = new > MockProcessorSupplier<>(); > source.process(processorSupplier); > // -- add source to stream data from > //builder.addSource(builder.newName(KStreamImpl.SOURCE_NAME), > "topic-source-3"); > // -- build test driver > driver = new KStreamTestDriver(builder); // this should be > TopologyTestDriver > driver.setTime(0L); > // -- test > driver.process("topic-source-3", "A", "aa"); > // -- validate > // no exception was thrown > assertEquals(Utils.mkList("A:aa"), processorSupplier.processed); > } > {code} > *Solution* > If anybody can help in defining the solution, I can create a pull request > for this change.- -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5253) TopologyTestDriver must handle streams created with patterns
[ https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16426334#comment-16426334 ] Matthias J. Sax commented on KAFKA-5253: As commented on the PR, this ticket addressed `TopololgyTestDriver` but not `KStreamTestDriver` (KStreamTestDriver will be removed via KAFKA-6474) > TopologyTestDriver must handle streams created with patterns > > > Key: KAFKA-5253 > URL: https://issues.apache.org/jira/browse/KAFKA-5253 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 1.1.0 >Reporter: Wim Van Leuven >Assignee: Jagadesh Adireddi >Priority: Major > Labels: beginner, needs-kip, newbie > > *Context* > -KStreamTestDriver- TopologyTestDriver (added via KIP-247) is being used to > unit test topologies while developing KStreams apps. > One such topology uses a Pattern to consume from multiple topics at once. > *Problem* > The unit test of the topology fails because -KStreamTestDriver- > TopologyTestDriver fails to deal with Patterns properly. > *Example* > Underneath is a unit test explaining what I understand should happen, but is > failing. > **Note: the example below is outdate as it used the old KStreamTestDriver. > The overall test layout can be adopted for TopologyTestDriver though, thus, > we just leave it in the description.** > Explicitly adding a source topic matching the topic pattern, generates an > exception as the topology builder explicitly checks overlapping topic names > and patterns, in any order of adding pattern and topic. So, it is intended > behaviour. > {code:java} > @Test > public void shouldProcessFromSourcesThatDoMatchThePattern() { > // -- setup stream pattern > final KStreamsource = > builder.stream(Pattern.compile("topic-source-\\d")); > source.to("topic-sink"); > // -- setup processor to capture results > final MockProcessorSupplier processorSupplier = new > MockProcessorSupplier<>(); > source.process(processorSupplier); > // -- add source to stream data from > //builder.addSource(builder.newName(KStreamImpl.SOURCE_NAME), > "topic-source-3"); > // -- build test driver > driver = new KStreamTestDriver(builder); // this should be > TopologyTestDriver > driver.setTime(0L); > // -- test > driver.process("topic-source-3", "A", "aa"); > // -- validate > // no exception was thrown > assertEquals(Utils.mkList("A:aa"), processorSupplier.processed); > } > {code} > *Solution* > If anybody can help in defining the solution, I can create a pull request > for this change.- -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5253) TopologyTestDriver must handle streams created with patterns
[ https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418503#comment-16418503 ] ASF GitHub Bot commented on KAFKA-5253: --- jadireddi opened a new pull request #4793: KAFKA-5253: Fixed KStreamTestDriver to handle streams created with patterns URL: https://github.com/apache/kafka/pull/4793 https://issues.apache.org/jira/browse/KAFKA-5253 Fixed `KStreamTestDriver#sourceNodeByTopicName` to handle streams created with patterns. *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > TopologyTestDriver must handle streams created with patterns > > > Key: KAFKA-5253 > URL: https://issues.apache.org/jira/browse/KAFKA-5253 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 1.1.0 >Reporter: Wim Van Leuven >Assignee: Jagadesh Adireddi >Priority: Major > Labels: beginner, needs-kip, newbie > > *Context* > -KStreamTestDriver- TopologyTestDriver (added via KIP-247) is being used to > unit test topologies while developing KStreams apps. > One such topology uses a Pattern to consume from multiple topics at once. > *Problem* > The unit test of the topology fails because -KStreamTestDriver- > TopologyTestDriver fails to deal with Patterns properly. > *Example* > Underneath is a unit test explaining what I understand should happen, but is > failing. > Explicitly adding a source topic matching the topic pattern, generates an > exception as the topology builder explicitly checks overlapping topic names > and patterns, in any order of adding pattern and topic. So, it is intended > behaviour. > {code:java} > @Test > public void shouldProcessFromSourcesThatDoMatchThePattern() { > // -- setup stream pattern > final KStreamsource = > builder.stream(Pattern.compile("topic-source-\\d")); > source.to("topic-sink"); > // -- setup processor to capture results > final MockProcessorSupplier processorSupplier = new > MockProcessorSupplier<>(); > source.process(processorSupplier); > // -- add source to stream data from > //builder.addSource(builder.newName(KStreamImpl.SOURCE_NAME), > "topic-source-3"); > // -- build test driver > driver = new KStreamTestDriver(builder); > driver.setTime(0L); > // -- test > driver.process("topic-source-3", "A", "aa"); > // -- validate > // no exception was thrown > assertEquals(Utils.mkList("A:aa"), processorSupplier.processed); > } > {code} > *Solution* > If anybody can help in defining the solution, I can create a pull request > for this change. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5253) TopologyTestDriver must handle streams created with patterns
[ https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418111#comment-16418111 ] Matthias J. Sax commented on KAFKA-5253: Thanks for working on this. If the fix does not require a public API changes, we don't need a KIP -- when I created the KIP, I was assuming that we will need a public API for the fix. Please go ahead an open the PR and we can take it from there. > TopologyTestDriver must handle streams created with patterns > > > Key: KAFKA-5253 > URL: https://issues.apache.org/jira/browse/KAFKA-5253 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 1.1.0 >Reporter: Wim Van Leuven >Assignee: Jagadesh Adireddi >Priority: Major > Labels: beginner, needs-kip, newbie > > *Context* > -KStreamTestDriver- TopologyTestDriver (added via KIP-247) is being used to > unit test topologies while developing KStreams apps. > One such topology uses a Pattern to consume from multiple topics at once. > *Problem* > The unit test of the topology fails because -KStreamTestDriver- > TopologyTestDriver fails to deal with Patterns properly. > *Example* > Underneath is a unit test explaining what I understand should happen, but is > failing. > Explicitly adding a source topic matching the topic pattern, generates an > exception as the topology builder explicitly checks overlapping topic names > and patterns, in any order of adding pattern and topic. So, it is intended > behaviour. > {code:java} > @Test > public void shouldProcessFromSourcesThatDoMatchThePattern() { > // -- setup stream pattern > final KStreamsource = > builder.stream(Pattern.compile("topic-source-\\d")); > source.to("topic-sink"); > // -- setup processor to capture results > final MockProcessorSupplier processorSupplier = new > MockProcessorSupplier<>(); > source.process(processorSupplier); > // -- add source to stream data from > //builder.addSource(builder.newName(KStreamImpl.SOURCE_NAME), > "topic-source-3"); > // -- build test driver > driver = new KStreamTestDriver(builder); > driver.setTime(0L); > // -- test > driver.process("topic-source-3", "A", "aa"); > // -- validate > // no exception was thrown > assertEquals(Utils.mkList("A:aa"), processorSupplier.processed); > } > {code} > *Solution* > If anybody can help in defining the solution, I can create a pull request > for this change. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5253) TopologyTestDriver must handle streams created with patterns
[ https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417896#comment-16417896 ] Jagadesh Adireddi commented on KAFKA-5253: -- Hi [~mjsax], I am new to Kafka contribution. I made code changes for above issue and it's working as expected. I have few questions before I submit pull request. I made changes inside *private* method *KStreamTestDriver*#*sourceNodeByTopicName .* I haven't modified any method signature or so. Just embedded below code {code:java} Set sourceTopics = topology.sourceTopics(); for (final String eachSourceTopic : sourceTopics) { if (Pattern.compile(eachSourceTopic).matcher(topicName).matches()) { return topology.source(eachSourceTopic); } } {code} Do i still need to submit KIP for this change, as i am not touching any public methods. > TopologyTestDriver must handle streams created with patterns > > > Key: KAFKA-5253 > URL: https://issues.apache.org/jira/browse/KAFKA-5253 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 1.1.0 >Reporter: Wim Van Leuven >Assignee: Jagadesh Adireddi >Priority: Major > Labels: beginner, needs-kip, newbie > > *Context* > -KStreamTestDriver- TopologyTestDriver (added via KIP-247) is being used to > unit test topologies while developing KStreams apps. > One such topology uses a Pattern to consume from multiple topics at once. > *Problem* > The unit test of the topology fails because -KStreamTestDriver- > TopologyTestDriver fails to deal with Patterns properly. > *Example* > Underneath is a unit test explaining what I understand should happen, but is > failing. > Explicitly adding a source topic matching the topic pattern, generates an > exception as the topology builder explicitly checks overlapping topic names > and patterns, in any order of adding pattern and topic. So, it is intended > behaviour. > {code:java} > @Test > public void shouldProcessFromSourcesThatDoMatchThePattern() { > // -- setup stream pattern > final KStreamsource = > builder.stream(Pattern.compile("topic-source-\\d")); > source.to("topic-sink"); > // -- setup processor to capture results > final MockProcessorSupplier processorSupplier = new > MockProcessorSupplier<>(); > source.process(processorSupplier); > // -- add source to stream data from > //builder.addSource(builder.newName(KStreamImpl.SOURCE_NAME), > "topic-source-3"); > // -- build test driver > driver = new KStreamTestDriver(builder); > driver.setTime(0L); > // -- test > driver.process("topic-source-3", "A", "aa"); > // -- validate > // no exception was thrown > assertEquals(Utils.mkList("A:aa"), processorSupplier.processed); > } > {code} > *Solution* > If anybody can help in defining the solution, I can create a pull request > for this change. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5253) TopologyTestDriver must handle streams created with patterns
[ https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354332#comment-16354332 ] Matthias J. Sax commented on KAFKA-5253: I doubled checked {{TopologyTestDriver}} and it is not able to handle Pattern subscription. The fix should be fairly easy, however, required a public API change and thus a KIP. > TopologyTestDriver must handle streams created with patterns > > > Key: KAFKA-5253 > URL: https://issues.apache.org/jira/browse/KAFKA-5253 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 1.1.0 >Reporter: Wim Van Leuven >Priority: Major > Labels: beginner, needs-kip, newbie > > *Context* > -KStreamTestDriver- TopologyTestDriver (added via KIP-247) is being used to > unit test topologies while developing KStreams apps. > One such topology uses a Pattern to consume from multiple topics at once. > *Problem* > The unit test of the topology fails because -KStreamTestDriver- > TopologyTestDriver fails to deal with Patterns properly. > *Example* > Underneath is a unit test explaining what I understand should happen, but is > failing. > Explicitly adding a source topic matching the topic pattern, generates an > exception as the topology builder explicitly checks overlapping topic names > and patterns, in any order of adding pattern and topic. So, it is intended > behaviour. > {code:java} > @Test > public void shouldProcessFromSourcesThatDoMatchThePattern() { > // -- setup stream pattern > final KStreamsource = > builder.stream(Pattern.compile("topic-source-\\d")); > source.to("topic-sink"); > // -- setup processor to capture results > final MockProcessorSupplier processorSupplier = new > MockProcessorSupplier<>(); > source.process(processorSupplier); > // -- add source to stream data from > //builder.addSource(builder.newName(KStreamImpl.SOURCE_NAME), > "topic-source-3"); > // -- build test driver > driver = new KStreamTestDriver(builder); > driver.setTime(0L); > // -- test > driver.process("topic-source-3", "A", "aa"); > // -- validate > // no exception was thrown > assertEquals(Utils.mkList("A:aa"), processorSupplier.processed); > } > {code} > *Solution* > If anybody can help in defining the solution, I can create a pull request > for this change. -- This message was sent by Atlassian JIRA (v7.6.3#76005)