[
https://issues.apache.org/jira/browse/KAFKA-7523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Matthias J. Sax updated KAFKA-7523:
-----------------------------------
Labels: needs-kip (was: )
> TransformerSupplier/ProcessorSupplier enhancements
> --------------------------------------------------
>
> Key: KAFKA-7523
> URL: https://issues.apache.org/jira/browse/KAFKA-7523
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Paul Whalen
> Priority: Minor
> Labels: needs-kip
>
> I have found that when writing "low level" {{Processors}} and
> {{Transformers}} that are stateful, often I want these processors to "own"
> one or more state stores, the details of which are not important to the
> business logic of the application. However, when incorporating these into
> the topologies defined by the high level API, using {{KStream::transform}} or
> {{KStream::process}}, I'm forced to specify the stores so the topology is
> wired up correctly. This creates an unfortunate pattern where the
> {{TransformerSupplier}} or {{ProcessorSupplier}}, which (according to the
> pattern I've been following) holds the names of the
> state stores, must be defined above the "high level" "fluent API"-style
> pipeline, which makes it hard to understand the business logic data flow.
>
> What I currently have to do:
> {code:java}
> // Declared with the concrete type so that stateStoreNames() is accessible.
> TransformerSupplierWithState transformerSupplier =
>     new TransformerSupplierWithState(topology, val -> businessLogic(val));
> builder.stream("in.topic")
>     .transform(transformerSupplier, transformerSupplier.stateStoreNames())
>     .to("out.topic");{code}
> I have to both define the {{TransformerSupplier}} above the "fluent block",
> and pass the topology in so I can call {{topology.addStateStore()}} inside
> the {{TransformerSupplier}} constructor and tell the {{StreamsBuilder}} what
> the state store names are for that point in the topology. The lambda {{val ->
> businessLogic(val)}} is really what I want to see in-line because that's the
> crux of what is happening, along with the name of some factory method
> describing what the transformer is doing for me internally. This issue is
> exacerbated when the "fluent block" is much longer than this
> example: it gets worse the farther away {{val -> businessLogic(val)}} is
> from {{KStream::transform}}.
>
> An improvement:
> {code:java}
> builder.stream("in.topic")
>     .transform(transformerSupplierWithState(topology, val -> businessLogic(val)))
>     .to("out.topic");{code}
> Which implies the existence of a {{KStream::transform}} that takes a single
> argument that adheres to this interface:
> {code:java}
> interface TransformerSupplierWithState<K, V, R> {
>     Transformer<K, V, R> get();
>     String[] stateStoreNames();
> }{code}
> Or better yet, I wouldn't have to pass in the topology at all. The caller of
> {{TransformerSupplierWithState}} could also handle the job of "adding" its
> state stores to the topology:
> {code:java}
> interface TransformerSupplierWithState<K, V, R> {
>     Transformer<K, V, R> get();
>     Map<String, StoreBuilder> stateStores();
> }{code}
> Which would enable my ideal:
> {code:java}
> builder.stream("in.topic")
>     .transform(transformerSupplierWithState(val -> businessLogic(val)))
>     .to("out.topic");{code}
> I think this would be a huge improvement in the usability of low-level
> processors with the high-level DSL.
> Please let me know if I'm missing something as to why this cannot or should
> not happen, or if there is a better forum for this suggestion (presumably it
> would require a KIP?). I'd be happy to build it as well if there is a chance
> of it being merged; it doesn't seem like a huge challenge to me.
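
To make the second interface variant in the description concrete (the one exposing {{stateStores()}}), here is a minimal, self-contained Java sketch of how a caller might register the supplier's stores on its behalf. Everything in it is an assumption made for illustration: {{StoreBuilder}}, {{Transformer}} and {{SketchTopology}} are simplified stand-ins rather than the Kafka Streams classes, and the store name "internal-store" and the factory body are hypothetical. It is not how Kafka Streams implements or would implement this.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Sketch only: every type here is a simplified stand-in, not a Kafka Streams class.
class SupplierSketch {

    /** Stand-in for a store builder; only the store name matters here. */
    interface StoreBuilder {
        String name();
    }

    /** Stand-in for a transformer, reduced to a single method. */
    interface Transformer<K, V, R> {
        R transform(K key, V value);
    }

    /** The interface proposed in the issue: the supplier also declares its stores. */
    interface TransformerSupplierWithState<K, V, R> {
        Transformer<K, V, R> get();
        Map<String, StoreBuilder> stateStores();
    }

    /** Hypothetical caller side: registers the supplier's stores, then builds the transformer. */
    static final class SketchTopology {
        private final Map<String, StoreBuilder> stores = new LinkedHashMap<>();

        <K, V, R> Transformer<K, V, R> transform(TransformerSupplierWithState<K, V, R> supplier) {
            // The caller, not the supplier, adds the stores to the topology.
            stores.putAll(supplier.stateStores());
            return supplier.get();
        }

        Map<String, StoreBuilder> registeredStores() {
            return Collections.unmodifiableMap(stores);
        }
    }

    /** Hypothetical factory: hides the store details behind the business-logic lambda. */
    static <K, V, R> TransformerSupplierWithState<K, V, R> transformerSupplierWithState(
            Function<V, R> businessLogic) {
        final String storeName = "internal-store"; // assumed name, for illustration only
        return new TransformerSupplierWithState<K, V, R>() {
            @Override
            public Transformer<K, V, R> get() {
                return (key, value) -> businessLogic.apply(value);
            }

            @Override
            public Map<String, StoreBuilder> stateStores() {
                StoreBuilder builder = () -> storeName;
                return Collections.singletonMap(storeName, builder);
            }
        };
    }

    public static void main(String[] args) {
        SketchTopology topology = new SketchTopology();
        TransformerSupplierWithState<String, String, Integer> supplier =
                transformerSupplierWithState(val -> val.length());
        Transformer<String, String, Integer> transformer = topology.transform(supplier);

        System.out.println(transformer.transform("k", "hello"));   // prints 5
        System.out.println(topology.registeredStores().keySet());  // prints [internal-store]
    }
}
```

The point of the sketch is that the call site only sees the factory method and the lambda, while the store wiring happens inside {{transform}}. Whether the real API should return a map keyed by name or some other collection of store builders is left open here.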