Thanks... I will try increasing the memory in case you don't spot anything
wrong with the code. Other services also have streams and a global k table,
but they use spring-kafka. I think that should not matter, though, and it
should work with plain kafka-streams code unless I am missing some
configuration/setting here.

On Wed, May 27, 2020 at 10:26 PM Matthias J. Sax <mj...@apache.org> wrote:

> There is no hook. Only a restore listener, but this one is only used
> during startup when the global store is loaded. It's not used during
> regular processing.
>
> Depending on your usage, maybe you can switch to a global store instead
> of GlobalKTable? That way, you can implement a custom `Processor` and
> add a hook manually?
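A minimal sketch of what such a manual hook might compute, written in plain Java rather than Kafka Streams code. The class `SyncLagHook` and its method names are hypothetical. The assumption is that a custom `Processor` registered for the global store would call `onStoreUpdated` right after writing a record into the store, passing in the record's timestamp. The injected clock is only there so the sketch can be exercised without a broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

/** Hypothetical hook: records how long each update took to reach the local store. */
class SyncLagHook {
    private final LongSupplier clock;                 // injectable wall clock, in milliseconds
    private final List<Long> lagsMs = new ArrayList<>();

    SyncLagHook(LongSupplier clock) {
        this.clock = clock;
    }

    /**
     * Assumed to be called right after the record was written into the store.
     * Returns the lag between the record timestamp and "now", never negative.
     */
    long onStoreUpdated(String key, long recordTimestampMs) {
        long lagMs = Math.max(0L, clock.getAsLong() - recordTimestampMs);
        lagsMs.add(lagMs);
        System.out.println("global store updated: key=" + key + " lagMs=" + lagMs);
        return lagMs;
    }

    /** Largest lag seen so far, or 0 if no update has been recorded. */
    long maxLagMs() {
        return lagsMs.stream().mapToLong(Long::longValue).max().orElse(0L);
    }

    /** Number of updates the hook has observed. */
    int updateCount() {
        return lagsMs.size();
    }
}
```

In production the clock would be `System::currentTimeMillis`. The measured lag is only meaningful if the producer and consumer clocks are reasonably in sync, since it compares the record timestamp with local wall-clock time.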
>
> I don't see anything wrong with your setup. Unclear if/why the global
> store would require a lot of memory...
>
>
> -Matthias
>
> On 5/27/20 7:41 AM, Pushkar Deole wrote:
> > Matthias,
> > I tried with the default store as well but am getting the same error.
> > Can you please check if I am initializing the global store in the right way:
> >
> > public void setupGlobalCacheTables(String theKafkaServers) {
> >     Properties props = new Properties();
> >     props.put(StreamsConfig.APPLICATION_ID_CONFIG, DEFAULT_APPLICATION_ID);
> >     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, theKafkaServers);
> >     StreamsBuilder streamsBuilder = new StreamsBuilder();
> >     groupCacheTable = streamsBuilder.globalTable(
> >         GROUP_CACHE_TOPIC,
> >         Consumed.with(Serdes.String(), GroupCacheSerdes.groupCache()),
> >         Materialized.as(GROUP_CACHE_STORE_NAME));
> >     Topology groupCacheTopology = streamsBuilder.build();
> >     kafkaStreams = new KafkaStreams(groupCacheTopology, props);
> >     kafkaStreams.start();
> >
> >     Runtime.getRuntime().addShutdownHook(new Thread(() -> {
> >         LOG.info("Stopping the stream");
> >         kafkaStreams.close();
> >     }));
> > }
> >
> > On Wed, May 27, 2020 at 5:06 PM Pushkar Deole <pdeole2...@gmail.com>
> > wrote:
> >
> >> Hi Matthias,
> >>
> >> By the way, I used the in-memory global store and the service is giving
> >> an out-of-memory error during startup. Unfortunately I don't have a stack
> >> trace now, but when I got the stack trace the first time, the error was
> >> coming from somewhere in memorypool.allocate or a similar method. If I
> >> get the stack trace again, I will share it with you.
> >> However, the topic the store reads from is empty, so I am not sure why
> >> the global k table is trying to occupy a lot of space. The pod memory
> >> request and limit are 500 MiB and 750 MiB respectively, so I believe the
> >> state store should fit into this memory since the topic is empty. Can
> >> you provide inputs on this?
> >>
> >>
> >> On Wed, May 27, 2020 at 2:17 PM Pushkar Deole <pdeole2...@gmail.com>
> >> wrote:
> >>
> >>> Ok... got it... is there any hook that I can attach to the global k
> >>> table or global store? What I mean is that I want to know when the
> >>> global store is updated with data from the topic; in that case the hook
> >>> I specified should be invoked so I can do some activity like logging
> >>> that. This will allow me to know how long the global store took to sync
> >>> up with the topic after the event was put on the topic.
> >>>
> >>> On Tue, May 26, 2020 at 10:58 PM Matthias J. Sax <mj...@apache.org>
> >>> wrote:
> >>>
> >>>> For example it could be some "static" information, like a mapping from
> >>>> zip code to city name.
> >>>>
> >>>> Something that usually does not change over time.
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 5/25/20 9:55 PM, Pushkar Deole wrote:
> >>>>> Matthias,
> >>>>>
> >>>>> I am wondering what you mean by "Global store hold "axially" data that
> >>>>> is provided from "outside" of the app".
> >>>>>
> >>>>> Will you be able to give an example use case showing what you mean by
> >>>>> axially data provided from outside the app?
> >>>>>
> >>>>> On Sat, May 2, 2020 at 1:58 AM Matthias J. Sax <mj...@apache.org>
> >>>> wrote:
> >>>>>
> >>>>>> Both stores serve a different purpose.
> >>>>>>
> >>>>>> Regular stores allow you to store state the application computes.
> >>>>>> Writing into the changelog is a fault-tolerance mechanism.
> >>>>>>
> >>>>>> Global store hold "axially" data that is provided from "outside" of
> >>>>>> the app. There is no changelog topic, but only the input topic (that
> >>>>>> is used to re-create the global state).
> >>>>>>
> >>>>>> Local stores are sharded and updates are "sync" as they don't need to
> >>>>>> be shared with anybody else.
> >>>>>>
> >>>>>> For global stores, as all instances need to be updated, updates are
> >>>>>> async (we don't know when which instance will update its own global
> >>>>>> store replica).
> >>>>>>
> >>>>>>>> Say one stream thread updates the topic for global store and starts
> >>>>>>>> processing next event wherein the processor tries to read the global
> >>>>>>>> store which may not have been synced with the topic?
> >>>>>>
> >>>>>> Correct. There is no guarantee when the update to the global store
> >>>>>> will be applied. As said, global stores are not designed to hold data
> >>>>>> the application computes.
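To make the async behavior concrete, here is a toy model in plain Java (not the Kafka Streams API). `CacheTopic`, `StoreReplica`, and `catchUp` are made-up names. The topic is modeled as an append-only list, and each instance's global store replica applies updates only when it gets around to reading them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy stand-in for the input topic: an append-only list of key/value updates. */
class CacheTopic {
    private final List<String[]> log = new ArrayList<>();

    void send(String key, String value) {
        log.add(new String[] {key, value});
    }

    int endOffset() {
        return log.size();
    }

    String[] read(int offset) {
        return log.get(offset);
    }
}

/** Toy stand-in for one application instance's replica of the global store. */
class StoreReplica {
    private final CacheTopic topic;
    private final Map<String, String> store = new HashMap<>();
    private int nextOffset = 0;   // position of the next update this replica has not applied yet

    StoreReplica(CacheTopic topic) {
        this.topic = topic;
    }

    /** Applies every update this replica has not seen yet; returns how many were applied. */
    int catchUp() {
        int applied = 0;
        while (nextOffset < topic.endOffset()) {
            String[] kv = topic.read(nextOffset++);
            store.put(kv[0], kv[1]);
            applied++;
        }
        return applied;
    }

    /** Reads from the local replica only; may return stale or missing data. */
    String get(String key) {
        return store.get(key);
    }
}
```

After `topic.send("94105", "San Francisco")`, a replica's `get("94105")` returns `null` until that replica has called `catchUp()`, which is the stale read described above. A second replica built on the same topic catches up on its own schedule, so two instances can briefly disagree.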
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>> On 4/30/20 11:11 PM, Pushkar Deole wrote:
> >>>>>>> thanks... will try with GlobalKTable.
> >>>>>>> As a side question, I didn't really understand the significance of the
> >>>>>>> global state store, which kind of works in a reverse way to the local
> >>>>>>> state store, i.e. the local state store is updated and then saved to
> >>>>>>> the changelog topic, whereas in case of the global state store the
> >>>>>>> topic is updated first and then synced to the global state store. Do
> >>>>>>> these two work in sync, i.e. the update to the topic and to the global
> >>>>>>> state store?
> >>>>>>>
> >>>>>>> Say one stream thread updates the topic for global store and starts
> >>>>>>> processing next event wherein the processor tries to read the global
> >>>>>>> store which may not have been synced with the topic?
> >>>>>>>
> >>>>>>> On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax <mj...@apache.org>
> >>>> wrote:
> >>>>>>>
> >>>>>>>> Yes.
> >>>>>>>>
> >>>>>>>> A `GlobalKTable` uses a global store internally.
> >>>>>>>>
> >>>>>>>> You can also use `StreamsBuilder.addGlobalStore()` or
> >>>>>>>> `Topology.addGlobalStore()` to add a global store "manually".
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 4/30/20 7:42 AM, Pushkar Deole wrote:
> >>>>>>>>> Thanks Matthias.
> >>>>>>>>> Can you elaborate on the replicated caching layer part?
> >>>>>>>>> When you say global stores, do you mean GlobalKTable created from a
> >>>>>>>>> topic, e.g. using the StreamsBuilder.globalTable(String topic) method?
> >>>>>>>>>
> >>>>>>>>> On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax <mj...@apache.org>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> It's not possible to modify a state store from "outside".
> >>>>>>>>>>
> >>>>>>>>>> If you want to build a "replicated caching layer", you could use
> >>>>>>>>>> global stores and write into the corresponding topics to update all
> >>>>>>>>>> stores. Of course, those updates would be async.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>> On 4/29/20 10:52 PM, Pushkar Deole wrote:
> >>>>>>>>>>> Hi All,
> >>>>>>>>>>>
> >>>>>>>>>>> I am wondering if this is possible: I have been asked to use state
> >>>>>>>>>>> stores as a general replicated cache among multiple instances of a
> >>>>>>>>>>> service. The state store would be created through StreamsBuilder
> >>>>>>>>>>> but not actually modified through the stream processor topology;
> >>>>>>>>>>> instead it is to be modified from outside the stream topology. So,
> >>>>>>>>>>> essentially, the state store is just to be created from
> >>>>>>>>>>> StreamsBuilder and then used as an application-level cache that
> >>>>>>>>>>> will get replicated between application instances. Is this possible
> >>>>>>>>>>> using state stores?
> >>>>>>>>>>>
> >>>>>>>>>>> Secondly, if possible, is this a good design approach?
> >>>>>>>>>>>
> >>>>>>>>>>> Appreciate your response since I don't know the internals of state
> >>>>>>>>>>> stores.
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >
>
>
