Thanks... I will try increasing the memory, given that you don't spot anything wrong with the code. Other services also have streams and a global KTable, but they use spring-kafka. I think that should not matter, and it should work with plain kafka-streams code unless I am missing some configuration/setting here.

On Wed, May 27, 2020 at 10:26 PM Matthias J. Sax <mj...@apache.org> wrote:

> There is no hook. Only a restore listener, but this one is only used
> during startup when the global store is loaded. It's not used during
> regular processing.
>
> Depending on your usage, maybe you can switch to a global store instead
> of GlobalKTable? That way, you can implement a custom `Processor` and
> add a hook manually?
>
> I don't see anything wrong with your setup. Unclear if/why the global
> store would require a lot of memory...
>
>
> -Matthias
>
> On 5/27/20 7:41 AM, Pushkar Deole wrote:
> > Matthias,
> > I tried with default store as well but getting same error, can you please
> > check if I am initializing the global store in the right way:
> >
> > public void setupGlobalCacheTables(String theKafkaServers) {
> >     Properties props = new Properties();
> >     props.put(StreamsConfig.APPLICATION_ID_CONFIG, DEFAULT_APPLICATION_ID);
> >     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, theKafkaServers);
> >     StreamsBuilder streamsBuilder = new StreamsBuilder();
> >     groupCacheTable =
> >         streamsBuilder.globalTable(GROUP_CACHE_TOPIC,
> >             Consumed.with(Serdes.String(), GroupCacheSerdes.groupCache()),
> >             Materialized.as(GROUP_CACHE_STORE_NAME));
> >     Topology groupCacheTopology = streamsBuilder.build();
> >     kafkaStreams = new KafkaStreams(groupCacheTopology, props);
> >     kafkaStreams.start();
> >
> >     Runtime.getRuntime().addShutdownHook(new Thread(() -> {
> >         LOG.info("Stopping the stream");
> >         kafkaStreams.close();
> >     }));
> > }
> >
> > On Wed, May 27, 2020 at 5:06 PM Pushkar Deole <pdeole2...@gmail.com> wrote:
> >
> >> Hi Matthias,
> >>
> >> By the way, I used the in-memory global store and the service is giving
> >> out of memory error during startup. Unfortunately i don't have a stack
> >> trace now but when i got stack the first time, the error was coming
> >> somewhere from memorypool.allocate or similar kind of method. If i get the
> >> stack trace again, I will share that with you.
> >> However, the topic from where the store is reading from is empty so I am
> >> not sure why the global k table is trying to occupy a lot of space. The POD
> >> memory request and limits are 500 MiB and 750 MiB respectively so the state
> >> store should fit into this memory I believe since topic is empty. Can you
> >> provide inputs on this.
> >>
> >>
> >> On Wed, May 27, 2020 at 2:17 PM Pushkar Deole <pdeole2...@gmail.com>
> >> wrote:
> >>
> >>> Ok... got it... is there any hook that I can attach to the global k table
> >>> or global store? What I mean here is I want to know when the global store
> >>> is updated with data from topic in that case the hook that I specified
> >>> should be invoked so i can do some activity like logging that, this will
> >>> allow me to know how long the global store took to sync up with topic after
> >>> the event has been put on the topic.
> >>>
> >>> On Tue, May 26, 2020 at 10:58 PM Matthias J. Sax <mj...@apache.org>
> >>> wrote:
> >>>
> >>>> For example it could be some "static" information, like a mapping from
> >>>> zip code to city name.
> >>>>
> >>>> Something that does usually not change over time.
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 5/25/20 9:55 PM, Pushkar Deole wrote:
> >>>>> Matthias,
> >>>>>
> >>>>> I am wondering what you mean by "Global store hold "axially" data that is
> >>>>> provided from "outside" of the app"
> >>>>>
> >>>>> will you be able to give some example use case here as to what you mean by
> >>>>> axially data provided from outside app?
> >>>>>
> >>>>> On Sat, May 2, 2020 at 1:58 AM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>
> >>>>>> Both stores serve a different purpose.
> >>>>>>
> >>>>>> Regular stores allow you to store state the application computes.
> >>>>>> Writing into the changelog is a fault-tolerance mechanism.
> >>>>>>
> >>>>>> Global store hold "axially" data that is provided from "outside" of the
> >>>>>> app. There is no changelog topic, but only the input topic (that is used
> >>>>>> to re-create the global state).
> >>>>>>
> >>>>>> Local stores are sharded and updates are "sync" as they don't need to be
> >>>>>> shared with anybody else.
> >>>>>>
> >>>>>> For global stores, as all instances need to be updated, updates are
> >>>>>> async (we don't know when which instance will update its own global
> >>>>>> store replica).
> >>>>>>
> >>>>>>>> Say one stream thread updates the topic for global store and starts
> >>>>>>>> processing next event wherein the processor tries to read the global
> >>>>>>>> store which may not have been synced with the topic?
> >>>>>>
> >>>>>> Correct. There is no guarantee when the update to the global store will
> >>>>>> be applied. As said, global stores are not designed to hold data the
> >>>>>> application computes.
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>> On 4/30/20 11:11 PM, Pushkar Deole wrote:
> >>>>>>> thanks... will try with GlobalKTable.
> >>>>>>> As a side question, I didn't really understand the significance of global
> >>>>>>> state store which kind of works in a reverse way to local state store i.e.
> >>>>>>> local state store is updated and then saved to changelog topic whereas in
> >>>>>>> case of global state store the topic is updated first and then synced to
> >>>>>>> global state store. Do these two work in sync i.e. the update to topic and
> >>>>>>> global state store ?
> >>>>>>>
> >>>>>>> Say one stream thread updates the topic for global store and starts
> >>>>>>> processing next event wherein the processor tries to read the global
> >>>>>>> store which may not have been synced with the topic?
> >>>>>>>
> >>>>>>> On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>
> >>>>>>>> Yes.
> >>>>>>>>
> >>>>>>>> A `GlobalKTable` uses a global store internally.
> >>>>>>>>
> >>>>>>>> You can also use `StreamsBuilder.addGlobalStore()` or
> >>>>>>>> `Topology.addGlobalStore()` to add a global store "manually".
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 4/30/20 7:42 AM, Pushkar Deole wrote:
> >>>>>>>>> Thanks Matthias.
> >>>>>>>>> Can you elaborate on the replicated caching layer part?
> >>>>>>>>> When you say global stores, do you mean GlobalKTable created from a topic
> >>>>>>>>> e.g. using StreamsBuilder.globalTable(String topic) method ?
> >>>>>>>>>
> >>>>>>>>> On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>>>>>
> >>>>>>>>>> It's not possible to modify state store from "outside".
> >>>>>>>>>>
> >>>>>>>>>> If you want to build a "replicated caching layer", you could use global
> >>>>>>>>>> stores and write into the corresponding topics to update all stores. Of
> >>>>>>>>>> course, those updates would be async.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>> On 4/29/20 10:52 PM, Pushkar Deole wrote:
> >>>>>>>>>>> Hi All,
> >>>>>>>>>>>
> >>>>>>>>>>> I am wondering if this is possible: i have been asked to use state stores
> >>>>>>>>>>> as a general replicated cache among multiple instances of service instances
> >>>>>>>>>>> however the state store is created through streambuilder but is not
> >>>>>>>>>>> actually modified through stream processor topology however it is to be
> >>>>>>>>>>> modified from outside the stream topology. So, essentially, the state store
> >>>>>>>>>>> is just to be created from streambuilder and then to be used as an
> >>>>>>>>>>> application level cache that will get replicated between application
> >>>>>>>>>>> instances.
> >>>>>>>>>>> Is this possible using state stores?
> >>>>>>>>>>>
> >>>>>>>>>>> Secondly, if possible, is this a good design approach?
> >>>>>>>>>>>
> >>>>>>>>>>> Appreciate your response since I don't know the internals of state stores.