Hello, I have some Kafka Streams questions about thread safety.

In my Kafka Streams application, I have the following two threads:

* Thread A: creates a Topology object, including its state stores and
everything else, then calls the KafkaStreams constructor and the start()
method.

* Thread B: holds a reference to the KafkaStreams object that thread A
created. It periodically calls KafkaStreams#store on that object to obtain a
ReadOnlyWindowStore instance, then reads the data in the store for monitoring
purposes.
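To make the setup concrete, here is a minimal, runnable sketch of the two-thread arrangement. It assumes a few things not stated above: KafkaStreams is replaced by a hypothetical stand-in, FakeStreamsApp, so the sketch runs without Kafka (in the real application the calls would be new KafkaStreams(topology, props), start(), and store(StoreQueryParameters.fromNameAndType(name, QueryableStoreTypes.windowStore()))); the store name "window-store" is made up; and thread A hands the reference to thread B through an AtomicReference, which is just one way to publish it safely.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class TwoThreadSketch {

    // HYPOTHETICAL stand-in for KafkaStreams so the sketch runs without Kafka.
    // It is not the real API; it only mirrors the shape of start() and store().
    static final class FakeStreamsApp {
        private final Map<String, Map<String, Long>> stores = new ConcurrentHashMap<>();
        private volatile boolean running;

        FakeStreamsApp(String storeName) {
            stores.put(storeName, new ConcurrentHashMap<>());
        }

        void start() {
            // Pretend processing has already written one record into every store.
            stores.values().forEach(store -> store.put("key", 42L));
            running = true;
        }

        // Plays the role of KafkaStreams#store: returns a read-only view of one store.
        Map<String, Long> store(String name) {
            if (!running) {
                throw new IllegalStateException("app is not running");
            }
            Map<String, Long> store = stores.get(name);
            if (store == null) {
                throw new IllegalArgumentException("unknown store: " + name);
            }
            return Collections.unmodifiableMap(store);
        }
    }

    // Thread A publishes the started app here; thread B reads it.
    // AtomicReference gives a happens-before edge between the two threads.
    static final AtomicReference<FakeStreamsApp> APP = new AtomicReference<>();

    public static void main(String[] args) throws InterruptedException {
        // Thread A: build everything, construct the app, start it, publish it.
        Thread threadA = new Thread(() -> {
            FakeStreamsApp app = new FakeStreamsApp("window-store");
            app.start();
            APP.set(app);
        }, "thread-A");
        threadA.start();
        threadA.join();

        // Thread B: periodically look the store up and read it for monitoring.
        CountDownLatch polls = new CountDownLatch(3);
        ScheduledExecutorService threadB = Executors.newSingleThreadScheduledExecutor();
        threadB.scheduleAtFixedRate(() -> {
            Long value = APP.get().store("window-store").get("key");
            System.out.println("monitor read key -> " + value);
            polls.countDown();
        }, 0, 10, TimeUnit.MILLISECONDS);

        polls.await();
        threadB.shutdownNow();
    }
}
```

The sketch only reproduces the shape of the arrangement; whether the real KafkaStreams#store is safe to call from thread B is exactly the open question below.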

I'm wondering whether what my app does is OK in terms of thread safety. I'm not
too worried about ReadOnlyWindowStore, because its javadoc says: “Implementations
should be thread-safe as concurrent reads and writes are expected.”

As for KafkaStreams#store, though, I'm not sure whether it is safe to call from
a separate thread. One thing that concerns me is that it touches a HashMap,
which is not thread-safe, here:
https://github.com/apache/kafka/blob/3.6.1/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java#L39
A KafkaStreams#removeStreamThread() call can mutate this HashMap. Given that,
I'm not sure whether this method is designed to be thread-safe.

My questions: is it OK to call KafkaStreams#store from a thread other than the
one that instantiated the KafkaStreams object? Or would it be better to call
store() in that same thread and share only the ReadOnlyWindowStore instance
with other threads? If so, would that store object still reflect the changes
caused by rebalancing? Is the KafkaStreams class designed to be thread-safe at
all?

Regards,
Kohei
