[ 
https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jagadesh Adireddi reassigned KAFKA-5253:
----------------------------------------

    Assignee: Jagadesh Adireddi

> TopologyTestDriver must handle streams created with patterns
> ------------------------------------------------------------
>
>                 Key: KAFKA-5253
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5253
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams, unit tests
>    Affects Versions: 1.1.0
>            Reporter: Wim Van Leuven
>            Assignee: Jagadesh Adireddi
>            Priority: Major
>              Labels: beginner, needs-kip, newbie
>
> *Context*
>  -KStreamTestDriver- TopologyTestDriver (added via KIP-247) is being used to 
> unit test topologies while developing KStreams apps.
> One such topology uses a Pattern to consume from multiple topics at once.
> *Problem*
>  The unit test of the topology fails because -KStreamTestDriver- 
> TopologyTestDriver fails to deal with Patterns properly.
> *Example*
>  Underneath is a unit test explaining what I understand should happen, but is 
> failing.
> Explicitly adding a source topic matching the topic pattern, generates an 
> exception as the topology builder explicitly checks overlapping topic names 
> and patterns, in any order of adding pattern and topic. So, it is intended 
> behaviour.
> {code:java}
>     @Test
>     public void shouldProcessFromSourcesThatDoMatchThePattern() {
>         // -- setup stream pattern
>         final KStream<String, String> source = 
> builder.stream(Pattern.compile("topic-source-\\d"));
>         source.to("topic-sink");
>         // -- setup processor to capture results
>         final MockProcessorSupplier<String, String> processorSupplier = new 
> MockProcessorSupplier<>();
>         source.process(processorSupplier);
>         // -- add source to stream data from
>         //builder.addSource(builder.newName(KStreamImpl.SOURCE_NAME), 
> "topic-source-3");
>         // -- build test driver
>         driver = new KStreamTestDriver(builder);
>         driver.setTime(0L);
>         // -- test
>         driver.process("topic-source-3", "A", "aa");
>         // -- validate
>         // no exception was thrown
>         assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
>     }
> {code}
> *Solution*
>  If anybody can help in defining the solution, I can create a pull request 
> for this change.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to