[ 
https://issues.apache.org/jira/browse/KAFKA-7663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16787760#comment-16787760
 ] 

Patrik Kleindl commented on KAFKA-7663:
---------------------------------------

I took a short look at this issue while working on
https://issues.apache.org/jira/browse/KAFKA-8037, because there I added access to
the sourceNode and its deserializer during
GlobalStateManagerImpl.restoreState.

If I try to call process in restoreState, it fails because the topology has not
been initialized yet.

Topology initialization currently happens in GlobalStateUpdateTask.initialize,
which is only called after the restore has finished.

If I instead move this into GlobalStateManagerImpl.initialize and initialize the
topology before stateStore.init(processorContext, stateStore), then initializing
the state store fails, because that is not allowed once the topology has been
initialized.
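The ordering conflict described above can be illustrated with a deliberately simplified, hypothetical model in plain Java. None of these classes (`InitOrderModel`, `Topology`, `GlobalStore`) are Kafka Streams types; the model only assumes, as the comment describes, that restoration happens while the state store is being initialized, that processing a record requires an initialized topology, and that a store may not be initialized once the topology has been.

```java
// Hypothetical model of the initialization-order conflict; not Kafka Streams code.
public class InitOrderModel {

    static final class Topology {
        private boolean initialized;

        void initialize() {
            initialized = true;
        }

        boolean isInitialized() {
            return initialized;
        }

        // Running a record through the processors requires an initialized topology.
        void process(String key, String value) {
            if (!initialized) {
                throw new IllegalStateException("topology has not been initialized");
            }
        }
    }

    static final class GlobalStore {
        // Assumption: restoration is modeled as happening inside init(),
        // i.e. while the state store is being initialized.
        void init(Topology topology, boolean processDuringRestore) {
            if (topology.isInitialized()) {
                throw new IllegalStateException("store init not allowed after topology init");
            }
            if (processDuringRestore) {
                topology.process("key", "value"); // replay one restored record
            }
        }
    }

    // Runs one ordering and reports the outcome instead of propagating the exception.
    static String run(boolean topologyFirst, boolean processDuringRestore) {
        Topology topology = new Topology();
        GlobalStore store = new GlobalStore();
        try {
            if (topologyFirst) {
                topology.initialize();
                store.init(topology, processDuringRestore);
            } else {
                store.init(topology, processDuringRestore);
                topology.initialize();
            }
            return "ok";
        } catch (IllegalStateException e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(false, false)); // current order, raw restore
        System.out.println(run(false, true));  // process during restore
        System.out.println(run(true, true));   // topology initialized first
    }
}
```

Running `main` prints `ok` for the current order with a raw restore, then `failed: topology has not been initialized` when records are processed during restore, and `failed: store init not allowed after topology init` when the topology is initialized first. Neither ordering satisfies both constraints, which is the dead end the comment runs into.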

 

> Custom Processor supplied on addGlobalStore is not used when restoring state 
> from topic
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7663
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7663
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Frederic Tardif
>            Priority: Major
>         Attachments: image-2018-11-20-11-42-14-697.png
>
>
> I have implemented a StreamsBuilder#{{addGlobalStore}} supplying a custom
> processor responsible for transforming a K,V record from the input stream into
> a V,K record. It works fine and my {{store.all()}} does print the correct
> persisted V,K records. However, if I clean the local store and restart the
> stream app, the global table is reloaded without going through the supplied
> processor; instead, it calls {{GlobalStateManagerImpl#restoreState}}, which
> simply stores the input topic's K,V records into RocksDB (hence bypassing
> the mapping function of my custom processor). I believe this is not the
> expected result?
> This is a follow-up to a Stack Overflow discussion about storing a K,V topic
> as a global table with some stateless transformations, based on a "custom"
> processor added to the global store:
> [https://stackoverflow.com/questions/50993292/kafka-streams-shared-changelog-topic#comment93591818_50993729]
> If we address this issue, we should also apply 
> `default.deserialization.exception.handler` during restore (cf. KAFKA-8037)
>  

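The behaviour reported in the issue can be reduced to a small, hypothetical plain-Java model: the normal processing path runs each input record through a processor that swaps key and value, while the restore path copies the raw topic records straight into the store. The class and method names below are illustrative only and are not Kafka Streams APIs; a `TreeMap` stands in for the RocksDB-backed store.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the two ways a global store gets populated; not Kafka code.
public class GlobalStoreRestoreModel {

    // Stand-in for the custom processor: an input (K, V) is stored as (V, K).
    static void swapProcessor(String key, String value, Map<String, String> store) {
        store.put(value, key);
    }

    // Normal processing path: every record from the source topic goes through the processor.
    static Map<String, String> populateViaProcessor(List<Map.Entry<String, String>> topic) {
        Map<String, String> store = new TreeMap<>();
        for (Map.Entry<String, String> r : topic) {
            swapProcessor(r.getKey(), r.getValue(), store);
        }
        return store;
    }

    // Restore path as described in the issue: raw records are copied as-is,
    // so the processor never runs.
    static Map<String, String> populateViaRestore(List<Map.Entry<String, String>> topic) {
        Map<String, String> store = new TreeMap<>();
        for (Map.Entry<String, String> r : topic) {
            store.put(r.getKey(), r.getValue());
        }
        return store;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> topic = List.of(
                new SimpleEntry<>("id-1", "alice"),
                new SimpleEntry<>("id-2", "bob"));
        System.out.println("processed: " + populateViaProcessor(topic));
        System.out.println("restored:  " + populateViaRestore(topic));
    }
}
```

With the two sample records, `main` prints `processed: {alice=id-1, bob=id-2}` followed by `restored:  {id-1=alice, id-2=bob}`: after a restore the store holds the input topic's K,V layout instead of the V,K layout the processor would have produced.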

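Applying a deserialization exception handler during restore, as suggested in the issue, could look roughly like the following sketch. It is hypothetical: the `Response` enum only mirrors the continue/fail idea behind `default.deserialization.exception.handler` and is not Kafka's `DeserializationExceptionHandler` interface, and `Integer.parseInt` stands in for a value deserializer.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Hypothetical restore loop that consults a handler when a record cannot be
// deserialized; not the Kafka Streams implementation.
public class RestoreWithHandlerSketch {

    enum Response { CONTINUE, FAIL }

    static Map<String, Integer> restore(List<String[]> rawRecords,
                                        Function<RuntimeException, Response> handler) {
        Map<String, Integer> store = new TreeMap<>();
        for (String[] record : rawRecords) {
            int value;
            try {
                value = Integer.parseInt(record[1]); // stand-in for the value deserializer
            } catch (NumberFormatException e) {
                if (handler.apply(e) == Response.CONTINUE) {
                    continue; // skip the corrupt record, like a log-and-continue handler
                }
                throw new IllegalStateException("restore failed on key " + record[0], e);
            }
            store.put(record[0], value);
        }
        return store;
    }

    public static void main(String[] args) {
        List<String[]> raw = List.of(
                new String[] {"a", "1"},
                new String[] {"b", "not-a-number"},
                new String[] {"c", "3"});
        System.out.println(restore(raw, e -> Response.CONTINUE));
    }
}
```

With a continue-style handler the corrupt record is skipped and `main` prints `{a=1, c=3}`; a handler returning `FAIL` aborts the restore with an `IllegalStateException`.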

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
