Hi Mikhail,

There are a few ways to speed up compactions in the short term:
- nodetool setcompactionthroughput 0
This unthrottles compactions, but it puts you at risk of high latency while 
compactions are running.
- nodetool setconcurrentcompactors 2
You usually want to set this to the lower of disks or cores. If you are using 
SSDs, use the number of cores; it looks like d2.xlarge instances have 2 
virtual cores.
- nodetool disablebinary
You can use this to stop an individual node from acting as a coordinator.
This lets the node focus on catching up on compactions. Use it if one or two 
nodes have significantly more pending compactions than the rest of the 
cluster.
- nodetool disablegossip / disablethrift
Same logic as above, except the node also stops accepting writes. You can only 
do this for ~2-2.5 hours, or you risk inconsistent data by missing the hinted 
handoff window.
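
As a rough illustration of the rules of thumb above, here is a minimal Python 
sketch. The node names, the 1.5x-median cutoff for "significantly higher", and 
both function names are my own assumptions for illustration, not anything 
Cassandra provides. The pending counts are the ones you posted.

```python
from statistics import median


def concurrent_compactors(cores, disks, ssd):
    """Rule of thumb: the lower of disks or cores; just the core count on SSDs."""
    return cores if ssd else min(cores, disks)


def lagging_nodes(pending, factor=1.5):
    """Nodes whose pending compactions exceed `factor` times the cluster median.

    The cutoff is an arbitrary assumption; tune it to taste.
    """
    cutoff = factor * median(pending.values())
    return sorted(node for node, count in pending.items() if count > cutoff)


# Pending compaction tasks per node, as reported earlier in this thread.
# The node names are placeholders.
pending = {"node1": 571, "node2": 91, "node3": 367, "node4": 22, "node5": 232}

print(lagging_nodes(pending))
```

With those counts the median is 232, so the cutoff is 348 and the sketch flags 
node1 and node3. Those would be the candidates for disablebinary.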

Long term solutions:
- Consider switching instance type
The nodes you are using are storage optimised. They have very little of the 
processing power that compactions need. Also, the AWS documentation seems to 
suggest this instance type has HDDs, not SSDs. Are you sure you actually have 
SSDs? That makes a big difference.
- Add nodes
The data will redistribute over more nodes and each node will be responsible 
for fewer compactions (less data ~= fewer compactions).
- If it’s a batch load, make Spark do it
My impression is that you want to batch load from Cassandra to Elasticsearch 
after batch loading from Spark to Cassandra. If that is the case, why not get 
Spark to load Elasticsearch directly, since it already has the data? (Maybe 
I’m misinterpreting what you are doing.)
- Consider throttling Spark when it batch loads to Cassandra
If Cassandra gets overwhelmed it can start acting up. Keep an eye out for lots 
of undersized SSTables: they can be a sign that Cassandra is running out of 
memory during the batch load and flushing lots of little memtables to disk as 
SSTables to conserve memory.
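
One way to watch for undersized SSTables is to list the small *-Data.db files 
in the table's data directory. Below is a minimal Python sketch. The 10 MB 
cutoff and the function name are hypothetical, and you would want to tune the 
cutoff against your memtable size.

```python
from pathlib import Path


def small_sstables(table_dir, max_bytes=10 * 1024 * 1024):
    """Return (file name, size in bytes) for each SSTable data file in
    table_dir that is smaller than max_bytes, smallest first.

    The 10 MB default is an arbitrary assumption, not a Cassandra setting.
    """
    found = []
    for path in Path(table_dir).glob("*-Data.db"):
        size = path.stat().st_size
        if size < max_bytes:
            found.append((path.name, size))
    return sorted(found, key=lambda item: item[1])
```

Point it at the table's directory on each node, which I am assuming lives 
under /var/lib/cassandra/data/dump_es/ if you kept the default data directory. 
A long list of tiny files right after a batch load would be the warning sign.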

Some final follow-up questions:
- What is the purpose of this cluster?
Is it to support BAU, run daily analytics, or even an occasional one-time 
cluster that is spun up for some analysis before being spun down? This info 
helps a lot in understanding where you can make concessions.
- What is the flow of data and what are the timing requirements?

Cheers,
Eevee.

> On 28 Apr 2018, at 3:54 am, Mikhail Tsaplin <tsmis...@gmail.com> wrote:
> 
> The cluster has 5 nodes of d2.xlarge AWS type (32GB RAM, Attached SSD disks), 
> Cassandra 3.0.9.
> Increased compaction throughput from 16 to 200 - active compaction remaining 
> time decreased.
> What will happen if another node will join the cluster? - will former nodes 
> move part of theirs SSTables to the new node unchanged and compaction time 
> will be reduced?
> 
> 
> 
> $ nodetool cfstats -H  dump_es
> Keyspace: table_b
>         Read Count: 0
>         Read Latency: NaN ms.
>         Write Count: 0
>         Write Latency: NaN ms.
>         Pending Flushes: 0
>                 Table: table_b
>                 SSTable count: 18155
>                 Space used (live): 1.2 TB
>                 Space used (total): 1.2 TB
>                 Space used by snapshots (total): 0 bytes
>                 Off heap memory used (total): 3.62 GB
>                 SSTable Compression Ratio: 0.20371982719658258
>                 Number of keys (estimate): 712032622
>                 Memtable cell count: 0
>                 Memtable data size: 0 bytes
>                 Memtable off heap memory used: 0 bytes
>                 Memtable switch count: 0
>                 Local read count: 0
>                 Local read latency: NaN ms
>                 Local write count: 0
>                 Local write latency: NaN ms
>                 Pending flushes: 0
>                 Bloom filter false positives: 0
>                 Bloom filter false ratio: 0.00000
>                 Bloom filter space used: 2.22 GB
>                 Bloom filter off heap memory used: 2.56 GB
>                 Index summary off heap memory used: 357.51 MB
>                 Compression metadata off heap memory used: 724.97 MB
>                 Compacted partition minimum bytes: 771 bytes
>                 Compacted partition maximum bytes: 1.55 MB
>                 Compacted partition mean bytes: 3.47 KB
>                 Average live cells per slice (last five minutes): NaN
>                 Maximum live cells per slice (last five minutes): 0
>                 Average tombstones per slice (last five minutes): NaN
>                 Maximum tombstones per slice (last five minutes): 0
> 
> 
> 2018-04-27 22:21 GMT+07:00 Nicolas Guyomar <nicolas.guyo...@gmail.com>:
> Hi Mikhail,
> 
> Could you please provide :
> - your cluster version/topology (number of nodes, cpu, ram available etc)
> - what kind of underlying storage you are using
> - cfstat using -H option cause I'm never sure I'm converting bytes=>GB
> 
> You are storing 1Tb per node, so long running compaction is not really a 
> surprise, you can play with concurrent compaction thread number, compaction 
> throughput to begin with
> 
> 
> On 27 April 2018 at 16:59, Mikhail Tsaplin <tsmis...@gmail.com> wrote:
> Hi,
> I have a five nodes C* cluster suffering from a big number of pending 
> compaction tasks: 1) 571; 2) 91; 3) 367; 4) 22; 5) 232
> 
> Initially, it was holding one big table (table_a). With Spark, I read that 
> table, extended its data and stored in a second table_b. After this 
> copying/extending process the number of compaction tasks in the cluster has 
> grown up. From nodetool cfstats (see output at the bottom): table_a has 20 
> SSTables and table_b has 18219.
> 
> As I understand table_b has a big SSTables number because data was 
> transferred from one table to another within a short time and eventually this 
> tables will be compacted. But now I have to read whole data from this table_b 
> and send it to Elasticsearch. When Spark reads this table some Cassandra 
> nodes are dying because of OOM.
> 
> I think that when compaction will be completed - the Spark reading job will 
> work fine.
> 
> The question is how can I speed up compaction process, what if I will add 
> another two nodes to cluster - will compaction finish faster? Or data will be 
> copied to new nodes but compaction will continue on the original set of 
> SSTables?
> 
> 
> *Nodetool cfstats output:
> 
>                 Table: table_a
>                 SSTable count: 20
>                 Space used (live): 1064889308052
>                 Space used (total): 1064889308052
>                 Space used by snapshots (total): 0
>                 Off heap memory used (total): 1118106937
>                 SSTable Compression Ratio: 0.12564594959566894
>                 Number of keys (estimate): 56238959
>                 Memtable cell count: 76824
>                 Memtable data size: 115531402
>                 Memtable off heap memory used: 0
>                 Memtable switch count: 17
>                 Local read count: 0
>                 Local read latency: NaN ms
>                 Local write count: 77308
>                 Local write latency: 0.045 ms
>                 Pending flushes: 0
>                 Bloom filter false positives: 0
>                 Bloom filter false ratio: 0.00000
>                 Bloom filter space used: 120230328
>                 Bloom filter off heap memory used: 120230168
>                 Index summary off heap memory used: 2837249
>                 Compression metadata off heap memory used: 995039520
>                 Compacted partition minimum bytes: 1110
>                 Compacted partition maximum bytes: 52066354
>                 Compacted partition mean bytes: 133152
>                 Average live cells per slice (last five minutes): NaN
>                 Maximum live cells per slice (last five minutes): 0
>                 Average tombstones per slice (last five minutes): NaN
>                 Maximum tombstones per slice (last five minutes): 0
> 
> 
> nodetool cfstats table_b
> Keyspace: dump_es
>         Read Count: 0
>         Read Latency: NaN ms.
>         Write Count: 0
>         Write Latency: NaN ms.
>         Pending Flushes: 0
>                 Table: table_b
>                 SSTable count: 18219
>                 Space used (live): 1316641151665
>                 Space used (total): 1316641151665
>                 Space used by snapshots (total): 0
>                 Off heap memory used (total): 3863604976
>                 SSTable Compression Ratio: 0.20387645535477916
>                 Number of keys (estimate): 712032622
>                 Memtable cell count: 0
>                 Memtable data size: 0
>                 Memtable off heap memory used: 0
>                 Memtable switch count: 0
>                 Local read count: 0
>                 Local read latency: NaN ms
>                 Local write count: 0
>                 Local write latency: NaN ms
>                 Pending flushes: 0
>                 Bloom filter false positives: 0
>                 Bloom filter false ratio: 0.00000
>                 Bloom filter space used: 2382971488
>                 Bloom filter off heap memory used: 2742320056
>                 Index summary off heap memory used: 371500752
>                 Compression metadata off heap memory used: 749784168
>                 Compacted partition minimum bytes: 771
>                 Compacted partition maximum bytes: 1629722
>                 Compacted partition mean bytes: 3555
>                 Average live cells per slice (last five minutes): 132.375
>                 Maximum live cells per slice (last five minutes): 149
>                 Average tombstones per slice (last five minutes): 1.0
>                 Maximum tombstones per slice (last five minutes): 1
> 
> 
> ------------------
> 
> 
> I logged CQL requests going from Spark and checked how one such request is 
> performing - it fetches 8075rows, 59mb data in 155s (see below check output)
> 
> $ date; echo 'SELECT "scan_id", "snapshot_id", "scan_doc", "snapshot_doc" 
> FROM "dump_es"."table_b" WHERE token("scan_id") > 946122293981930504 AND 
> token("scan_id") <= 946132293981930504 ALLOW FILTERING;' | 
> cqlsh --request-timeout=3600 | wc ; date
> Fri Apr 27 13:32:55 UTC 2018
>    8076   61191 59009831
> Fri Apr 27 13:35:30 UTC 2018
> 
> 
> 
> 
