PhilHardwick commented on a change in pull request #10921:
URL: https://github.com/apache/kafka/pull/10921#discussion_r672966032



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1081,6 +1079,7 @@ private int getNumStreamThreads(final boolean 
hasGlobalTopology) {
                             } else {
                                 log.info("Successfully removed {} in {}ms", 
streamThread.getName(), time.milliseconds() - startMs);
                                 threads.remove(streamThread);
+                                queryableStoreProvider.removeStoreProvider(new 
StreamThreadStateStoreProvider(streamThread));

Review comment:
       Yeah agreed - that's a better idea, I've changed to a Map and being able 
to remove by thread name. To be honest, I'd like to just pass in StreamThread 
into the #addStoreProvider but then it's hard to test because the 
StreamThreadStateStoreProvider is instantiated inside #addStoreProvider rather 
than being able to inject a stub (this would also allow #removeStoreProvider to 
just be passed a StreamThread which would make it more consistent). 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to