[ 
https://issues.apache.org/jira/browse/KAFKA-19678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18029580#comment-18029580
 ] 

Steven Schlansker commented on KAFKA-19678:
-------------------------------------------

Thanks [~mjsax] for taking a look.

We have a product requirement to compute streaming-min and streaming-max 
operations over a grouped aggregate. For example, "earliest record due date for 
each user" or "latest record created date for each user".

To do this, we take the input stream,
{code:java}
K1 = U1 V1
K2 = U1 V2
K3 = U2 V3
K4 = U2 V4 {code}
and reorganize the records so that the group key and value form the key prefix, like
{code:java}
U1 V1 K1 = K1
U1 V2 K2 = K2
U2 V3 K3 = K3
U2 V4 K4 = K4{code}
and put it in a state store. Then, to determine the minimum or maximum, we do a 
prefix range scan to take the first or last record for the group U1 or U2.
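
For concreteness, here is a minimal sketch of that pattern, assuming String keys 
and values laid out as in the example above and a {{KeyValueStore}} named 
"by-user" (the store name, key layout, and value parsing are illustrative, not 
our exact code):
{code:java}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class EarliestPerUser implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;
    private KeyValueStore<String, String> byUser;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        this.context = context;
        this.byUser = context.getStateStore("by-user");
    }

    @Override
    public void process(final Record<String, String> record) {
        // Illustrative value layout from the example: "<user> <value>", e.g. "U1 V1".
        final String[] parts = record.value().split(" ", 2);
        final String user = parts[0];
        final String value = parts[1];

        // Store under a composite key so all entries for one user sort together.
        byUser.put(user + "\u0000" + value + "\u0000" + record.key(), record.key());

        // Prefix range scan: with the default String serde, keys iterate in
        // serialized byte order, so the first entry in the user's range is the minimum.
        try (final KeyValueIterator<String, String> it =
                     byUser.range(user + "\u0000", user + "\u0001")) {
            if (it.hasNext()) {
                final KeyValue<String, String> min = it.next();
                context.forward(new Record<>(user, min.value, record.timestamp()));
            }
        }
    }
}
{code}
Every call to {{process()}} opens and closes exactly one iterator, which is why 
the per-iterator metric registration shows up so prominently in our profiles.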

It might be possible to reduce the number of range scans by caching the minimum 
and maximum values per key, so we know whether the min or max could have changed 
and can skip the iterator when it has not, but then we need a second state store 
duplicating the winning record per user. We assumed the cost of opening an 
iterator is roughly equal to the cost of a key lookup, but maybe that is not a 
good assumption.
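
A hedged sketch of that alternative, continuing the processor above (the second 
store name is illustrative); note that a delete or update of the current winner 
would still require the range scan to find the runner-up, so the iterator path 
never disappears entirely:
{code:java}
    // Hedged sketch: second store mapping user -> current winning composite key.
    private KeyValueStore<String, String> currentMinByUser;

    private boolean newMinimum(final String user, final String candidateCompositeKey) {
        final String cachedMin = currentMinByUser.get(user);
        if (cachedMin == null || candidateCompositeKey.compareTo(cachedMin) < 0) {
            currentMinByUser.put(user, candidateCompositeKey);
            return true;   // new record wins outright: no iterator needed
        }
        return false;      // cached winner still stands: skip the range scan
    }
{code}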

Regardless, to me, the current semantics for this metric seem wrong. If the 
store is open but no iterators are currently open, the correct value for the 
metric is explicitly "0", not "null / unregistered". The current setup makes it 
difficult to graph, since our dashboards interpret "null" as "missing data", 
which is distinct from a present 0.

I would expect the metric to be unregistered only when the state store is 
closed, or when we are otherwise sure no new iterators will ever be created.
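
In other words, something along these lines (a minimal sketch of the semantics I 
would expect, not the actual Streams implementation; the metric name, group, and 
tags are illustrative):
{code:java}
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Metrics;

public class OpenIteratorGauge implements AutoCloseable {
    private final Metrics metrics;
    private final MetricName name;
    private final LongAdder openIterators = new LongAdder();

    public OpenIteratorGauge(final Metrics metrics, final Map<String, String> tags) {
        this.metrics = metrics;
        this.name = metrics.metricName("num-open-iterators", "stream-state-metrics",
                "Number of currently open iterators on the state store", tags);
        // Registered once, when the store opens; reads 0 while no iterator is open.
        metrics.addMetric(name, (Gauge<Long>) (config, now) -> openIterators.sum());
    }

    public void iteratorOpened() { openIterators.increment(); }  // no Metrics lock
    public void iteratorClosed() { openIterators.decrement(); }  // no Metrics lock

    @Override
    public void close() {
        // Unregister only when the store itself closes.
        metrics.removeMetric(name);
    }
}
{code}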

> Streams open iterator tracking has high contention on metrics lock
> ------------------------------------------------------------------
>
>                 Key: KAFKA-19678
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19678
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 4.1.0
>            Reporter: Steven Schlansker
>            Priority: Major
>         Attachments: image-2025-09-05-12-13-24-910.png
>
>
> We run Kafka Streams 4.1.0 with custom processors that heavily use state 
> store range iterators.
> While investigating disappointing performance, we found a surprising source 
> of lock contention.
> Over the course of a roughly 1-minute profiler sample, the 
> {{org.apache.kafka.common.metrics.Metrics}} lock is taken approximately 
> 40,000 times and blocks threads for about 1 minute in total.
> This appears to be because our state stores generally have no iterators open, 
> except when their processor is processing a record, in which case it opens an 
> iterator (taking the lock through {{OpenIterators.add}} into 
> {{Metrics.registerMetric}}), does a tiny bit of work, and then closes the 
> iterator (again taking the lock through {{OpenIterators.remove}} into 
> {{Metrics.removeMetric}}).
> So, for this subset of our data, stream processing threads take a globally 
> shared lock twice per record. I've attached a profiler thread state 
> visualization with our findings - the red bar indicates the thread was 
> blocked on this lock during the sample. As you can see, this lock seems to be 
> severely hampering our performance.
>  
> !image-2025-09-05-12-13-24-910.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
