[jira] [Commented] (KAFKA-5253) TopologyTestDriver must handle streams created with patterns

2018-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16445704#comment-16445704
 ] 

ASF GitHub Bot commented on KAFKA-5253:
---

mjsax closed pull request #4793: KAFKA-5253: Fixed KStreamTestDriver to handle 
streams created with patterns
URL: https://github.com/apache/kafka/pull/4793
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 535f03524c0..b1d60a9ad88 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -1880,4 +1880,8 @@ public void updateSubscribedTopics(final Set<String> topics, final String logPre
         subscriptionUpdates.updateTopics(topics);
         updateSubscriptions(subscriptionUpdates, logPrefix);
     }
+
+    public synchronized Set<String> getSourceTopicNames() {
+        return sourceTopicNames;
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 2009806fd18..ef65bb32b36 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -53,6 +53,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
@@ -542,4 +543,49 @@ public void shouldMergeMultipleStreams() {
         assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee", "F:ff", "G:gg", "H:hh"),
                      processorSupplier.processed);
     }
+
+    @Test
+    public void shouldProcessFromSourceThatMatchPattern() {
+        final KStream<String, String> pattern2Source = builder.stream(Pattern.compile("topic-\\d"));
+
+        final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
+        pattern2Source.process(processorSupplier);
+
+        driver.setUp(builder);
+        driver.setTime(0L);
+
+        driver.process("topic-3", "A", "aa");
+        driver.process("topic-4", "B", "bb");
+        driver.process("topic-5", "C", "cc");
+        driver.process("topic-6", "D", "dd");
+        driver.process("topic-7", "E", "ee");
+
+        assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee"),
+                processorSupplier.processed);
+    }
+
+    @Test
+    public void shouldProcessFromSourcesThatMatchMultiplePattern() {
+        final String topic3 = "topic-without-pattern";
+
+        final KStream<String, String> pattern2Source1 = builder.stream(Pattern.compile("topic-\\d"));
+        final KStream<String, String> pattern2Source2 = builder.stream(Pattern.compile("topic-[A-Z]"));
+        final KStream<String, String> source3 = builder.stream(topic3);
+        final KStream<String, String> merged = pattern2Source1.merge(pattern2Source2).merge(source3);
+
+        final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
+        merged.process(processorSupplier);
+
+        driver.setUp(builder);
+        driver.setTime(0L);
+
+        driver.process("topic-3", "A", "aa");
+        driver.process("topic-4", "B", "bb");
+        driver.process("topic-A", "C", "cc");
+        driver.process("topic-Z", "D", "dd");
+        driver.process(topic3, "E", "ee");
+
+        assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee"),
+                processorSupplier.processed);
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index aebb84975e2..eb137dbabeb 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -42,6 +42,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Pattern;
 
 public class KStreamTestDriver extends ExternalResource {
 
@@ -184,9 +185,15 @@ public void process(final String topicName, final Object key, final Object value
 
     private ProcessorNode sourceNodeByTopicName(final String topicName) {
         ProcessorNode topicNode = topology.source(topicName);
-
-        if 

[jira] [Commented] (KAFKA-5253) TopologyTestDriver must handle streams created with patterns

2018-04-05 Thread Jagadesh Adireddi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16427284#comment-16427284
 ] 

Jagadesh Adireddi commented on KAFKA-5253:
--

Hi [~mjsax],
Made changes to the {{TopologyTestDriver}} class to support patterns. Kindly 
review and let me know if any changes are needed.

> TopologyTestDriver must handle streams created with patterns
> 
>
>                 Key: KAFKA-5253
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5253
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams, unit tests
>    Affects Versions: 1.1.0
>            Reporter: Wim Van Leuven
>            Assignee: Jagadesh Adireddi
>            Priority: Major
>              Labels: beginner, needs-kip, newbie
>
> *Context*
>  -KStreamTestDriver- TopologyTestDriver (added via KIP-247) is being used to 
> unit test topologies while developing KStreams apps.
> One such topology uses a Pattern to consume from multiple topics at once.
> *Problem*
>  The unit test of the topology fails because -KStreamTestDriver- 
> TopologyTestDriver fails to deal with Patterns properly.
> *Example*
>  Below is a unit test showing what I understand should happen, but which is 
> currently failing.
> **Note: the example below is outdated as it uses the old KStreamTestDriver. 
> The overall test layout can be adopted for TopologyTestDriver though, so 
> we just leave it in the description.**
> Explicitly adding a source topic that matches the topic pattern generates an 
> exception, because the topology builder explicitly checks for overlapping 
> topic names and patterns, regardless of the order in which the pattern and 
> topic are added. So, this is intended behaviour.
> {code:java}
> @Test
> public void shouldProcessFromSourcesThatDoMatchThePattern() {
>     // -- setup stream pattern
>     final KStream source = builder.stream(Pattern.compile("topic-source-\\d"));
>     source.to("topic-sink");
>     // -- setup processor to capture results
>     final MockProcessorSupplier processorSupplier = new MockProcessorSupplier<>();
>     source.process(processorSupplier);
>     // -- add source to stream data from
>     //builder.addSource(builder.newName(KStreamImpl.SOURCE_NAME), "topic-source-3");
>     // -- build test driver
>     driver = new KStreamTestDriver(builder); // this should be TopologyTestDriver
>     driver.setTime(0L);
>     // -- test
>     driver.process("topic-source-3", "A", "aa");
>     // -- validate
>     // no exception was thrown
>     assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
> }
> {code}
> *Solution*
>  If anybody can help in defining the solution, I can create a pull request 
> for this change.
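
The overlap rule mentioned in the description can be illustrated with a small stand-alone sketch. Everything here is hypothetical: the class `SourceOverlapCheck`, its two methods, and the use of `IllegalStateException` are invented for illustration and are not the actual topology builder code. It only models the stated behaviour, namely that a topic name matching a registered pattern is rejected no matter which of the two is added first.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical model of an "overlapping topic names and patterns" check.
final class SourceOverlapCheck {

    private final Set<String> topicNames = new LinkedHashSet<>();
    private final List<Pattern> topicPatterns = new ArrayList<>();

    // Registers a literal topic name; fails if an existing pattern already covers it.
    void addSourceTopic(final String topic) {
        for (final Pattern pattern : topicPatterns) {
            if (pattern.matcher(topic).matches()) {
                throw new IllegalStateException(
                    "Topic " + topic + " overlaps with pattern " + pattern);
            }
        }
        topicNames.add(topic);
    }

    // Registers a pattern; fails if it covers an already registered topic name.
    void addSourcePattern(final Pattern pattern) {
        for (final String topic : topicNames) {
            if (pattern.matcher(topic).matches()) {
                throw new IllegalStateException(
                    "Pattern " + pattern + " overlaps with topic " + topic);
            }
        }
        topicPatterns.add(pattern);
    }

    public static void main(final String[] args) {
        final SourceOverlapCheck check = new SourceOverlapCheck();
        check.addSourcePattern(Pattern.compile("topic-source-\\d"));
        try {
            check.addSourceTopic("topic-source-3");
        } catch (final IllegalStateException e) {
            // Expected: the literal topic is already covered by the pattern.
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Under this model, the commented-out `addSource` call in the test above would be rejected, which is why the test relies on the driver resolving `topic-source-3` through the pattern instead.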



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5253) TopologyTestDriver must handle streams created with patterns

2018-04-04 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16426334#comment-16426334
 ] 

Matthias J. Sax commented on KAFKA-5253:


As commented on the PR, this ticket addresses `TopologyTestDriver` but not 
`KStreamTestDriver` (KStreamTestDriver will be removed via KAFKA-6474).



[jira] [Commented] (KAFKA-5253) TopologyTestDriver must handle streams created with patterns

2018-03-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418503#comment-16418503
 ] 

ASF GitHub Bot commented on KAFKA-5253:
---

jadireddi opened a new pull request #4793: KAFKA-5253: Fixed KStreamTestDriver 
to handle streams created with patterns
URL: https://github.com/apache/kafka/pull/4793
 
 
   https://issues.apache.org/jira/browse/KAFKA-5253
   
   Fixed `KStreamTestDriver#sourceNodeByTopicName` to handle streams created 
with patterns.
   

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-5253) TopologyTestDriver must handle streams created with patterns

2018-03-28 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418111#comment-16418111
 ] 

Matthias J. Sax commented on KAFKA-5253:


Thanks for working on this. If the fix does not require a public API change, 
we don't need a KIP -- when I created the KIP, I was assuming that we would need 
a public API for the fix. Please go ahead and open the PR and we can take it 
from there.



[jira] [Commented] (KAFKA-5253) TopologyTestDriver must handle streams created with patterns

2018-03-28 Thread Jagadesh Adireddi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417896#comment-16417896
 ] 

Jagadesh Adireddi commented on KAFKA-5253:
--

Hi [~mjsax],


I am new to contributing to Kafka. I made code changes for the above issue and 
they work as expected.


I have a few questions before I submit a pull request. I made the changes inside 
the *private* method *KStreamTestDriver#sourceNodeByTopicName*. I haven't 
modified any method signatures; I just embedded the code below:
 
{code:java}
Set<String> sourceTopics = topology.sourceTopics();
for (final String eachSourceTopic : sourceTopics) {
    if (Pattern.compile(eachSourceTopic).matcher(topicName).matches()) {
        return topology.source(eachSourceTopic);
    }
}
{code}
Do I still need to submit a KIP for this change, given that I am not touching any 
public methods?
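
The lookup in the snippet above can be tried out in isolation, without any Kafka classes. The following is a minimal sketch under stated assumptions: `PatternSourceLookup`, `addSource`, and the node names are hypothetical stand-ins for the topology and are not part of Kafka; only the `Pattern.compile(...).matcher(...).matches()` idiom is taken from the snippet. The exact-match check before the regex loop is an assumption of this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical stand-in for a topology: maps a source topic string
// (a literal name or a regular expression) to the name of its source node.
final class PatternSourceLookup {

    private final Map<String, String> sourceNodesByTopic = new LinkedHashMap<>();

    void addSource(final String topicOrPattern, final String nodeName) {
        sourceNodesByTopic.put(topicOrPattern, nodeName);
    }

    // Tries an exact topic match first, then falls back to treating every
    // registered source string as a regular expression, as in the snippet.
    String sourceNodeByTopicName(final String topicName) {
        final String exact = sourceNodesByTopic.get(topicName);
        if (exact != null) {
            return exact;
        }
        for (final Map.Entry<String, String> entry : sourceNodesByTopic.entrySet()) {
            if (Pattern.compile(entry.getKey()).matcher(topicName).matches()) {
                return entry.getValue();
            }
        }
        return null; // no source node subscribes to this topic
    }

    public static void main(final String[] args) {
        final PatternSourceLookup lookup = new PatternSourceLookup();
        lookup.addSource("topic-\\d", "pattern-source");
        lookup.addSource("topic-without-pattern", "named-source");

        System.out.println(lookup.sourceNodeByTopicName("topic-3"));               // pattern-source
        System.out.println(lookup.sourceNodeByTopicName("topic-without-pattern")); // named-source
        System.out.println(lookup.sourceNodeByTopicName("unrelated"));             // null
    }
}
```

One caveat with this approach: compiling every source string as a regex also affects literal topic names. A name containing `.` (which is legal in Kafka topic names) would match any character in that position, so `my.topic` would also match `myXtopic`.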



[jira] [Commented] (KAFKA-5253) TopologyTestDriver must handle streams created with patterns

2018-02-06 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354332#comment-16354332
 ] 

Matthias J. Sax commented on KAFKA-5253:


I double-checked {{TopologyTestDriver}} and it is not able to handle Pattern 
subscription. The fix should be fairly easy; however, it requires a public API 
change and thus a KIP.
