[
https://issues.apache.org/jira/browse/KAFKA-8736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16900351#comment-16900351
]
Matthias J. Sax commented on KAFKA-8736:
----------------------------------------
[~MJarvie] – I added you to the list of contributors. You can now self-assign
tickets.
> Performance: ThreadCache uses size() for empty cache check
> ----------------------------------------------------------
>
> Key: KAFKA-8736
> URL: https://issues.apache.org/jira/browse/KAFKA-8736
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 2.3.0
> Reporter: Matthew Jarvie
> Assignee: Matthew Jarvie
> Priority: Major
> Attachments: size.patch
>
>
> While load testing Kafka Streams in 2.3.0, we stumbled across a potential
> performance improvement. The test showed we were spending 80% of CPU time in
> ConcurrentSkipListMap.size():
>
> {noformat}
> 100% org.apache.kafka.streams.processor.internals.StreamThread.run():774
> 100% org.apache.kafka.streams.processor.internals.StreamThread.runLoop():805
> 96.84% org.apache.kafka.streams.processor.internals.StreamThread.runOnce():890
> 96.84%
> org.apache.kafka.streams.processor.internals.TaskManager.process(long):420
> 96.83%
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(long):199
> 96.4% org.apache.kafka.streams.processor.internals.StreamTask.process():366
> 96.3%
> org.apache.kafka.streams.processor.internals.SourceNode.process(java.lang.Object,
> java.lang.Object):87
> 96.3%
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object,
> java.lang.Object):133
> 96.3%
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object,
> java.lang.Object, org.apache.kafka.streams.processor.To):180
> 96.3%
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(org.apache.kafka.streams.processor.internals.ProcessorNode,
> java.lang.Object, java.lang.Object):201
> 96.23%
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(java.lang.Object,
> java.lang.Object):117
> 96.12%
> org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(java.lang.Object,
> java.lang.Object):43
> 96.12%
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object,
> java.lang.Object):133
> 96.12%
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object,
> java.lang.Object, org.apache.kafka.streams.processor.To):180
> 96.12%
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(org.apache.kafka.streams.processor.internals.ProcessorNode,
> java.lang.Object, java.lang.Object):201
> 96.08%
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(java.lang.Object,
> java.lang.Object):117
> 82.78%
> org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.process(java.lang.Object,
> java.lang.Object):169
> 82.78%
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl$SessionStoreReadWriteDecorator.put(org.apache.kafka.streams.kstream.Windowed,
> java.lang.Object):612
> 82.59%
> org.apache.kafka.streams.state.internals.MeteredSessionStore.put(org.apache.kafka.streams.kstream.Windowed,
> java.lang.Object):127
> 81.11%
> org.apache.kafka.streams.state.internals.CachingSessionStore.put(org.apache.kafka.streams.kstream.Windowed,
> java.lang.Object):35
> 81.09%
> org.apache.kafka.streams.state.internals.CachingSessionStore.put(org.apache.kafka.streams.kstream.Windowed,
> byte[]):131
> 81.09%
> org.apache.kafka.streams.state.internals.ThreadCache.put(java.lang.String,
> org.apache.kafka.common.utils.Bytes,
> org.apache.kafka.streams.state.internals.LRUCacheEntry):151
> 80.53%
> org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(java.lang.String):238
> 80.53% org.apache.kafka.streams.state.internals.NamedCache.size():266
> 80.53% java.util.concurrent.ConcurrentSkipListMap.size():1639{noformat}
> According to
> [https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentSkipListMap.html#size--],
> the size() method has to traverse all elements to get a count. It looks like
> the count is being compared against 0 to determine whether the map is empty; in
> this case, we don't need a full count. Instead, the isEmpty() method should
> be used, which just looks for one node. We patched this and gained about 25%
> max throughput, and this method disappeared from thread dumps as a hot spot.
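
For readers following along, here is a minimal sketch of the difference between the two emptiness checks described above. It is not the attached size.patch and not the actual NamedCache code; the class EmptyCheckSketch, its methods, and the entry count are hypothetical names and values chosen only for illustration. It assumes nothing beyond the documented behavior of java.util.concurrent.ConcurrentSkipListMap, where size() is not a constant-time operation.

```java
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical stand-in for a cache backed by a ConcurrentSkipListMap.
// This is NOT the Kafka Streams NamedCache; it only illustrates the pattern.
class EmptyCheckSketch {
    private final ConcurrentSkipListMap<String, byte[]> entries =
            new ConcurrentSkipListMap<>();

    void put(String key, byte[] value) {
        entries.put(key, value);
    }

    // Counts every element just to compare the result with zero.
    boolean isEmptyViaSize() {
        return entries.size() == 0;
    }

    // Only needs to find out whether a first element exists.
    boolean isEmptyViaIsEmpty() {
        return entries.isEmpty();
    }

    public static void main(String[] args) {
        EmptyCheckSketch cache = new EmptyCheckSketch();
        for (int i = 0; i < 200_000; i++) {
            cache.put("key-" + i, new byte[0]);
        }

        // Rough timing only; a proper benchmark would use a harness such as JMH.
        int emptyCount = 0;
        long t0 = System.nanoTime();
        for (int i = 0; i < 100; i++) {
            if (cache.isEmptyViaSize()) emptyCount++;
        }
        long t1 = System.nanoTime();
        for (int i = 0; i < 100; i++) {
            if (cache.isEmptyViaIsEmpty()) emptyCount++;
        }
        long t2 = System.nanoTime();

        System.out.println("size() == 0 : " + (t1 - t0) / 1_000 + " us");
        System.out.println("isEmpty()   : " + (t2 - t1) / 1_000 + " us");
        System.out.println("empty results seen: " + emptyCount);
    }
}
```

Both methods return the same answer. The only difference is how much of the map each one has to visit, which is why the check matters when it runs on every put.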