[ https://issues.apache.org/jira/browse/KAFKA-7523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax reassigned KAFKA-7523:
--------------------------------------

    Assignee: Paul Whalen

> TransformerSupplier/ProcessorSupplier enhancements
> --------------------------------------------------
>
>                 Key: KAFKA-7523
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7523
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Paul Whalen
>            Assignee: Paul Whalen
>            Priority: Minor
>              Labels: needs-kip
>
> I have found that when writing "low level" {{Processors}} and
> {{Transformers}} that are stateful, I often want these processors to "own"
> one or more state stores, the details of which are not important to the
> business logic of the application. However, when incorporating these into
> the topologies defined by the high level API, using {{KStream::transform}} or
> {{KStream::process}}, I'm forced to specify the stores so the topology is
> wired up correctly. This creates an unfortunate pattern where the
> {{TransformerSupplier}} or {{ProcessorSupplier}}, which (according to the
> pattern I've been following) holds the information about the name of the
> state stores, must be defined above the "high level" "fluent API"-style
> pipeline, which makes it hard to understand the business logic data flow.
>
> What I currently have to do:
> {code:java}
> TransformerSupplierWithState transformerSupplier =
>     new TransformerSupplierWithState(topology, val -> businessLogic(val));
> builder.stream("in.topic")
>     .transform(transformerSupplier, transformerSupplier.stateStoreNames())
>     .to("out.topic");{code}
> I have to both define the {{TransformerSupplier}} above the "fluent block",
> and pass the topology in so I can call {{topology.addStateStore()}} inside
> the {{TransformerSupplier}} constructor and tell the {{StreamsBuilder}} what
> the state store names are for that point in the topology.
> The lambda {{val -> businessLogic(val)}} is really what I want to see
> in-line because that's the crux of what is happening, along with the name
> of some factory method describing what the transformer is doing for me
> internally. This issue is obviously exacerbated when the "fluent block" is
> much longer than this example: it gets worse the farther away
> {{val -> businessLogic(val)}} is from {{KStream::transform}}.
>
> An improvement:
> {code:java}
> builder.stream("in.topic")
>     .transform(transformerSupplierWithState(topology, val -> businessLogic(val)))
>     .to("out.topic");{code}
> Which implies the existence of a {{KStream::transform}} that takes a single
> argument that adheres to this interface:
> {code:java}
> interface TransformerSupplierWithState<K, V, R> {
>     Transformer<K, V, R> get();
>     String[] stateStoreNames();
> }{code}
> Or better yet, I wouldn't have to pass in the topology; the caller of
> {{TransformerSupplierWithState}} could also handle the job of "adding" its
> state stores to the topology:
> {code:java}
> interface TransformerSupplierWithState<K, V, R> {
>     Transformer<K, V, R> get();
>     Map<String, StoreBuilder> stateStores();
> }{code}
> Which would enable my ideal:
> {code:java}
> builder.stream("in.topic")
>     .transform(transformerSupplierWithState(val -> businessLogic(val)))
>     .to("out.topic");{code}
> I think this would be a huge improvement in the usability of low-level
> processors with the high-level DSL.
> Please let me know if I'm missing something as to why this cannot or should
> not happen, or if there is a better forum for this suggestion (presumably it
> would require a KIP?). I'd be happy to build it as well if there is a chance
> of it being merged; it doesn't seem like a huge challenge to me.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
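
The last interface proposed in the issue can be tried out without any Kafka dependency. The following is only a minimal sketch under stated assumptions: `StoreBuilder`, `Transformer`, and `SketchBuilder` are hypothetical stand-ins written for this example (they are not the Kafka Streams types of the same name), the key type is dropped for brevity, and the store name "internal-store" is invented. It shows the one idea from the proposal: the code that accepts the supplier registers and connects the supplier's stores itself, so the call site only carries the business logic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class SupplierWithStoresSketch {

    /** Hypothetical stand-in for a store builder: it only carries a store name. */
    interface StoreBuilder {
        String name();
    }

    /** Hypothetical stand-in for a value-only transformer. */
    interface Transformer<V, R> {
        R transform(V value);
    }

    /** The interface shape proposed in the issue, reduced to value types. */
    interface TransformerSupplierWithState<V, R> {
        Transformer<V, R> get();
        Map<String, StoreBuilder> stateStores();
    }

    /** Hypothetical stand-in for the DSL: records which stores were added and connected. */
    static class SketchBuilder {
        final Map<String, StoreBuilder> registeredStores = new LinkedHashMap<>();
        final List<String> connectedStoreNames = new ArrayList<>();

        <V, R> Transformer<V, R> transform(TransformerSupplierWithState<V, R> supplier) {
            // Register every store the supplier owns, then connect it by name,
            // so the caller never has to mention the stores.
            supplier.stateStores().forEach((name, store) -> {
                registeredStores.putIfAbsent(name, store);
                connectedStoreNames.add(name);
            });
            return supplier.get();
        }
    }

    /** Hypothetical factory: the lambda is in-line, the store is an internal detail. */
    static <V, R> TransformerSupplierWithState<V, R> transformerSupplierWithState(Function<V, R> logic) {
        Map<String, StoreBuilder> stores = new LinkedHashMap<>();
        stores.put("internal-store", () -> "internal-store");
        return new TransformerSupplierWithState<V, R>() {
            @Override public Transformer<V, R> get() { return logic::apply; }
            @Override public Map<String, StoreBuilder> stateStores() { return stores; }
        };
    }

    public static void main(String[] args) {
        SketchBuilder builder = new SketchBuilder();
        Transformer<String, Integer> t =
            builder.transform(transformerSupplierWithState(val -> val.length()));
        System.out.println(t.transform("kafka"));        // prints 5
        System.out.println(builder.connectedStoreNames); // prints [internal-store]
    }
}
```

In this sketch the supplier stays a plain value object, so it does not need a reference to the topology. Whether the real DSL should add the stores, or only connect stores that were added elsewhere, is a design question the issue leaves open.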