[
https://issues.apache.org/jira/browse/KAFKA-7523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16658391#comment-16658391
]
Paul Whalen commented on KAFKA-7523:
------------------------------------
Ah, I think I was a bit more confusing in my described solution than I needed
to be. The pattern of passing the {{Topology}} in is something I'd actually
like to avoid. I think that could be achieved with this interface:
{code:java}
public interface TransformerSupplierWithState<K, V, R> extends TransformerSupplier<K, V, R> {
    Transformer<K, V, R> get();
    Map<String, StoreBuilder> stateStores();
}{code}
And then inside {{KStreamImpl}} we'd have a {{transform}} like this:
{code:java}
@Override
public <KR, VR> KStream<KR, VR> transform(
        final TransformerSupplierWithState<? super K, ? super V, KeyValue<KR, VR>> transformerSupplier) {
    Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
    final String name = builder.newProcessorName(TRANSFORM_NAME);
    final StatefulProcessorNode<? super K, ? super V> transformNode = new StatefulProcessorNode<>(
        name,
        new ProcessorParameters<>(new KStreamTransform<>(transformerSupplier), name),
        transformerSupplier.stateStores().keySet().toArray(new String[]{}),
        true
    );
    transformNode.keyChangingOperation(true);
    builder.addGraphNode(this.streamsGraphNode, transformNode);
    for (StoreBuilder storeBuilder : transformerSupplier.stateStores().values()) {
        builder.addStateStore(storeBuilder);
    }
    return new KStreamImpl<>(name, null, null, sourceNodes, true, transformNode, builder);
}{code}
The only differences from the existing {{transform}} are that the state store
names come from the keys of the map returned by
{{TransformerSupplierWithState::stateStores}} rather than from the varargs
parameter, and that we additionally iterate over that map and add each
{{StoreBuilder}} to the {{InternalStreamsBuilder}}.
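To make those two changed steps concrete without pulling in Kafka Streams, here is a minimal, self-contained sketch. Everything in it is a hypothetical stand-in, not the real API: {{StoreBuilderStub}} plays the role of {{StoreBuilder}}, {{SupplierWithStateStub}} the role of the proposed interface (with the {{get}} method left out), and {{BuilderStub}} the role of {{InternalStreamsBuilder}}.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Kafka Streams' StoreBuilder; only the name matters here.
interface StoreBuilderStub {
    String name();
}

// Hypothetical stand-in for the proposed TransformerSupplierWithState (get() omitted).
interface SupplierWithStateStub {
    Map<String, StoreBuilderStub> stateStores();
}

// Hypothetical stand-in for InternalStreamsBuilder: records which stores were registered.
class BuilderStub {
    final List<String> registeredStores = new ArrayList<>();

    void addStateStore(StoreBuilderStub storeBuilder) {
        registeredStores.add(storeBuilder.name());
    }
}

class StoreWiringSketch {
    // Mirrors the two changed steps: the store names come from the map keys,
    // and every StoreBuilder in the map is registered with the builder.
    static String[] wire(SupplierWithStateStub supplier, BuilderStub builder) {
        Map<String, StoreBuilderStub> stores = supplier.stateStores();
        String[] storeNames = stores.keySet().toArray(new String[0]);
        for (StoreBuilderStub storeBuilder : stores.values()) {
            builder.addStateStore(storeBuilder);
        }
        return storeNames;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so names and registrations line up.
        Map<String, StoreBuilderStub> stores = new LinkedHashMap<>();
        stores.put("dedup-store", () -> "dedup-store");
        stores.put("counts-store", () -> "counts-store");

        BuilderStub builder = new BuilderStub();
        String[] names = wire(() -> stores, builder);

        System.out.println("names passed to the node: " + String.join(", ", names));
        System.out.println("stores registered: " + builder.registeredStores);
    }
}
```

The sketch reads the map once and reuses it, whereas the {{transform}} above calls {{stateStores()}} twice; that only matters if an implementation builds a new map on every call.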
One subtlety with this design is that it might be confusing to developers when
the two methods are called, if they don't look at the implementation:
{{stateStores}} would be called immediately, at "topology-construction-time,"
while {{get}} is called at "task-creation-time." Personally, I've written some
bugs due to my misunderstanding of the lifecycle of wiring up low level
processors and state stores, so it's something to watch out for.
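As a rough illustration of that lifecycle difference, the following self-contained sketch records when each method would be invoked. {{RecordingSupplier}}, {{buildTopology}} and {{createTasks}} are hypothetical stand-ins for the supplier, topology construction and task creation; they are not Kafka Streams code, and the one-{{get}}-per-task behaviour is my assumption about how tasks would be created.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical supplier that logs every call so the ordering is visible.
class RecordingSupplier {
    final List<String> calls = new ArrayList<>();

    // Would be called once, while the topology is being built.
    Map<String, Object> stateStores() {
        calls.add("stateStores");
        return Collections.singletonMap("my-store", new Object());
    }

    // Would be called later, once per task, and should return a fresh instance.
    Object get() {
        calls.add("get");
        return new Object();
    }
}

class LifecycleSketch {
    // Stand-in for "topology-construction-time".
    static void buildTopology(RecordingSupplier supplier) {
        supplier.stateStores();
    }

    // Stand-in for "task-creation-time": one transformer instance per task.
    static List<Object> createTasks(RecordingSupplier supplier, int taskCount) {
        List<Object> transformers = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            transformers.add(supplier.get());
        }
        return transformers;
    }

    public static void main(String[] args) {
        RecordingSupplier supplier = new RecordingSupplier();
        buildTopology(supplier);
        createTasks(supplier, 3);
        System.out.println(supplier.calls);
    }
}
```

The practical consequence is that anything created inside {{stateStores}} is shared topology-level configuration, while anything created inside {{get}} is per-task state.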
But I'll put those thoughts in the KIP which I'm currently writing up.
> TransformerSupplier/ProcessorSupplier enhancements
> --------------------------------------------------
>
> Key: KAFKA-7523
> URL: https://issues.apache.org/jira/browse/KAFKA-7523
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Paul Whalen
> Priority: Minor
> Labels: needs-kip
>
> I have found that when writing "low level" {{Processors}} and
> {{Transformers}} that are stateful, often I want these processors to "own"
> one or more state stores, the details of which are not important to the
> business logic of the application. However, when incorporating these into
> the topologies defined by the high level API, using {{KStream::transform}} or
> {{KStream::process}}, I'm forced to specify the stores so the topology is
> wired up correctly. This creates an unfortunate pattern where the
> {{TransformerSupplier}} or {{ProcessorSupplier}}, which (according to the
> pattern I've been following) holds the information about the name of the
> state stores, must be defined above the "high level" "fluent API"-style
> pipeline, which makes it hard to understand the business logic data flow.
>
> What I currently have to do:
> {code:java}
> TransformerSupplier transformerSupplier =
>     new TransformerSupplierWithState(topology, val -> businessLogic(val));
> builder.stream("in.topic")
>     .transform(transformerSupplier, transformerSupplier.stateStoreNames())
>     .to("out.topic");{code}
> I have to both define the {{TransformerSupplier}} above the "fluent block",
> and pass the topology in so I can call {{topology.addStateStore()}} inside
> the {{TransformerSupplier}} constructor and tell the {{StreamsBuilder}} what
> the state store names are for that point in the topology. The lambda {{val ->
> businessLogic(val)}} is really what I want to see in-line because that's the
> crux of what is happening, along with the name of some factory method
> describing what the transformer is doing for me internally. This issue is
> obviously exacerbated when the "fluent block" is much longer than this
> example: it gets worse the farther {{val -> businessLogic(val)}} is from
> {{KStream::transform}}.
>
> An improvement:
> {code:java}
> builder.stream("in.topic")
>     .transform(transformerSupplierWithState(topology, val -> businessLogic(val)))
>     .to("out.topic");{code}
> Which implies the existence of a {{KStream::transform}} that takes a single
> argument that adheres to this interface:
> {code:java}
> interface TransformerSupplierWithState<K, V, R> {
> Transformer<K, V, R> get();
> String[] stateStoreNames();
> }{code}
> Or better yet, I wouldn't have to pass in the topology at all; the caller of
> {{TransformerSupplierWithState}} could also handle the job of "adding" its
> state stores to the topology:
> {code:java}
> interface TransformerSupplierWithState<K, V, R> {
> Transformer<K, V, R> get();
> Map<String, StoreBuilder> stateStores();
> }{code}
> Which would enable my ideal:
> {code:java}
> builder.stream("in.topic")
> .transform(transformerSupplierWithState(val -> businessLogic(val)))
> .to("out.topic");{code}
> I think this would be a huge improvement in the usability of low-level
> processors with the high-level DSL.
> Please let me know if I'm missing something as to why this cannot or should
> not happen, or if there is a better forum for this suggestion (presumably it
> would require a KIP?). I'd be happy to build it as well if there is a chance
> of it being merged; it doesn't seem like a huge challenge to me.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)