[ https://issues.apache.org/jira/browse/KAFKA-18026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
A. Sophie Blee-Goldman updated KAFKA-18026:
-------------------------------------------
Description:
See KIP-1112:
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-1112%3A+allow+custom+processor+wrapping]
Implementation plan/wrapping procedure for DSL operators
* make sure the processor is added via ProcessorParameters#addProcessorTo
instead of calling the InternalTopologyBuilder's #addProcessor and
#addStateStore methods directly
* convert stateful operators to implement the ProcessorSupplier#stores method,
rather than directly calling #addStateStore and/or
#connectProcessorAndStateStore
* update and/or add ProcessorWrapper tests to StreamsBuilderTest (some
existing tests may use processors that haven't been converted yet; these are
expected to break and need fixing)
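The procedure above can be illustrated with a small plain-Java sketch. It does NOT use the real Kafka Streams classes: Processor, ProcessorSupplier, ProcessorWrapper and TopologyBuilder below are simplified, hypothetical stand-ins (String values, store names instead of StoreBuilders). It shows the two points of the plan: every processor goes through a single entry point that applies the wrapper, and state stores are taken from the (wrapped) supplier's #stores rather than being added out-of-band, so the wrapper sees them.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class WrappingSketch {

    // Hypothetical stand-in for a processor; NOT the Kafka Streams API.
    interface Processor {
        String process(String value);
    }

    // Stand-in for ProcessorSupplier: get() builds a processor, and stores()
    // declares the state stores it needs (the role of ProcessorSupplier#stores).
    interface ProcessorSupplier {
        Processor get();

        default Set<String> stores() {
            return Set.of();
        }
    }

    // Stand-in for a user-supplied ProcessorWrapper.
    interface ProcessorWrapper {
        ProcessorSupplier wrap(String processorName, ProcessorSupplier supplier);
    }

    // Stand-in for InternalTopologyBuilder with one entry point for processors,
    // playing the role ProcessorParameters#addProcessorTo has in the plan.
    static final class TopologyBuilder {
        private final ProcessorWrapper wrapper;
        final Map<String, ProcessorSupplier> processors = new LinkedHashMap<>();
        final Map<String, Set<String>> connectedStores = new LinkedHashMap<>();

        TopologyBuilder(ProcessorWrapper wrapper) {
            this.wrapper = wrapper;
        }

        void addProcessor(String name, ProcessorSupplier supplier) {
            ProcessorSupplier wrapped = wrapper.wrap(name, supplier);
            processors.put(name, wrapped);
            // Stores are read from the wrapped supplier rather than added
            // out-of-band, so the wrapper sees every store the processor uses.
            connectedStores.put(name, wrapped.stores());
        }
    }

    // Example wrapper: records each wrapped processor name and tags the output.
    static final List<String> wrappedNames = new ArrayList<>();

    static final ProcessorWrapper TAGGING_WRAPPER = (name, inner) -> {
        wrappedNames.add(name);
        return new ProcessorSupplier() {
            @Override
            public Processor get() {
                Processor delegate = inner.get();
                return value -> "[" + name + "] " + delegate.process(value);
            }

            @Override
            public Set<String> stores() {
                return inner.stores(); // pass the inner supplier's stores through
            }
        };
    };

    // A supplier that declares a (hypothetical) store via stores();
    // the processor itself is stateless to keep the sketch short.
    static final class StoreBackedSupplier implements ProcessorSupplier {
        @Override
        public Processor get() {
            return value -> value.toUpperCase();
        }

        @Override
        public Set<String> stores() {
            return Set.of("demo-store");
        }
    }

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder(TAGGING_WRAPPER);
        builder.addProcessor("count", new StoreBackedSupplier());
        builder.addProcessor("mapper", () -> value -> value + "!");

        System.out.println(builder.processors.get("count").get().process("a"));  // [count] A
        System.out.println(builder.processors.get("mapper").get().process("b")); // [mapper] b!
        System.out.println(builder.connectedStores); // {count=[demo-store], mapper=[]}
    }
}
```

Because the builder only exposes addProcessor, a processor cannot bypass the wrapper, which is the same idea as the follow-up item about protecting InternalTopologyBuilder#addProcessor.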
TODO list:
# -Non-source/sink PAPI processors:-
[https://github.com/apache/kafka/pull/17892] -- [~ableegoldman]
# -ProcessorGraphNode (stateless DSL KStream operators):-
[https://github.com/apache/kafka/pull/17892] -- [~ableegoldman]
# -KTableSource:- [https://github.com/apache/kafka/pull/17903] -- [~agavra]
# {-}KStream/TableAggregate (i.e. count, reduce, aggregate){-}:
[https://github.com/apache/kafka/pull/17929] -- [~ableegoldman]
# TableProcessorNode (stateless table operators, e.g. KTable#filter): -- rohan
# StreamToTableNode: -- [~agavra]
# -Stream-Table join:- [https://github.com/apache/kafka/pull/18047] -- [~ableegoldman]
# StreamStreamJoinNode: [https://github.com/apache/kafka/pull/18111] -- [~ableegoldman]
# -KTableKTableJoinNode:- [https://github.com/apache/kafka/pull/18048] -- [~agavra]
# StatefulProcessorNode:
## TableSuppressNode: -- [~agavra]
## ForeignTableJoinNode: -- [~ableegoldman]
## TODO: fill in other children/users of StatefulProcessorNode
Follow up:
* convert source & sink nodes to use ProcessorSupplier and wrap them too
* clean up StoreFactory<->StoreBuilder wrapping and configuration
* future-proof the wrapping mechanism:
** ensure new processor implementations get wrapped, e.g. by protecting
InternalTopologyBuilder#addProcessor (and possibly #addSource/#addSink)
** protect #addStateStore from being called out-of-band to prevent new state
stores (whether from new DSL operators or modifications to existing ones) from
being added to processors without being returned by the
ProcessorSupplier#stores method
* consider deprecating the older alternative to ProcessorSupplier#stores
** con: a processor supplier written as a lambda can only implement #get, not
#stores, and using lambdas for processor suppliers is very convenient
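The lambda concern comes from #stores being a default method on ProcessorSupplier: a lambda only implements the single abstract method #get, so its #stores always falls back to the default. The sketch below uses hypothetical, simplified stand-ins (not the Kafka Streams API; the default is modeled as an empty set) to show why lambda users rely on the older path of adding a store separately and connecting it to the processor by name.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public class LambdaStoresSketch {

    // Hypothetical stand-ins; NOT the real Kafka Streams interfaces.
    interface Processor {
        String process(String value);
    }

    interface ProcessorSupplier {
        Processor get(); // the single abstract method a lambda implements

        // Default method: a lambda cannot override this, so it always
        // falls back to "no stores" (simplified here as an empty set).
        default Set<String> stores() {
            return Set.of();
        }
    }

    static final class Builder {
        final Map<String, Set<String>> connected = new LinkedHashMap<>();

        // Preferred path: the stores come from the supplier itself.
        void addProcessor(String name, ProcessorSupplier supplier) {
            connected.put(name, new LinkedHashSet<>(supplier.stores()));
        }

        // Older alternative: add a store separately and connect it by name.
        void addStateStore(String storeName, String processorName) {
            connected.computeIfAbsent(processorName, k -> new LinkedHashSet<>()).add(storeName);
        }
    }

    public static void main(String[] args) {
        Builder builder = new Builder();

        // Concise lambda supplier, but its stores() is stuck at the default.
        ProcessorSupplier lambda = () -> value -> value.trim();
        builder.addProcessor("lambda-proc", lambda);
        System.out.println(lambda.stores()); // []

        // The lambda user connects the store out-of-band instead, which is
        // the path that deprecating the older alternative would remove.
        builder.addStateStore("my-store", "lambda-proc");
        System.out.println(builder.connected); // {lambda-proc=[my-store]}
    }
}
```

Deprecating the out-of-band path would therefore push users from one-line lambdas to full supplier classes that override #stores.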
docs: [https://github.com/apache/kafka/pull/17906]
was:
See KIP-1112:
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-1112%3A+allow+custom+processor+wrapping]
Implementation plan/wrapping procedure for DSL operators
* make sure processor is added via ProcessorParameters#addProcessorTo instead
of calling the InternalTopologyBuilder's #addProcessor and #addStateStore
methods directly
* convert stateful operators to implement the ProcessorSupplier#stores method,
rather than directly calling #addStateStore and/or
#connectProcessorAndStateStore
* update and/or add ProcessorWrapper tests to StreamsBuilderTest (some
existing tests may use processors that haven't been converted yet and are
expected to break/need fixing)
TODO list:
# -Non-source/sink PAPI processors:-
[https://github.com/apache/kafka/pull/17892] -- [~ableegoldman]
# -ProcessorGraphNode (stateless DSL KStream operators):-
[https://github.com/apache/kafka/pull/17892] -- [~ableegoldman]
# -KTableSource:- [https://github.com/apache/kafka/pull/17903] -- [~agavra]
# {-}KStream/TableAggregate (ie count, reduce, aggregate){-}:
[https://github.com/apache/kafka/pull/17929] -- [~ableegoldman]
# TableProcessorNode (stateless table operators eg KTable#filter): – rohan
# StreamToTableNode: – [~agavra]
# -Stream-Table join:- [https://github.com/apache/kafka/pull/18047] –
[~ableegoldman]
# StreamStreamJoinNode: [https://github.com/apache/kafka/pull/18111] –
[~ableegoldman]
# KTableKTableJoinNode: [https://github.com/apache/kafka/pull/18048] -- [~agavra]
# StatefulProcessorNode:
## TableSuppressNode: – [~agavra]
## ForeignTableJoinNode: – [~ableegoldman]
## TODO: fill in other children/users of StatefulProcessorNode
Follow up:
* convert source & sink nodes to using ProcessorSupplier and wrap them too
* clean up StoreFactory<->StoreBuilder wrapping and configuration
* future-proof the wrapping mechanism:
** ensure new processor implementations get wrapped, eg by protecting the
InternalTopologyBuilder#addProcessor (also #addSource/Sink?)
** protect #addStateStore from being called out-of-band to prevent new state
stores (whether from new DSL operators or modifications to existing ones) from
being added to processors without being returned by the
ProcessorSupplier#stores method
* consider deprecating older alternative to ProcessorSupplier#stores
** cons: using lambdas for processor suppliers is very nice
docs: [https://github.com/apache/kafka/pull/17906]
> Allow custom processor wrapping
> -------------------------------
>
> Key: KAFKA-18026
> URL: https://issues.apache.org/jira/browse/KAFKA-18026
> Project: Kafka
> Issue Type: New Feature
> Components: kip, streams
> Reporter: A. Sophie Blee-Goldman
> Assignee: A. Sophie Blee-Goldman
> Priority: Major
> Labels: kip
> Fix For: 4.0.0
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)