Hi Sophie,

thanks a lot for your tip! I've implemented a StateListener to block queries
while the state is not RUNNING. This now works perfectly for our
use case!


In the meantime I noticed the Interactive Queries v2 (IQv2) API and gave it a try.
Unfortunately, it does not seem to work with GlobalKTable. When I try to run this:

return streams.query(StateQueryRequest.inStore(STORE_NAME).withQuery(KeyQuery.withKey(key)));

I got: "Global stores do not yet support the KafkaStreams#query API. Use 
KafkaStreams#store instead."

From my point of view, it would be great if this worked and behaved the same way
as with the IN_MEMORY StoreType, as that is straightforward to use.

Do you see a chance of getting Interactive Queries v2 to work with GlobalKTable?

Kind regards,
Christian

-----Original Message-----
From: Sophie Blee-Goldman <sop...@responsive.dev> 
Sent: Wednesday, November 22, 2023 1:51 AM
To: christian.zueg...@ams-osram.com.invalid
Cc: users@kafka.apache.org
Subject: Re: GlobalKTable with RocksDB - queries before state RUNNING?


Just to make sure I understand the logs, you're saying the "new file processed" 
lines represent store queries, and presumably the 
com.osr.serKafkaStreamsService is your service that's issuing these queries?

You need to wait for the app to finish restoring state before querying it.
Based on this message -- "KafkaStreams has not been started, you can retry 
after calling start()" -- I assume you're kicking off the querying service 
right away and blocking queries until after KafkaStreams#start is called.
But you need to wait for it to actually finish starting up, not just for
start() to be called. The best way to do this is by setting a state listener 
via KafkaStreams#setStateListener, and then using this to listen in on the 
KafkaStreams.State and blocking the queries until the state has changed to 
RUNNING.
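
A minimal sketch of that state-listener approach in Java, assuming the application creates the KafkaStreams instance itself and can register the listener before calling start() (setStateListener is only allowed before the instance has been started). The class RunningGate and its methods are hypothetical names for this illustration, not part of Kafka Streams:

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.streams.KafkaStreams;

/**
 * Hypothetical helper (not a Kafka class): listens to KafkaStreams state
 * transitions so that queries can be held back until the client is RUNNING,
 * i.e. until state restoration has finished.
 */
public final class RunningGate implements KafkaStreams.StateListener {

    // Released on the first transition to RUNNING.
    private final CountDownLatch firstRunning = new CountDownLatch(1);

    // Latest observed state; written by Kafka Streams threads, read by query threads.
    private volatile KafkaStreams.State current = KafkaStreams.State.CREATED;

    @Override
    public void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
        current = newState;
        if (newState == KafkaStreams.State.RUNNING) {
            firstRunning.countDown();
        }
    }

    /** True only while the client is currently RUNNING. */
    public boolean isRunning() {
        return current == KafkaStreams.State.RUNNING;
    }

    /**
     * Blocks until the client has reached RUNNING at least once, or the timeout
     * expires. Returns false on timeout (e.g. if the client went to ERROR instead)
     * or if the waiting thread is interrupted.
     */
    public boolean awaitRunning(Duration timeout) {
        try {
            return firstRunning.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

Wiring it up, where streams is the application's KafkaStreams instance:

```java
RunningGate gate = new RunningGate();
streams.setStateListener(gate); // must be called before start()
streams.start();
boolean ready = gate.awaitRunning(Duration.ofMinutes(1)); // only query once this is true
```

Since a client can move from RUNNING back to REBALANCING later on, a long-lived service may prefer to check isRunning() on each query rather than only once at startup.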

In case you're curious about why this seems to work with in-memory stores but
not with RocksDB: in the in-memory case, the queries that are
attempted during restoration are blocked due to the store being closed 
(according to "(Quarkus Main Thread) the state store, store-name, is not
open.")

So why is the store closed for most of the restoration in the in-memory case 
only? This gets a bit into the weeds, but it has to do with the sequence of 
events in starting up a state store. When the global thread starts up, it'll 
first loop over all its state stores and call #init on them. Two things have to 
happen inside #init: the store is opened, and the store registers itself with 
the ProcessorContext. The #register call involves various things, including a call
to fetch the end offsets of the topic for global state stores. This is a
blocking call, so the store might stay inside the #register call for a
relatively long time.

For RocksDB stores, we open the store first and then call #register, so by the 
time the GlobalStreamThread is sitting around waiting on the end offsets 
response, the store is open and your queries are getting through to it. However,
the in-memory store actually registers itself *first*, before marking itself as
open, and so it remains closed for most of the time it spends in restoration 
and blocks any query attempts during this time.
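
To make the ordering difference concrete, here is a toy model in Java. It is not Kafka's store code: ToyStore and InitOrderDemo are hypothetical classes, and register() only stands in for the slow, blocking part of #register (such as the end-offsets fetch), recording what a concurrent query would see at that moment:

```java
/**
 * Toy model (hypothetical, not Kafka's real classes) of the two #init orderings:
 * RocksDB-like stores open first and then register; the in-memory store
 * registers first and only then marks itself open.
 */
public final class InitOrderDemo {

    static final class ToyStore {
        private final boolean openBeforeRegister;
        private boolean open = false;
        // What a query arriving during the blocking register step would observe.
        boolean openDuringRegister;

        ToyStore(boolean openBeforeRegister) {
            this.openBeforeRegister = openBeforeRegister;
        }

        void init() {
            if (openBeforeRegister) {
                open = true;   // RocksDB-like: open the store ...
                register();    // ... then block in register (queries get through)
            } else {
                register();    // in-memory-like: block in register first ...
                open = true;   // ... and only afterwards mark the store open
            }
        }

        private void register() {
            // Stand-in for the long blocking call; snapshot the open flag.
            openDuringRegister = open;
        }
    }

    public static void main(String[] args) {
        ToyStore rocksDbLike = new ToyStore(true);
        ToyStore inMemoryLike = new ToyStore(false);
        rocksDbLike.init();
        inMemoryLike.init();
        System.out.println("RocksDB-like open during register: " + rocksDbLike.openDuringRegister);
        System.out.println("in-memory-like open during register: " + inMemoryLike.openDuringRegister);
    }
}
```

Running it prints true for the RocksDB-like ordering and false for the in-memory-like one, which matches the "is not open" warnings appearing only in the quoted IN_MEMORY log.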

I suppose it would make sense to align the two store implementations to have 
the same behavior, and the in-memory store is probably technically more 
correct. But in the end you really should just wait for the KafkaStreams.State 
to get to RUNNING before querying the state store, as that's the only true 
guarantee.

Hope this helps!

-Sophie

On Tue, Nov 21, 2023 at 6:44 AM Christian Zuegner 
<christian.zueg...@ams-osram.com.invalid> wrote:

> Hi,
>
> we have the following problem: a Kafka topic of ~20 MB is made
> available as a GlobalKTable for queries. When using RocksDB, the GlobalKTable
> is ready for queries instantly, even before all the data has been
> read, and all get() requests return null. After a few seconds the
> data is queryable correctly, but this is too late for our
> application. Once we switch to IN_MEMORY, we get the expected behavior:
> the store is only ready after all data has been read from the topic.
>
> How can we achieve the same behavior with the RocksDB setup?
>
> Snippet to build the KafkaStreams topology:
>
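> builder.globalTable(
>   "topic-name",
>   Consumed.with(Serdes.String(), Serdes.String()),
>   // explicit type witness: generics are not inferred through the chained call
>   Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(STORE_NAME)
>       .withStoreType(Materialized.StoreType.ROCKS_DB)
> );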
>
> Querying the table:
>
> while (true) {
>     try {
>         return streams.store(
>             StoreQueryParameters.fromNameAndType(
>                 FileCrawlerKafkaTopologyProducer.STORE_NAME,
>                 QueryableStoreTypes.keyValueStore()));
>     } catch (InvalidStateStoreException e) {
>         logger.warn(e.getMessage());
>         try {
>             Thread.sleep(3000);
>         } catch (InterruptedException ignored) {
>         }
>     }
> }
>
> The store is queried with getStore().get(key); <- here we get the null 
> values.
>
> This is the log output with RocksDB (the first query happens before the state is RUNNING):
>
> ...
> 2023-11-21 15:15:40,629 INFO  [com.osr.serKafkaStreamsService] (Quarkus Main Thread) wait for kafka streams store to get ready: KafkaStreams has not been started, you can retry after calling start()
> 2023-11-21 15:15:41,781 INFO  [org.apa.kaf.str.KafkaStreams] (pool-10-thread-1) stream-client [topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2] State transition from CREATED to REBALANCING
> 2023-11-21 15:15:41,819 INFO  [org.apa.kaf.str.sta.int.RocksDBTimestampedStore] (topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread) Opening store store-name in regular mode
> 2023-11-21 15:15:41,825 INFO  [org.apa.kaf.str.pro.int.GlobalStateManagerImpl] (topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread) global-stream-thread [topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread] Restoring state for global store store-name
> 2023-11-21 15:15:43,753 INFO  [io.quarkus] (Quarkus Main Thread) demo 1.0-SNAPSHOT on JVM (powered by Quarkus 3.2.8.Final) started in 5.874s.
> 2023-11-21 15:15:43,754 INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
> 2023-11-21 15:15:43,756 INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [apicurio-registry-avro, cdi, config-yaml, kafka-client, kafka-streams, logging-gelf, smallrye-context-propagation, smallrye-fault-tolerance, smallrye-reactive-messaging, smallrye-reactive-messaging-kafka, vertx]
> 2023-11-21 15:15:44,195 INFO  [com.osr.ser.KafkaStreamsService] (vert.x-worker-thread-1) new file processed
> 2023-11-21 15:15:44,629 INFO  [org.apa.kaf.str.pro.int.GlobalStreamThread] (topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread) global-stream-thread [topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread] State transition from CREATED to RUNNING
> 2023-11-21 15:15:44,631 INFO  [org.apa.kaf.str.KafkaStreams] (topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread) stream-client [topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2] State transition from REBALANCING to RUNNING
> 2023-11-21 15:15:44,631 INFO  [org.apa.kaf.str.KafkaStreams] (pool-10-thread-1) stream-client [topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2] Started 0 stream threads ...
>
> Once I configure StoreType.IN_MEMORY, no queries are done before
> the state is RUNNING:
>
> 2023-11-21 15:28:25,511 WARN  [com.osr.serKafkaStreamsService] (Quarkus Main Thread) KafkaStreams has not been started, you can retry after calling start()
> 2023-11-21 15:28:26,730 INFO  [org.apa.kaf.str.KafkaStreams] (pool-10-thread-1) stream-client [topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7] State transition from CREATED to REBALANCING
> 2023-11-21 15:28:26,752 INFO  [org.apa.kaf.str.pro.int.GlobalStateManagerImpl] (topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7-GlobalStreamThread) global-stream-thread [topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7-GlobalStreamThread] Restoring state for global store store-name
> 2023-11-21 15:28:29,834 WARN  [com.osr.serKafkaStreamsService] (Quarkus Main Thread) the state store, store-name, is not open.
> 2023-11-21 15:28:33,670 WARN  [com.osr.serKafkaStreamsService] (Quarkus Main Thread) the state store, store-name, is not open.
> 2023-11-21 15:28:33,763 INFO  [org.apa.kaf.str.pro.int.GlobalStreamThread] (topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7-GlobalStreamThread) global-stream-thread [topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7-GlobalStreamThread] State transition from CREATED to RUNNING
> 2023-11-21 15:28:33,765 INFO  [org.apa.kaf.str.KafkaStreams] (topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7-GlobalStreamThread) stream-client [topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7] State transition from REBALANCING to RUNNING
> 2023-11-21 15:28:33,765 INFO  [org.apa.kaf.str.KafkaStreams] (pool-10-thread-1) stream-client [topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7] Started 0 stream threads
> 2023-11-21 15:28:36,774 INFO  [com.osr.serKafkaStreamsService] (vert.x-worker-thread-1) new file processed
>
>
> Thanks for any input!
> Christian