[ https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17158823#comment-17158823 ]
Matthias J. Sax commented on KAFKA-5488: ---------------------------------------- [~high.lee] Thanks for your input. The KIP is already accepted. If you have concerns, please raise them on the KIP discuss or vote thread (we want to keep the discussion in one place) – it might also help if you would re-read the discussion thread first, to get full context on why the API is designed in the current way. {quote}I don't think there is a need to take a function as a parameter, That's enough for the consumer {quote} There was an extensive discussion on the dev mailing list about this topic. Please read up on the details there why it's designed that way (and of course feel free to follow up with questions). {quote}Wouldn't it be better to unify the return type to BranchedKStream {quote} Not sure if I can follow. Again. Please raise you concerns/ideas on the dev mailing list. Btw, why did you assign the ticket to you? I am sure that Ivan who proposed the KIP want to implement it. \cc [~iponomarev] (hope that is the right handle :) ) > KStream.branch should not return a Array of streams we have to access by > known index > ------------------------------------------------------------------------------------ > > Key: KAFKA-5488 > URL: https://issues.apache.org/jira/browse/KAFKA-5488 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Marcel "childNo͡.de" Trautwein > Assignee: highluck > Priority: Major > Labels: kip > > KIP-418: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream] > long story short: it's a mess to get a {{KStream<>[]}} out from > {{KStream<>branch(Predicate<>...)}}. It breaks the fluent API and it produces > bad code which is not that good to maintain since you have to know the right > index for an unnamed branching stream. > Example > {code:java} > import org.apache.kafka.streams.kstream.KStreamBuilder; > import org.apache.kafka.streams.kstream.KStream; > public class StreamAppWithBranches { > public static void main(String... args) { > KStream<byte[], EventType>[] branchedStreams= new KStreamBuilder() > .<byte[], EventType>stream("eventTopic") > .branch( > (k, v) -> EventType::validData > (k, v) -> true > ); > > branchedStreams[0] > .to("topicValidData"); > > branchedStreams[1] > .to("topicInvalidData"); > } > } > {code} > Quick idea, s.th. like {{void branch(final BranchDefinition<Predicate<>, > Consumer<KStream<>>>... branchPredicatesAndHandlers);}} where you can write > branches/streams code nested where it belongs to > so it would be possible to write code like > {code:java} > new KStreamBuilder() > .<byte[], EventType>stream("eventTopic") > .branch( > Branch.create( > (k, v) -> EventType::validData, > stream -> stream.to("topicValidData") > ), > Branch.create( > (k, v) -> true, > stream -> stream.to("topicInvalidData") > ) > ); > {code} > I'll go forward to evaluate some ideas: > [https://gitlab.com/childno.de/apache_kafka/snippets/1665655] -- This message was sent by Atlassian Jira (v8.3.4#803005)