Hi Bruno,

I just had a discussion with a colleague about this, and I wanted to give you 
a quick update. Regarding the global state store, I realize that keeping this 
state consistent in a distributed system is very difficult. I expected that, 
since it is a global state, Kafka would take care of the consistency and I could 
simply access the data. That expectation was a bit naïve: the global state will 
only be eventually consistent, which does not fit what I am trying to do. As you 
suggested, I should use a local store.

Regarding the question in my previous mail about the number of partitions, I 
think I have answered it myself. Giving the messages correct and consistent keys 
ensures that all the data for a specific key ends up in a single partition. This 
does not require a partition per key (which is what I first thought).
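To make the key-to-partition relationship concrete, here is a minimal Java sketch. It assumes a default-partitioner-style rule, partition = hash(key) mod numPartitions. Kafka's actual default partitioner applies murmur2 to the serialized key bytes, so String.hashCode() is only a stand-in here, and the class name, method names and "device-" keys are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of key-based partitioning; not Kafka's partitioner.
final class KeyPartitioningSketch {

    // Stand-in for the default partitioner: Kafka hashes the serialized key with
    // murmur2, whereas this sketch uses String.hashCode() for simplicity.
    static int partitionFor(String key, int numPartitions) {
        // Clear the sign bit so the modulus is never negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Counts how many of the keys "device-0" .. "device-(numKeys-1)" land in each partition.
    static Map<Integer, Integer> keysPerPartition(int numKeys, int numPartitions) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (int i = 0; i < numKeys; i++) {
            counts.merge(partitionFor("device-" + i, numPartitions), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // 1000 distinct keys share 3 partitions; no partition per key is needed.
        System.out.println(keysPerPartition(1000, 3));
        // The same key is always routed to the same partition.
        System.out.println(partitionFor("device-42", 3) == partitionFor("device-42", 3));
    }
}
```

The point of the design is that the mapping is a pure function of the key, so any number of keys can share a fixed set of partitions while each key still has exactly one home.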

Thanks again for your help!

Mit freundlichen Grüßen / Best regards 

Georg Schmidt-Dumont
BCI/ESW17
Bosch Connected Industry

Tel. +49 711 811-49893 

► Take a look: https://bgn.bosch.com/alias/bci



-----Original Message-----
From: Bruno Cadonna <br...@confluent.io> 
Sent: Tuesday, 19 May 2020 11:42
To: Users <users@kafka.apache.org>
Subject: Re: Question regarding Kafka Streams Global State Store

Hi Georg,

Local state stores in Kafka Streams are backed by a Kafka topic (the changelog 
topic) by default. So, if the instance crashes, the local state store is 
restored from the local state directory. If the local state directory is empty 
or does not exist, the local state store is restored from the Kafka topic. Local 
state stores are therefore as resilient as global state stores.
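The restore behaviour described above can be modelled with a simplified, in-memory Java sketch. This is not Kafka Streams code: the changelog is a plain list of records, and the local state directory is a snapshot plus a checkpoint offset. In Kafka Streams, a checkpoint file records the changelog offset up to which the local store is known to be complete, and restoration replays the changelog from there, or from the beginning when no local state exists. All class and method names here are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Simplified model of restoring a local store; all names are hypothetical.
final class ChangelogRestoreSketch {

    // One changelog record. A null value is a tombstone (the key was deleted).
    static final class ChangelogRecord {
        final long offset;
        final String key;
        final String value;

        ChangelogRecord(long offset, String key, String value) {
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    // What survives in the local state directory: a copy of the store and the
    // next changelog offset that the copy does not yet reflect.
    static final class LocalStateDir {
        final Map<String, String> snapshot;
        final long checkpointOffset;

        LocalStateDir(Map<String, String> snapshot, long checkpointOffset) {
            this.snapshot = snapshot;
            this.checkpointOffset = checkpointOffset;
        }
    }

    static Map<String, String> restore(Optional<LocalStateDir> localDir, List<ChangelogRecord> changelog) {
        // Start from the local copy if present, otherwise from an empty store.
        Map<String, String> store = new HashMap<>();
        long startOffset = 0L;
        if (localDir.isPresent()) {
            store.putAll(localDir.get().snapshot);
            startOffset = localDir.get().checkpointOffset;
        }
        // Replay only the records the local copy does not cover yet;
        // later writes for the same key overwrite earlier ones.
        for (ChangelogRecord r : changelog) {
            if (r.offset < startOffset) {
                continue;
            }
            if (r.value == null) {
                store.remove(r.key);
            } else {
                store.put(r.key, r.value);
            }
        }
        return store;
    }

    public static void main(String[] args) {
        List<ChangelogRecord> changelog = List.of(
                new ChangelogRecord(0, "a", "1"),
                new ChangelogRecord(1, "b", "2"),
                new ChangelogRecord(2, "a", "3"));
        // Empty state directory: rebuild everything from the changelog.
        System.out.println(restore(Optional.empty(), changelog));
        // Local copy complete up to offset 1: replay offsets 1 and 2 only.
        System.out.println(restore(Optional.of(new LocalStateDir(Map.of("a", "1"), 1)), changelog));
    }
}
```

Both restore paths end in the same store contents, which is why a crash does not lose local state as long as the changelog topic is intact.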

As far as I understand, you only look up previous records with the same key. 
You do not need the global state to be available at each instance to do this; 
having all records with the same key available is sufficient. If your input 
topics are partitioned by key, records with the same key will land on the same 
instance. That means your local state store contains all records with the same 
key.

Best,
Bruno

On Tue, May 19, 2020 at 11:05 AM Schmidt-Dumont Georg (BCI/ESW17) 
<georg.schmidt-dum...@de.bosch.com.invalid> wrote:
>
> Hi Bruno,
>
> Thanks for your quick reply!
>
> I decided to use a global state store for two reasons. First, if the 
> application crashes, the store is populated properly once the cause of the 
> crash has been fixed and the app starts again, i.e. I feel that it gives me a 
> certain resiliency. Second, we will be running multiple instances of the 
> application, and a global state store provides the state across all instances.
>
> I am fairly new to Kafka and Kafka Streams, so I am very much open to 
> suggestions on better ways to handle the flow I need.
>
> -----Original Message-----
> From: Bruno Cadonna <br...@confluent.io>
> Sent: Tuesday, 19 May 2020 10:52
> To: Users <users@kafka.apache.org>
> Subject: Re: Question regarding Kafka Streams Global State Store
>
> Hi Georg,
>
> From your description, I do not see why you need to use a global state store 
> instead of a local one. Are there any specific reasons for that? With a local 
> state store, you would have the previous record immediately available.
>
> Best,
> Bruno
>
> On Tue, May 19, 2020 at 10:23 AM Schmidt-Dumont Georg (BCI/ESW17) 
> <georg.schmidt-dum...@de.bosch.com.invalid> wrote:
> >
> > Good morning,
> >
> > I have set up a Kafka Streams application with the following logic. The 
> > incoming messages are validated and transformed. The transformed messages 
> > are then published to a global state store via topic A (global-store-topic 
> > in the code below), as well as to a second topic B (output-topic) for 
> > consumption by other applications further down the processing pipeline.
> >
> > As part of the transformation, I access the global state store in order to 
> > get the values from the previous message and use them in the transformation 
> > of the current message. The messages only contain changed values, and these 
> > changes are merged with the complete data set before being sent on. Hence, 
> > I always hold the latest state in the global store in order to merge it 
> > with the incoming changed values.
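The merge step described above can be sketched in Java as follows. This assumes a flat (non-nested) document and uses Map<String, Object> as a stand-in for the JSONObject type in the original code; DeltaMergeSketch and merge are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of merging a partial update into the last known full state.
final class DeltaMergeSketch {

    // Fields present in the delta overwrite the previous values; fields absent
    // from the delta keep their previous values. Nested objects are not merged.
    static Map<String, Object> merge(Map<String, Object> previous, Map<String, Object> delta) {
        Map<String, Object> merged = new HashMap<>(previous);  // leave the input untouched
        merged.putAll(delta);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> previous = Map.of("temperature", 20, "status", "ok");
        Map<String, Object> delta = Map.of("temperature", 21);
        System.out.println(merge(previous, delta));
    }
}
```

Because each merge starts from the previous full state, it is only correct if that state is already visible when the next delta for the same key is processed.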
> >
> > Unfortunately, when I access the store in the transformation, I do not get 
> > the latest state. The update of the store takes too long, so when I access 
> > it in the transformation I either get no values or values that do not 
> > represent the latest state.
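Why the lookup lags can be illustrated with a deterministic Java model. A global store is maintained by a separate global stream thread that consumes its source topic independently of the processing that produced the records, so a value written to global-store-topic is not guaranteed to be in the store when the next record for the same key is transformed. A local store that the transformer reads and writes itself has no such gap. This is a model only: the queue stands in for the topic, drainGlobalTopic() stands in for the global thread, and all names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Deterministic model of store visibility; not Kafka Streams code.
final class StoreVisibilitySketch {

    // Global-store pattern: the transformer publishes to a topic, and a separate
    // global thread applies that topic to the store at some later time.
    static final class GlobalPattern {
        final Queue<Map.Entry<String, String>> globalTopic = new ArrayDeque<>();
        final Map<String, String> globalStore = new HashMap<>();

        // Returns the previous value the transformer sees, then publishes the new one.
        String process(String key, String value) {
            String previous = globalStore.get(key);   // may be missing or stale
            globalTopic.add(Map.entry(key, value));   // store is updated later
            return previous;
        }

        // Stand-in for the global thread catching up with the topic.
        void drainGlobalTopic() {
            Map.Entry<String, String> rec;
            while ((rec = globalTopic.poll()) != null) {
                globalStore.put(rec.getKey(), rec.getValue());
            }
        }
    }

    // Local-store pattern: read and write happen in the same processing step.
    static final class LocalPattern {
        final Map<String, String> localStore = new HashMap<>();

        String process(String key, String value) {
            String previous = localStore.get(key);
            localStore.put(key, value);               // visible to the next record
            return previous;
        }
    }

    public static void main(String[] args) {
        GlobalPattern global = new GlobalPattern();
        global.process("k", "v1");
        System.out.println("global sees: " + global.process("k", "v2"));  // null: v1 not applied yet
        LocalPattern local = new LocalPattern();
        local.process("k", "v1");
        System.out.println("local sees: " + local.process("k", "v2"));    // v1
    }
}
```

In Kafka Streams terms, the local pattern corresponds to registering a persistent key-value store with builder.addStateStore(...) and passing its name to transformValues(...), which is the direction Bruno suggests.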
> >
> > The following shows how my Streams app is built:
> >
> > // Set up the global state store
> > final KeyValueBytesStoreSupplier storeSupplier =
> >         Stores.persistentKeyValueStore( "global-store" );
> > final StoreBuilder<KeyValueStore<String, JSONObject>> storeBuilder =
> >         Stores.keyValueStoreBuilder( storeSupplier, Serdes.String(), new JSONObjectSerde() );
> > builder.addGlobalStore( storeBuilder,
> >                         "global-store-topic",
> >                         Consumed.with( Serdes.String(), new JSONObjectSerde() ),
> >                         StoreProcessor::new );
> >
> > // Store processor: writes every record from global-store-topic into the store
> > private KeyValueStore<String, JSONObject> stateStore;
> >
> > @Override
> > public void init( final ProcessorContext context ) {
> >     stateStore = (KeyValueStore<String, JSONObject>) context.getStateStore( "global-store" );
> > }
> >
> > @Override
> > public void process( final String key, final JSONObject state ) {
> >     log.info( "Update state store for {}: {}.", key, state );
> >     stateStore.put( key, state );
> > }
> >
> > // Streams setup: transform the input, then write the result to both topics
> > final JSONObjectSerde jsonObjectSerde = new JSONObjectSerde();
> >
> > final KStream<String, JSONObject> stream = builder
> >         .stream( "input-topic", Consumed.with( Serdes.String(), jsonObjectSerde ) )
> >         .transformValues( ValueTransformer::new );
> >
> > stream.to( "global-store-topic", Produced.valueSerde( jsonObjectSerde ) );
> > stream.to( "output-topic", Produced.valueSerde( jsonObjectSerde ) );
> >
> > // Global state store access in ValueTransformer
> > JSONObject previousState = Optional.ofNullable( stateStore.get( key ) )
> >                                    .orElseGet( JSONObject::new );
> >
> >
> > I have set the producer acknowledgement property (acks) to "all".
> >
> > I have tried to disable caching by setting "cache.max.bytes.buffering" to 0 
> > and by disabling the cache on the store using ".withCachingDisabled()". I 
> > also tried setting the commit interval (commit.interval.ms) to 0. All 
> > without success.
> >
> > How can I set up a global state store that meets the requirements described 
> > in the scenario above?
> >
> > Thank you!
> >
> > Best regards / Mit freundlichen Grüßen / Üdvözlettel / 致以诚挚的问候
> >
> > Mr. Georg Schmidt-Dumont
> > Bosch Connected Industry – BCI/ESW17
> > Robert Bosch GmbH | Postfach 10 60 50 | 70049 Stuttgart | GERMANY | www.bosch.com<http://www.bosch.com/>
> > Phone +49 711 811-49893 | georg.schmidt-dum...@bosch.com<mailto:georg.schmidt-dum...@bosch.com>
> >
> > Registered office: Stuttgart, Registration court: Amtsgericht Stuttgart, HRB 14000;
> > Chairman of the Supervisory Board: Franz Fehrenbach; Managing Directors: Dr. 
> > Volkmar Denner, Prof. Dr. Stefan Asenkerschbaumer, Dr. Rolf 
> > Bulander, Dr. Stefan Hartung, Dr. Markus Heyn, Dr. Dirk Hoheisel, 
> > Christoph Kübel, Uwe Raschke, Peter Tyroller
> >
