[ https://issues.apache.org/jira/browse/KAFKA-7397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16610610#comment-16610610 ]
Frederic Tardif commented on KAFKA-7397:
----------------------------------------
In the attached zip, see the integration test in:
kafka/kafka-utils/src/it/java/com/bell/cts/commons/kafka/store/customstore/CustomStoreRepository.java
and the custom store classes in:
kafka/kafka-utils/src/main/java/com/bell/cts/commons/kafka/store/custom
> Ability to apply DSL stateless transformation on a global table
> ---------------------------------------------------------------
>
> Key: KAFKA-7397
> URL: https://issues.apache.org/jira/browse/KAFKA-7397
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Frederic Tardif
> Priority: Major
> Attachments: kafka.zip
>
>
> When consuming a GlobalKTable (with the expectation of caching all the data
> of a topic in a consumer store), we can't apply any stateless transformation
> (filter, map) prior to materializing. To achieve this while still consuming
> the records of all partitions, we must first run a streams app that
> pre-processes the ingress topic into an egress topic holding exactly the K,V
> pairs we want to store in our GlobalKTable. This looks unnecessarily complex
> and doubles the storage of the topic, while the only goal is to statelessly
> adapt the data prior to storing it (RocksDB) at the receiving end.
> See the discussion at:
> https://stackoverflow.com/questions/50993292/kafka-streams-shared-changelog-topic
> As a workaround, I have used `new Builder().addGlobalStore(....)` with a
> custom Processor able to filter and map prior to storing (see attached).
> Although this seems to work, I believe this functionality should be part of
> the basic DSL API when working with a global table (`new
> StreamsBuilder().globalTable().filter(...).map()... `).
>
>
>
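
The workaround described in the issue (a custom Processor that filters and
maps before writing to the store) can be sketched in plain Java as follows.
This is only an illustration, not the code from the attached zip:
`FilteringStoreUpdater` and its `process` method are hypothetical names, a
`java.util.Map` stands in for the global state store, and only values are
mapped. Removing the stored entry when a record has a null value or fails the
filter is an assumption chosen to mimic table semantics. In a real application
the same logic would presumably sit in the `process` method of the Processor
passed to `addGlobalStore`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;

// Hypothetical sketch: the per-record logic a custom global-store Processor
// could run. A plain Map is used here in place of a real state store.
final class FilteringStoreUpdater<K, V, VOut> {
    private final BiPredicate<K, V> filter;       // decides which records are kept
    private final BiFunction<K, V, VOut> mapper;  // transforms the value before storing
    private final Map<K, VOut> store;             // stand-in for the global store

    FilteringStoreUpdater(BiPredicate<K, V> filter,
                          BiFunction<K, V, VOut> mapper,
                          Map<K, VOut> store) {
        this.filter = filter;
        this.mapper = mapper;
        this.store = store;
    }

    // Called once for every record read from the source topic.
    void process(K key, V value) {
        if (value == null || !filter.test(key, value)) {
            // Assumption: a null value (tombstone) or a filtered-out record
            // removes any entry previously stored under this key.
            store.remove(key);
            return;
        }
        // Stateless adaptation: only the mapped value is written to the store.
        store.put(key, mapper.apply(key, value));
    }
}
```

For example, with a filter of `(k, v) -> !v.isEmpty()` and a mapper of
`(k, v) -> v.length()`, processing the record ("a", "hello") leaves "a" mapped
to 5 in the store, and a later ("a", null) removes that entry. Keeping the
filter and mapper as constructor arguments keeps the store-update logic
independent of any particular topic or value type.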
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)