[ 
https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17158823#comment-17158823
 ] 

Matthias J. Sax commented on KAFKA-5488:
----------------------------------------

[~high.lee] Thanks for your input. The KIP is already accepted. If you have 
concerns, please raise them on the KIP discuss or vote thread (we want to keep 
the discussion in one place) – it might also help if you would re-read the 
discussion thread first, to get full context on why the API is designed in the 
current way.
{quote}I don't think there is a need to take a function as a parameter, That's 
enough for the consumer
{quote}
There was an extensive discussion on the dev mailing list about this topic. 
Please read up on the details there why it's designed that way (and of course 
feel free to follow up with questions).
{quote}Wouldn't it be better to unify the return type to BranchedKStream
{quote}
Not sure if I can follow. Again. Please raise you concerns/ideas on the dev 
mailing list.

Btw, why did you assign the ticket to you? I am sure that Ivan who proposed the 
KIP want to implement it. \cc [~iponomarev] (hope that is the right handle :) )

> KStream.branch should not return a Array of streams we have to access by 
> known index
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5488
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5488
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Marcel "childNo͡.de" Trautwein
>            Assignee: highluck
>            Priority: Major
>              Labels: kip
>
> KIP-418: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream]
> long story short: it's a mess to get a {{KStream<>[]}} out from 
> {{KStream<>branch(Predicate<>...)}}. It breaks the fluent API and it produces 
> bad code which is not that good to maintain since you have to know the right 
> index for an unnamed branching stream.
> Example
> {code:java}
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import org.apache.kafka.streams.kstream.KStream;
> public class StreamAppWithBranches {
>     public static void main(String... args) {
>         KStream<byte[], EventType>[] branchedStreams= new KStreamBuilder()
>         .<byte[], EventType>stream("eventTopic")
>         .branch(
>             (k, v) -> EventType::validData
>             (k, v) -> true
>         );
>         
>         branchedStreams[0]
>         .to("topicValidData");
>         
>         branchedStreams[1]
>         .to("topicInvalidData");
>     }
> }
> {code}
> Quick idea, s.th. like {{void branch(final BranchDefinition<Predicate<>, 
> Consumer<KStream<>>>... branchPredicatesAndHandlers);}} where you can write 
> branches/streams code nested where it belongs to
> so it would be possible to write code like
> {code:java}
>         new KStreamBuilder()
>         .<byte[], EventType>stream("eventTopic")
>         .branch(
>             Branch.create(
>                 (k, v) -> EventType::validData,
>                 stream -> stream.to("topicValidData")
>             ),
>             Branch.create(
>                 (k, v) -> true,
>                 stream -> stream.to("topicInvalidData")
>             )
>         );
> {code}
> I'll go forward to evaluate some ideas:
>  [https://gitlab.com/childno.de/apache_kafka/snippets/1665655]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to