[
https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Matthias J. Sax reassigned KAFKA-5488:
--------------------------------------
Assignee: Ivan Ponomarev
> KStream.branch should not return an array of streams that we have to access by
> known index
> ------------------------------------------------------------------------------------
>
> Key: KAFKA-5488
> URL: https://issues.apache.org/jira/browse/KAFKA-5488
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 0.10.2.1, 0.11.0.0
> Reporter: Marcel "childNo͡.de" Trautwein
> Assignee: Ivan Ponomarev
> Priority: Major
> Labels: needs-kip
>
> Long story short: getting a {{KStream<>[]}} back from
> {{KStream<>.branch(Predicate<>...)}} is awkward. It breaks the fluent API and leads to
> code that is hard to maintain, because you have to know the right
> index for each unnamed branch stream.
> Example:
> {code:lang=java}
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import org.apache.kafka.streams.kstream.KStream;
>
> public class StreamAppWithBranches {
>     public static void main(String... args) {
>         // EventType is the application's own value type; validData() is assumed
>         // to be an instance method on it that returns a boolean.
>         KStream<byte[], EventType>[] branchedStreams = new KStreamBuilder()
>             .<byte[], EventType>stream("eventTopic")
>             .branch(
>                 (k, v) -> v.validData(), // index 0: valid records
>                 (k, v) -> true           // index 1: everything else
>             );
>
>         branchedStreams[0]
>             .to("topicValidData");
>
>         branchedStreams[1]
>             .to("topicInvalidData");
>     }
> }
> {code}
> Quick idea: something like {{void branch(final BranchDefinition<Predicate<>,
> Consumer<KStream<>>>... branchPredicatesAndHandlers);}}, so that the code for each
> branch can be nested right where it belongs.
> That would make it possible to write code like this:
> {code:lang=java}
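> // Proposed API sketch: Branch.create does not exist in Kafka Streams.
> // validData() is assumed to be a boolean instance method on EventType.
> new KStreamBuilder()
>     .<byte[], EventType>stream("eventTopic")
>     .branch(
>         Branch.create(
>             (k, v) -> v.validData(),
>             stream -> stream.to("topicValidData")
>         ),
>         Branch.create(
>             (k, v) -> true,
>             stream -> stream.to("topicInvalidData")
>         )
>     );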
> {code}
> I'll go ahead and evaluate some ideas:
> https://gitlab.com/childno.de/apache_kafka/snippets/1665655
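
To make the proposed shape concrete, here is a minimal, self-contained sketch of how a {{Branch.create(predicate, handler)}} pairing might behave. Everything in it is hypothetical: {{Branch}} and {{BranchSketch}} are illustrative names, not Kafka Streams classes, and a plain {{List<Map.Entry<K, V>>}} stands in for {{KStream<K, V>}} so the example runs without Kafka. It assumes the same routing rule as the existing {{branch}} method: each record goes to the first branch whose predicate matches, and a record that matches none is dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;
import java.util.function.Consumer;

// Hypothetical sketch only: Branch and BranchSketch are NOT part of Kafka Streams.
// A List<Map.Entry<K, V>> stands in for KStream<K, V>.
final class Branch<K, V> {
    final BiPredicate<K, V> predicate;
    final Consumer<List<Map.Entry<K, V>>> handler;

    private Branch(BiPredicate<K, V> predicate,
                   Consumer<List<Map.Entry<K, V>>> handler) {
        this.predicate = predicate;
        this.handler = handler;
    }

    // Pairs a predicate with the code that consumes the records it selects.
    static <K, V> Branch<K, V> create(BiPredicate<K, V> predicate,
                                      Consumer<List<Map.Entry<K, V>>> handler) {
        return new Branch<>(predicate, handler);
    }
}

final class BranchSketch {
    // Routes each record to the first branch whose predicate matches
    // (records matching no branch are dropped), then passes every branch
    // its own records. No array index is ever exposed to the caller.
    @SafeVarargs
    static <K, V> void branch(List<Map.Entry<K, V>> records,
                              Branch<K, V>... branches) {
        List<List<Map.Entry<K, V>>> buckets = new ArrayList<>();
        for (int i = 0; i < branches.length; i++) {
            buckets.add(new ArrayList<>());
        }
        for (Map.Entry<K, V> entry : records) {
            for (int i = 0; i < branches.length; i++) {
                if (branches[i].predicate.test(entry.getKey(), entry.getValue())) {
                    buckets.get(i).add(entry);
                    break; // first match wins
                }
            }
        }
        for (int i = 0; i < branches.length; i++) {
            branches[i].handler.accept(buckets.get(i));
        }
    }
}
```

With this shape, each handler sits next to the predicate that selects its records, so the caller never needs to remember which index belongs to which branch.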
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)