[ 
https://issues.apache.org/jira/browse/CASSANDRA-16154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17240198#comment-17240198
 ] 

Ben Manes edited comment on CASSANDRA-16154 at 11/29/20, 9:05 AM:
------------------------------------------------------------------

Caffeine uses a write buffer to replay hash table modifications against the 
eviction policy. This avoids having every writer thread synchronize on a 
single lock to perform an eviction, which could become a bottleneck. 
Typically a small burst of writes is absorbed into this buffer and 
immediately drained, costing roughly the same as an unbounded 
ConcurrentHashMap. If the buffer fills up, e.g. due to a stress test, then 
writers are descheduled by taking the eviction lock directly. This provides 
backpressure and avoids priority inversion.
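
For illustration, here is a minimal, hypothetical sketch of that 
offer-then-drain pattern; the names and structure are mine, not Caffeine's 
actual internals (which use more optimized buffers):

{code:java}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of the offer-then-drain pattern described above.
final class WriteBufferSketch<E> {
  private final ArrayBlockingQueue<E> writeBuffer = new ArrayBlockingQueue<>(1024);
  private final ReentrantLock evictionLock = new ReentrantLock();

  /** Called after a hash table write to replay the event on the policy. */
  void afterWrite(E event) {
    if (writeBuffer.offer(event)) {
      tryDrain();              // fast path: buffered; drain opportunistically
    } else {
      evictionLock.lock();     // slow path: buffer full; writer is descheduled,
      try {                    // which provides the backpressure described above
        applyToPolicy(event);
        drainLocked();
      } finally {
        evictionLock.unlock();
      }
    }
  }

  private void tryDrain() {
    if (evictionLock.tryLock()) {  // one thread drains; the rest skip past
      try {
        drainLocked();
      } finally {
        evictionLock.unlock();
      }
    }
  }

  private void drainLocked() {
    for (E e; (e = writeBuffer.poll()) != null; ) {
      applyToPolicy(e);
    }
  }

  private void applyToPolicy(E event) {
    // update the eviction policy and evict entries while over capacity
  }
}
{code}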

The cache may grow larger than the maximum size, up to the size of the write 
buffer. The buffer starts small but can resize to grow up to 128 * 
pow2(NCPUs) entries. For an 8-core server that is up to 1,024 buffered 
policy inserts. Since the write buffer is appended to after the hash map 
operation, for an 8K-entry cache to reach 50K entries would require a 
512-core machine, or equivalently roughly 40K writer threads all inserting 
concurrently and blocked on adding to the buffer. As with the GC, we assume 
here that the JVM is not so severely memory limited that it cannot allow for 
some slack in order to cope with the system load. If the cache entries are 
of a reasonable size, here stated as 64 KB, that is a 64 MB worst-case slack 
on an 8-core machine.
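
As a quick back-of-the-envelope check of those figures (the 128 multiplier 
and the 64 KB chunk size are taken from the text above, not queried from 
Caffeine at runtime):

{code:java}
public class SlackEstimate {
  // Smallest power of two >= x, i.e. the pow2(NCPUs) term above.
  static int ceilPowerOfTwo(int x) {
    return 1 << (32 - Integer.numberOfLeadingZeros(x - 1));
  }

  public static void main(String[] args) {
    int ncpus = 8;
    int maxBufferedWrites = 128 * ceilPowerOfTwo(ncpus); // 1024 on 8 cores
    long chunkBytes = 64 * 1024;                         // 64 KB cache entries
    long slackBytes = maxBufferedWrites * chunkBytes;    // worst-case overshoot
    System.out.printf("buffered writes: %d, slack: %d MB%n",
        maxBufferedWrites, slackBytes / (1024 * 1024));  // 1024, 64 MB
  }
}
{code}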

This logic can be found in {{BoundedLocalCache#afterWrite}} and is described 
in the class's implementation doc, and the {{Caffeine}} builder's javadoc 
notes that {{the cache may evict an entry before this limit is exceeded or 
temporarily exceed the threshold while evicting}}. You can verify the limit 
is honored by running the 
[Stresser|https://github.com/ben-manes/caffeine/blob/master/caffeine/src/test/java/com/github/benmanes/caffeine/cache/Stresser.java]
 test case, which uses synthetic load to observe whether the cache has 
runaway growth. 
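
A much-simplified variant of that idea, using only the public Caffeine API 
(the thread count, run time, and 2x tolerance below are arbitrary choices, 
not anything Stresser itself uses):

{code:java}
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.concurrent.ThreadLocalRandom;

public class BoundCheck {
  public static void main(String[] args) throws InterruptedException {
    long maximum = 8_192;
    Cache<Long, Long> cache = Caffeine.newBuilder().maximumSize(maximum).build();

    // Hammer the cache with writes from many threads.
    Thread[] writers = new Thread[16];
    for (int i = 0; i < writers.length; i++) {
      writers[i] = new Thread(() -> {
        while (!Thread.currentThread().isInterrupted()) {
          long key = ThreadLocalRandom.current().nextLong();
          cache.put(key, key);
        }
      });
      writers[i].setDaemon(true);
      writers[i].start();
    }

    // Sample the size periodically; runaway growth would show up here.
    for (int i = 0; i < 30; i++) {
      Thread.sleep(1_000);
      long size = cache.estimatedSize();
      System.out.printf("estimated size: %,d%n", size);
      if (size > 2 * maximum) {
        throw new AssertionError("cache grew far beyond its bound: " + size);
      }
    }
  }
}
{code}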

An {{OutOfMemoryError}} is a heisenbug in that the failing allocation is 
usually a victim: the resources were consumed by behavior elsewhere. Instead 
of using a stacktrace, a heap dump offers the best path to finding the root 
cause.
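
For example, a dump can be captured on demand with the standard {{jmap}} 
tool (the file path and pid below are placeholders). Note that 
{{-XX:+HeapDumpOnOutOfMemoryError}} may not fire for an 
{{OutOfMemoryError}} thrown from library code, as with "Direct buffer 
memory", so an on-demand dump is the safer route:

{noformat}
jmap -dump:live,format=b,file=/tmp/cassandra-heap.hprof <pid>
{noformat}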


> OOM Error (Direct buffer memory) during intensive reading from large SSTables
> -----------------------------------------------------------------------------
>
>                 Key: CASSANDRA-16154
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-16154
>             Project: Cassandra
>          Issue Type: Bug
>            Reporter: Vygantas Gedgaudas
>            Priority: Normal
>
> Hello,
> We have a certain database from which intensive reading leads to the 
> following OOM error:
> {noformat}
>  java.lang.OutOfMemoryError: Direct buffer memory
>  at java.nio.Bits.reserveMemory(Bits.java:694) ~[na:1.8.0_212]
>  at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123) 
> ~[na:1.8.0_212]
>  at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311) ~[na:1.8.0_212]
>  at 
> org.apache.cassandra.utils.memory.BufferPool.allocate(BufferPool.java:110) 
> ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.utils.memory.BufferPool.access$1000(BufferPool.java:46) 
> ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.utils.memory.BufferPool$LocalPool.allocate(BufferPool.java:407)
>  ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.utils.memory.BufferPool$LocalPool.access$000(BufferPool.java:334)
>  ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.utils.memory.BufferPool.takeFromPool(BufferPool.java:122)
>  ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at org.apache.cassandra.utils.memory.BufferPool.get(BufferPool.java:94) 
> ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at org.apache.cassandra.cache.ChunkCache.load(ChunkCache.java:155) 
> ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at org.apache.cassandra.cache.ChunkCache.load(ChunkCache.java:39) 
> ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> com.github.benmanes.caffeine.cache.BoundedLocalCache$BoundedLocalLoadingCache.lambda$new$0(BoundedLocalCache.java:2949)
>  ~[caffeine-2.2.6.jar:na]
>  at 
> com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$15(BoundedLocalCache.java:1807)
>  ~[caffeine-2.2.6.jar:na]
>  at 
> java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853) 
> ~[na:1.8.0_212]
>  at 
> com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:1805)
>  ~[caffeine-2.2.6.jar:na]
>  at 
> com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:1788)
>  ~[caffeine-2.2.6.jar:na]
>  at 
> com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:97)
>  ~[caffeine-2.2.6.jar:na]
>  at 
> com.github.benmanes.caffeine.cache.LocalLoadingCache.get(LocalLoadingCache.java:66)
>  ~[caffeine-2.2.6.jar:na]
>  at 
> org.apache.cassandra.cache.ChunkCache$CachingRebufferer.rebuffer(ChunkCache.java:235)
>  ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.cache.ChunkCache$CachingRebufferer.rebuffer(ChunkCache.java:213)
>  ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.io.util.RandomAccessReader.reBufferAt(RandomAccessReader.java:65)
>  ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.io.util.RandomAccessReader.seek(RandomAccessReader.java:207)
>  ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at org.apache.cassandra.io.util.FileHandle.createReader(FileHandle.java:150) 
> ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.io.sstable.format.SSTableReader.getFileDataInput(SSTableReader.java:1767)
>  ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.db.columniterator.AbstractSSTableIterator.<init>(AbstractSSTableIterator.java:103)
>  ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.db.columniterator.SSTableIterator.<init>(SSTableIterator.java:49)
>  ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.io.sstable.format.big.BigTableReader.iterator(BigTableReader.java:72)
>  ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.io.sstable.format.big.BigTableReader.iterator(BigTableReader.java:65)
>  ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.db.rows.UnfilteredRowIteratorWithLowerBound.initializeIterator(UnfilteredRowIteratorWithLowerBound.java:107)
>  ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.db.rows.LazilyInitializedUnfilteredRowIterator.maybeInit(LazilyInitializedUnfilteredRowIterator.java:48)
>  ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.db.rows.UnfilteredRowIteratorWithLowerBound.getPartitionIndexLowerBound(UnfilteredRowIteratorWithLowerBound.java:191)
>  ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.db.rows.UnfilteredRowIteratorWithLowerBound.lowerBound(UnfilteredRowIteratorWithLowerBound.java:88)
>  ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.db.rows.UnfilteredRowIteratorWithLowerBound.lowerBound(UnfilteredRowIteratorWithLowerBound.java:47)
>  ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.utils.MergeIterator$Candidate.<init>(MergeIterator.java:362)
>  ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.utils.MergeIterator$ManyToOne.<init>(MergeIterator.java:147)
>  ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at org.apache.cassandra.utils.MergeIterator.get(MergeIterator.java:44) 
> ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.db.rows.UnfilteredRowIterators$UnfilteredRowMergeIterator.<init>(UnfilteredRowIterators.java:381)
>  ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.db.rows.UnfilteredRowIterators$UnfilteredRowMergeIterator.create(UnfilteredRowIterators.java:397)
>  ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.db.rows.UnfilteredRowIterators$UnfilteredRowMergeIterator.access$000(UnfilteredRowIterators.java:360)
>  ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.db.rows.UnfilteredRowIterators.merge(UnfilteredRowIterators.java:121)
>  ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.db.SinglePartitionReadCommand.withSSTablesIterated(SinglePartitionReadCommand.java:693)
>  ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.db.SinglePartitionReadCommand.queryMemtableAndDiskInternal(SinglePartitionReadCommand.java:639)
>  ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.db.SinglePartitionReadCommand.queryMemtableAndDisk(SinglePartitionReadCommand.java:514)
>  ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.db.SinglePartitionReadCommand.queryStorage(SinglePartitionReadCommand.java:376)
>  ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at org.apache.cassandra.db.ReadCommand.executeLocally(ReadCommand.java:407) 
> ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.service.StorageProxy$LocalReadRunnable.runMayThrow(StorageProxy.java:1882)
>  ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.service.StorageProxy$DroppableRunnable.run(StorageProxy.java:2587)
>  ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> ~[na:1.8.0_212]
>  at 
> org.apache.cassandra.concurrent.AbstractLocalAwareExecutorService$FutureTask.run(AbstractLocalAwareExecutorService.java:162)
>  ~[apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.concurrent.SEPExecutor.maybeExecuteImmediately(SEPExecutor.java:194)
>  [apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.service.AbstractReadExecutor.makeRequests(AbstractReadExecutor.java:117)
>  [apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.service.AbstractReadExecutor.makeDataRequests(AbstractReadExecutor.java:85)
>  [apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.service.AbstractReadExecutor$NeverSpeculatingReadExecutor.executeAsync(AbstractReadExecutor.java:220)
>  [apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.service.StorageProxy$SinglePartitionReadLifecycle.doInitialQueries(StorageProxy.java:1786)
>  [apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.service.StorageProxy.fetchRows(StorageProxy.java:1739) 
> [apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.service.StorageProxy.readRegular(StorageProxy.java:1682) 
> [apache-cassandra-3.11.0.jar:3.11.0]
>  at org.apache.cassandra.service.StorageProxy.read(StorageProxy.java:1597) 
> [apache-cassandra-3.11.0.jar:3.11.0]
>  at org.apache.cassandra.thrift.CassandraServer.read(CassandraServer.java:96) 
> [apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.thrift.CassandraServer.getSlice(CassandraServer.java:263)
>  [apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.thrift.CassandraServer.multigetSliceInternal(CassandraServer.java:570)
>  [apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.thrift.CassandraServer.getSliceInternal(CassandraServer.java:323)
>  [apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.thrift.CassandraServer.get_slice(CassandraServer.java:300)
>  [apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.thrift.Cassandra$Processor$get_slice.getResult(Cassandra.java:3659)
>  [apache-cassandra-thrift-3.11.0.jar:3.11.0]
>  at 
> org.apache.cassandra.thrift.Cassandra$Processor$get_slice.getResult(Cassandra.java:3643)
>  [apache-cassandra-thrift-3.11.0.jar:3.11.0]
>  at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39) 
> [libthrift-0.9.2.jar:0.9.2]
>  at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39) 
> [libthrift-0.9.2.jar:0.9.2]
>  at 
> org.apache.cassandra.thrift.CustomTThreadPoolServer$WorkerProcess.run(CustomTThreadPoolServer.java:206)
>  [apache-cassandra-3.11.0.jar:3.11.0]
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [na:1.8.0_212]
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  [na:1.8.0_212]
>  at 
> org.apache.cassandra.concurrent.NamedThreadFactory.lambda$threadLocalDeallocator$0(NamedThreadFactory.java:81)
>  [apache-cassandra-3.11.0.jar:3.11.0]
>  at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_212]{noformat}
> This happens when we are doing large numbers of small reads from a CF that 
> has many gigabytes worth of data, with SSTables of up to almost 7 GB. I 
> have tried to investigate/debug the issue myself; however, I do not yet 
> have a definitive answer as to whether this is an expected result under 
> the given conditions or a bug.
> What I have found so far is that if I stop the whole VM on 
> OutOfMemoryError, or just before it happens, I can see that the ChunkCache 
> is exceeding its maximum size (which is 512MiB by default in my case), and 
> it is exceeding it by a lot. For example, I find that under a usual load 
> the cache holds at most about 8K entries. This seems logical, since 
> roughly that many 64 KB chunks are needed to fill the cache. However, when 
> it runs out of memory I find the cache containing more than 50K entries, 
> many times its specified maximum size. My question is: is that expected in 
> a heavy load situation, and what measures should I take to handle this?
> If this is not expected, then I see it as potentially caused by the 
> Caffeine cache that is used for the ChunkCache. After looking into its 
> implementation and whatever information I could find on the internet, I am 
> fairly confident that this cache is indeed expected to be able to exceed 
> its maximum size.
> I have yet to write a stress test that loads the cache intensively from 
> multiple threads (which happens a lot in Cassandra), but I am fairly sure 
> it is possible to provoke the cache into a "runaway" situation where 
> eviction simply cannot keep up at times. Besides, I could not find any 
> mechanism within the cache that would substantially throttle new 
> inserts/loads, or force eviction after the maximum size is exceeded. To be 
> exact, eviction does happen, but it does not prevent other threads from 
> continuing to insert new entries into the cache, and there also seems to 
> be, by design (or by bug), a window in which those other threads only 
> "eventually" see the cache's state change from "evicting" to "not evicting 
> anymore", and thus do not initiate subsequent evictions even though they 
> are needed.
> So my other question is the following: when Caffeine was chosen for the 
> ChunkCache, was it considered that the cache may exceed the specified 
> maximum size, and is what I am observing therefore normal behavior in some 
> heavy load situations?


