You need to replace the HashMap with a ConcurrentHashMap implementation; 
otherwise you will receive no thread notifications.
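
For illustration, here is a minimal plain-Java sketch of the difference 
between the two map types. It is not the Kafka Streams code: the class name, 
the method name and the "thread-N"/"provider-N" entries are made up, and the 
map only stands in for the storeProviders map discussed below. It mutates the 
map in the middle of an iteration on a single thread, which is enough to 
trigger the fail-fast check in HashMap deterministically.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the storeProviders map; NOT the Kafka Streams code.
class StoreProviderMapSketch {

    // Iterates the map and adds an entry mid-iteration, imitating a stream
    // thread being added while another caller walks the providers.
    // Returns true if the iteration completed, false if it failed fast.
    static boolean survivesMutationDuringIteration(Map<String, String> providers) {
        providers.put("thread-1", "provider-1");
        providers.put("thread-2", "provider-2");
        try {
            for (String ignored : providers.keySet()) {
                // Structural modification while an iterator is active.
                providers.put("thread-3", "provider-3");
            }
            return true;
        } catch (ConcurrentModificationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // HashMap iterators are fail-fast: prints "HashMap: false".
        System.out.println("HashMap: "
                + survivesMutationDuringIteration(new HashMap<>()));
        // ConcurrentHashMap iterators are weakly consistent and never throw
        // ConcurrentModificationException: prints "ConcurrentHashMap: true".
        System.out.println("ConcurrentHashMap: "
                + survivesMutationDuringIteration(new ConcurrentHashMap<>()));
    }
}
```

With real concurrent access the HashMap failure is not guaranteed to show up 
as an exception; an unsynchronized HashMap can also return stale or 
inconsistent results, which is why the sketch uses the single-threaded case.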

________________________________
From: Kohei Nozaki <ko...@apache.org>
Sent: Wednesday, December 27, 2023 5:52:02 PM
To: users@kafka.apache.org <users@kafka.apache.org>
Subject: Re: Is the KafkaStreams#store() method thread-safe?

Hi Sophie, thank you so much for sharing that. It all makes sense to me.

Unfortunately, my application uses REPLACE_THREAD, so it seems I need a 
workaround until this thread-safety issue is fixed. As I asked in my first 
email, would sharing only the ReadOnlyWindowStore instance with other 
threads be a workaround for this? Would the store object here be able to 
capture the changes that would be made by rebalancing?

I've filed a ticket here (I'm interested in submitting a patch, but I cannot 
make any commitment): https://issues.apache.org/jira/browse/KAFKA-16055

Regards,
Kohei


> On Dec 27, 2023, at 5:43, Sophie Blee-Goldman <sop...@responsive.dev> wrote:
>
> Hey Kohei,
>
> Good question -- I don't think there's exactly a short answer to this
> seemingly simple question so bear with me for a second.
>
> My understanding is that KafkaStreams#store is very much intended to be
> thread-safe, and would have been back when it was first added a long time
> ago, and the javadocs should probably be updated to reflect that.
>
> That said, you are totally right that whatever the intention, it is
> technically not completely thread safe anymore since the storeProviders map
> can be mutated when threads are added or removed. Of course, as long as you
> are not adding or removing StreamThreads in your application, it should be
> effectively thread-safe (but note: this includes using the REPLACE_THREAD
> option with the StreamsUncaughtExceptionHandler).
>
> We should go ahead and fix this of course. I'm pretty sure we can just
> change the HashMap to a ConcurrentHashMap and be done with it -- there's
> already locking around the actual map modifications with the
> "changeThreadCount" lock in KafkaStreams, so we just need to make sure we
> don't accidentally hit a ConcurrentModificationException by accessing this
> map while it's being modified.
>
> Would you mind submitting a JIRA ticket
> <https://issues.apache.org/jira/projects/KAFKA/issues> for this bug you
> found? And would you be interested in submitting a patch yourself?
>
> Thanks!
> Sophie
>
> On Fri, Dec 22, 2023 at 6:55 PM Kohei Nozaki <ko...@apache.org> wrote:
>
>> Hello, I have some Kafka Streams questions related to thread safety.
>>
>> In my Kafka Streams application, I have the following two threads:
>>
>> * Thread A: this creates a Topology object including state stores and
>> everything and then eventually calls the constructor of the KafkaStreams
>> class and the start() method.
>>
>> * Thread B: this has a reference to the KafkaStreams object that thread A
>> created. This periodically calls KafkaStreams#store on the object, gets a
>> ReadOnlyWindowStore instance and reads the data in the store for monitoring
>> purposes.
>>
>> I'm wondering if what my app does is ok in terms of thread safety. I'm
>> not so worried about ReadOnlyWindowStore because the javadoc says:
>> “Implementations should be thread-safe as concurrent reads and writes are
>> expected.”
>>
>> But as for KafkaStreams#store, I'm not so sure if it is ok to call from
>> separate threads. One thing which concerns me is that it touches a HashMap,
>> which is not thread-safe, here:
>> https://github.com/apache/kafka/blob/3.6.1/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java#L39
>> A KafkaStreams#removeStreamThread() call can mutate this HashMap object.
>> Given that, I'm not so sure if this is designed to be thread-safe.
>>
>> My questions here: is it ok to call KafkaStreams#store from a thread which
>> is different from the one which instantiated the KafkaStreams object? Or
>> would it be better to call the store() method in the same thread and
>> share only the ReadOnlyWindowStore instance with other threads? If so,
>> would the store object be able to capture the changes that would be
>> made by rebalancing? Is the KafkaStreams class designed to be thread-safe
>> at all?
>>
>> Regards,
>> Kohei
