nicktelford commented on code in PR #17713:
URL: https://github.com/apache/kafka/pull/17713#discussion_r1833711048


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java:
##########
@@ -154,7 +155,13 @@ private void registerMetrics() {
         StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
                 (config, now) -> numOpenIterators.sum());
         StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
-                (config, now) -> openIterators.isEmpty() ? null : openIterators.first().startTimestamp()
+                (config, now) -> {
+                    try {
+                        return openIterators.isEmpty() ? null : openIterators.first().startTimestamp();

Review Comment:
   @mjsax I know it's a bit ugly, but I think this is the best we can do.
   
   `synchronized (openIterators)` is definitely worse, especially in the 
presence of multiple query threads, but even with just a stream thread, the 
metrics reporting thread would contend for the lock and slow it down. Throwing 
an Exception is not cheap, but it only costs the metrics reporting thread, and 
we only throw one in the rare case that there's a race between the 
`isEmpty()` check and `first()`.
   
   > For this case, does it even make sense to check for `isEmpty()`? Could we 
also just return `openIterators.first().startTimestamp();` only?
   
   We still want to check `isEmpty()` first, simply to avoid throwing the 
Exception in the common case where the Set is empty and we're not racing. The 
Exception then only guards against a race between the `isEmpty()` and 
`first()` invocations, which is rare but does occasionally happen.
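
A minimal sketch of this check-then-guard pattern, assuming the Set is a `ConcurrentSkipListSet` and using plain `Long` start timestamps in place of the real iterator objects. The class and method names are hypothetical; the only library behaviour relied on is that `first()` throws `NoSuchElementException` on an empty set.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentSkipListSet;

// Hypothetical stand-in for the gauge lambda; not the actual Kafka code.
final class OldestOpenIteratorGauge {

    // Each entry is the start timestamp of one open iterator (an assumption
    // made to keep the sketch small).
    static Long oldestStartTimestamp(ConcurrentSkipListSet<Long> openIterators) {
        try {
            // Common case: the set is empty, so we return null without
            // paying for an exception.
            return openIterators.isEmpty() ? null : openIterators.first();
        } catch (NoSuchElementException e) {
            // Rare case: the last open iterator was closed by another thread
            // between isEmpty() and first().
            return null;
        }
    }

    public static void main(String[] args) {
        ConcurrentSkipListSet<Long> openIterators = new ConcurrentSkipListSet<>();
        System.out.println(oldestStartTimestamp(openIterators)); // null
        openIterators.add(200L);
        openIterators.add(100L);
        System.out.println(oldestStartTimestamp(openIterators)); // 100
    }
}
```

No lock is taken on either path, so threads that open and close iterators are never blocked by the reporter.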
   
   > Or use pollFirst() and re-insert afterwards (also not good code TBH...)
   
   This would cause a race with the stream threads/query threads, because 
they remove the Iterator from the `openIterators` Set when it is closed. If 
that happens while a metrics reporter thread has temporarily removed the 
Iterator from the Set, the removal on close would find nothing to remove, and 
the reporter thread would then add the Iterator back, causing it to leak!
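
The interleaving can be replayed deterministically on a single thread. This is only an illustrative sketch: the class name is hypothetical, the set holds `Long` timestamps rather than real iterators, and the three steps are run in the problematic order by hand rather than by actual threads.

```java
import java.util.concurrent.ConcurrentSkipListSet;

// Hypothetical demo of the pollFirst()/re-insert leak; not Kafka code.
final class PollFirstLeakDemo {

    // Returns the number of entries left in the set after the only open
    // iterator has been closed during the reporter's poll/re-insert window.
    static int entriesLeftAfterClose() {
        ConcurrentSkipListSet<Long> openIterators = new ConcurrentSkipListSet<>();
        openIterators.add(100L); // one iterator opened at t=100

        // Reporter thread: temporarily takes the oldest entry out.
        Long oldest = openIterators.pollFirst();

        // Stream/query thread: closes the iterator and tries to deregister
        // it, but the entry is not in the set right now, so this is a no-op.
        boolean removedOnClose = openIterators.remove(100L);
        assert !removedOnClose;

        // Reporter thread: puts the entry back, although the iterator is
        // already closed.
        openIterators.add(oldest);

        return openIterators.size();
    }

    public static void main(String[] args) {
        System.out.println(entriesLeftAfterClose()); // 1
    }
}
```

The set still reports one open iterator even though none is open, and nothing will ever remove it.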


