[ https://issues.apache.org/jira/browse/KAFKA-7397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Frederic Tardif updated KAFKA-7397:
-----------------------------------
    Description: 
When consuming a GlobalKTable (with the expectation of caching all the data of
a topic in a consumer-side store), we can't apply any stateless transformation
(filter, map) prior to materializing. To achieve this while still consuming
the records of all partitions, we must first run a stream app that
pre-processes the ingress topic into an egress topic holding exactly the K,V
pairs we want to store in our GlobalKTable. This is unnecessarily complex and
doubles the storage of the topic, when the only goal is to statelessly adapt
the data before storing it (RocksDB) at the receiving end.
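
To make the cost of the pre-processing detour concrete, here is a simplified,
Kafka-free model in plain Java (Java 16+ for records). It is an illustration
under stated assumptions, not Kafka Streams code: a "topic" is modeled as a
list of partitions, and the class `PreprocessDetour`, the record `Rec`, and the
sample filter (drop empty values) and map (upper-case values) are all
hypothetical. Step 1 stands in for the separate stream app writing an egress
topic; step 2 stands in for the GlobalKTable materializing every partition of
that topic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Kafka-free model (hypothetical names): a topic is a list of partitions,
// each partition a list of key/value records.
class PreprocessDetour {

    record Rec(String key, String value) {}

    // Step 1: a separate stream app filters and maps the ingress topic and
    // writes the result to an egress topic, i.e. a second copy of the data.
    static List<List<Rec>> preprocess(List<List<Rec>> ingress) {
        List<List<Rec>> egress = new ArrayList<>();
        for (List<Rec> partition : ingress) {
            List<Rec> out = new ArrayList<>();
            for (Rec r : partition) {
                if (!r.value().isEmpty()) {                              // filter
                    out.add(new Rec(r.key(), r.value().toUpperCase()));  // map
                }
            }
            egress.add(out);
        }
        return egress;
    }

    // Step 2: the global table reads every partition of the egress topic
    // and materializes the records as-is into a local key/value store.
    static Map<String, String> materialize(List<List<Rec>> topic) {
        Map<String, String> store = new LinkedHashMap<>();
        for (List<Rec> partition : topic) {
            for (Rec r : partition) {
                store.put(r.key(), r.value());
            }
        }
        return store;
    }

    // Total number of records held in a topic, across all partitions.
    static int recordCount(List<List<Rec>> topic) {
        return topic.stream().mapToInt(List::size).sum();
    }

    public static void main(String[] args) {
        List<List<Rec>> ingress = List.of(
            List.of(new Rec("a", "x"), new Rec("b", "")),
            List.of(new Rec("c", "y")));
        List<List<Rec>> egress = preprocess(ingress);
        System.out.println(materialize(egress));                        // prints {a=X, c=Y}
        System.out.println(recordCount(ingress) + recordCount(egress)); // prints 5
    }
}
```

In this model, three ingress records plus two egress records means five
records held in topics, although the store only ever needs the two
transformed ones.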

See discussion on:
[https://stackoverflow.com/questions/50993292/kafka-streams-shared-changelog-topic]

As a workaround, I have used `new Builder().addGlobalStore(....)` with a custom
Processor able to filter and map prior to storing (see attached). Although this
seems to work, I believe this functionality should be part of the basic DSL API
when working with a global table
(`new StreamsBuilder().globalTable().filter(...).map()...`).
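
The custom-processor workaround, and the DSL shape proposed here, can be
sketched the same way. Again this is a Kafka-free model, not the attached code
and not the Kafka Streams API: `GlobalStoreSketch`, `Rec`, and the fluent
`Pipeline` with its `filter`/`map`/`materialize` methods are hypothetical. What
it illustrates is that each stateless step is composed into one per-record
function that runs before the store write, which is roughly the role the custom
Processor passed to `addGlobalStore` plays in the workaround.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

// Kafka-free model (hypothetical names): stateless steps are applied to each
// source record before it is written to the store; no egress topic exists.
class GlobalStoreSketch {

    record Rec(String key, String value) {}

    // Hypothetical fluent shape in the spirit of
    // globalTable().filter(...).map(...): every call composes one more step.
    static final class Pipeline {
        // Composed per-record step; an empty Optional means "dropped".
        private Function<Rec, Optional<Rec>> step = Optional::of;

        Pipeline filter(Predicate<Rec> p) {
            Function<Rec, Optional<Rec>> prev = step;
            step = r -> prev.apply(r).filter(p);
            return this;
        }

        Pipeline map(Function<Rec, Rec> f) {
            Function<Rec, Optional<Rec>> prev = step;
            step = r -> prev.apply(r).map(f);
            return this;
        }

        // Plays the role of the global store's processor: reads every
        // partition of the source topic and stores only transformed records.
        Map<String, String> materialize(List<List<Rec>> topic) {
            Map<String, String> store = new LinkedHashMap<>();
            for (List<Rec> partition : topic) {
                for (Rec r : partition) {
                    step.apply(r).ifPresent(out -> store.put(out.key(), out.value()));
                }
            }
            return store;
        }
    }

    public static void main(String[] args) {
        List<List<Rec>> topic = List.of(
            List.of(new Rec("a", "x"), new Rec("b", "")),
            List.of(new Rec("c", "y")));
        Map<String, String> store = new Pipeline()
            .filter(r -> !r.value().isEmpty())
            .map(r -> new Rec(r.key(), r.value().toUpperCase()))
            .materialize(topic);
        System.out.println(store); // prints {a=X, c=Y}
    }
}
```

Composing the steps in call order means a `filter` placed after a `map` sees
the mapped value, as one would expect from a chained DSL, and the source topic
is read directly instead of through a duplicated intermediate topic.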


> Ability to apply DSL stateless transformation on a global table
> ---------------------------------------------------------------
>
>                 Key: KAFKA-7397
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7397
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Frederic Tardif
>            Priority: Major
>         Attachments: kafka.zip



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
