Hi Matthias, thanks for your reply! Sure, so the use case is as follows.

We currently store some time series data in the state store, and it is also
written to a changelog. The time series data is bucketed (5 minutes, 1 hour,
and 1 day). Our goal is to have at most 2 time buckets in the store at any
one time. As we receive new time series data, we figure out which time bucket
it belongs to and add it to that bucket. We have a “grace period” which allows
late-arriving data to be processed even after a time bucket has ended. That’s
why we have this constraint of at most 2 time buckets within the store: 1 for
the previous bucket in its grace period, and 1 for the current bucket.
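
To make the bucketing and grace period concrete, here is a minimal sketch in plain Java. The class and method names (TimeBuckets, bucketStart, isOpen) are made up for illustration and are not part of Kafka Streams. It assumes buckets are aligned to the epoch and that the grace period is no longer than the bucket size, which is what keeps the number of open buckets at 2 or fewer.

```java
import java.time.Duration;

/** Hypothetical helper; the names are illustrative and not part of Kafka Streams. */
final class TimeBuckets {
    private TimeBuckets() {}

    /** Start (epoch millis) of the epoch-aligned bucket that contains timestampMs. */
    static long bucketStart(long timestampMs, Duration bucketSize) {
        long size = bucketSize.toMillis();
        // floorDiv keeps the alignment correct for negative timestamps as well.
        return Math.floorDiv(timestampMs, size) * size;
    }

    /** True while the bucket is current or still inside its grace period at nowMs. */
    static boolean isOpen(long bucketStartMs, Duration bucketSize, Duration grace, long nowMs) {
        long bucketEndMs = bucketStartMs + bucketSize.toMillis();
        return nowMs < bucketEndMs + grace.toMillis();
    }
}
```

With 5-minute buckets and a 1-minute grace period, a record stamped 7 minutes after the epoch lands in the bucket starting at minute 5, and the bucket starting at minute 0 stops accepting data at minute 6.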

So we wanted to extend the base state store and add a simple in-memory map to
track the 2 time buckets per time series (that’s the store key). A couple of
reasons why we don’t want to add this as a separate state store, or to the
existing store, are:
1. There is a ton of serialization / deserialization that happens behind the
scenes.
2. This new time bucket tracking map would only be updated a couple of times
per time bucket, and does not need to be updated on every message read.
3. There’s no API on the included stores that allows us to do so.

Therefore, I thought it best to try to use the existing store functionality:
create a “new state store” that really just instantiates one of the included
stores within, add this in-memory map, and then plug into/alter/extend the
restore functionality to populate the time bucket tracking map during restore
time.
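
As a rough illustration of that wrapping idea, the sketch below uses only plain Java. SeriesStore is a hypothetical one-method contract standing in for the real store, not a Kafka Streams interface, and BucketTrackingStore is an illustrative name. Every put is delegated to the inner store and also updates the in-memory map, so any record that passes through the wrapper keeps the map current.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical minimal store contract; NOT a Kafka Streams interface. */
interface SeriesStore {
    void put(String seriesKey, long bucketStartMs, byte[] value);
}

/** Delegates to an inner store and tracks the newest two buckets per series key. */
final class BucketTrackingStore implements SeriesStore {
    private static final int MAX_BUCKETS = 2;

    private final SeriesStore inner;
    private final Map<String, TreeSet<Long>> bucketsByKey = new HashMap<>();

    BucketTrackingStore(SeriesStore inner) {
        this.inner = inner;
    }

    @Override
    public void put(String seriesKey, long bucketStartMs, byte[] value) {
        inner.put(seriesKey, bucketStartMs, value);
        TreeSet<Long> tracked = bucketsByKey.computeIfAbsent(seriesKey, k -> new TreeSet<>());
        tracked.add(bucketStartMs); // no change when the bucket is already tracked
        while (tracked.size() > MAX_BUCKETS) {
            tracked.pollFirst(); // forget the oldest bucket start
        }
    }

    /** Bucket starts currently tracked for the key, oldest first. */
    List<Long> trackedBuckets(String seriesKey) {
        TreeSet<Long> tracked = bucketsByKey.get(seriesKey);
        return tracked == null ? List.of() : List.copyOf(tracked);
    }
}
```

Whether restored records would actually pass through such a wrapper depends on which object registers the restore callback. In the snippet quoted further down in this thread, the callback calls the store's own put, so a wrapper around that store would not see those records.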

It sounds like I will either have to 1) create a custom state store from 
scratch, or 2) see if there is a post-restore hook that can then call a method 
to scan the whole store and build up the time bucket map before starting to 
process.
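
For option 2, the scan itself could look something like the plain-Java sketch below. BucketMapRebuilder and rebuild are illustrative names, and the input is assumed to be (series key, bucket start) pairs obtained by iterating the restored store in whatever way the store allows. As far as I know, Kafka Streams offers a StateRestoreListener with an onRestoreEnd callback (registered through KafkaStreams#setGlobalStateRestoreListener), but it reports the store name rather than the store instance, so I am not sure it is enough on its own to trigger this.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical helper; rebuilds the bucket tracking map from a full store scan. */
final class BucketMapRebuilder {
    private BucketMapRebuilder() {}

    /**
     * Scans (seriesKey, bucketStartMs) pairs once and keeps the newest
     * maxBuckets bucket starts per key, oldest first.
     */
    static Map<String, List<Long>> rebuild(
            Iterable<Map.Entry<String, Long>> entries, int maxBuckets) {
        Map<String, TreeSet<Long>> newest = new HashMap<>();
        for (Map.Entry<String, Long> entry : entries) {
            TreeSet<Long> buckets =
                    newest.computeIfAbsent(entry.getKey(), k -> new TreeSet<>());
            buckets.add(entry.getValue());
            if (buckets.size() > maxBuckets) {
                buckets.pollFirst(); // drop the oldest bucket start seen so far
            }
        }
        Map<String, List<Long>> result = new HashMap<>();
        newest.forEach((key, buckets) -> result.put(key, new ArrayList<>(buckets)));
        return result;
    }
}
```

The scan is a single pass and does not rely on the entries being ordered, at the cost of touching every record once before processing starts.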

Any advice on Kafka Streams / state store logic would be appreciated!
-Upesh


Upesh Desai | Senior Software Developer | ude...@itrsgroup.com
www.itrsgroup.com
From: Matthias J. Sax <mj...@apache.org>
Date: Wednesday, January 18, 2023 at 12:50 AM
To: users@kafka.apache.org <users@kafka.apache.org>
Subject: Re: Custom Kafka Streams State Restore Logic
Guess it depends what you actually want to achieve?

Also note: `InMemoryWindowStore` is an internal class, and thus might
change at any point, and it was never designed to be extended...


-Matthias

On 1/13/23 2:55 PM, Upesh Desai wrote:
> Hello all,
>
> I am currently working on creating a new InMemoryWindowStore, by
> extending the default in memory window store. One of the roadblocks I’ve
> run into is finding a way to add some custom logic when the state store
> is being restored from the changelog. I know that this is possible if I
> completely write the store logic from scratch, but we really only want
> to add a tiny bit of custom logic, and do not want to have to replicate
> all the existing logic.
>
> Is there a simple way for this to be done? I see the default
> implementation in the InMemoryWindowStore :
>
> context.register(
>      root,
>      (RecordBatchingStateRestoreCallback) records -> {
>          for (final ConsumerRecord<byte[], byte[]> record : records) {
>              put(
>                  Bytes.wrap(extractStoreKeyBytes(record.key())),
>                  record.value(),
>                  extractStoreTimestamp(record.key())
>              );
>
>              ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
>                  record,
>                  consistencyEnabled,
>                  position
>              );
>          }
>      }
> );
>
> Thanks in advance!
>
> Upesh
>
