There are different approaches, depending on the application's logic.
Roughly speaking, there are two distinct scenarios:
1. Your application knows all the partition keys of the required data
in advance, either by reading them from another data source (e.g.
another Cassandra table, another database, a file, or an API) or by
reconstructing them from other known information (e.g. sequential
numbers, timestamps in a known range, etc.).
2. Your application needs all (or nearly all) rows from a given table,
so you can use range requests to read everything out of that table.
However, before you choose the second option and create a table for each
"source" value, I must warn you that creating hundreds of tables in
Cassandra is a bad idea.
Ask yourself: what is really required to 'do something'? Do
you really need *all* data each time? Is it possible to make 'do
something' incremental, so you'll only need *some* data each time?
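For the second scenario (reading a whole table with range requests), a common pattern is to split the token ring into sub-ranges and read each one with its own range query, rather than issuing a single SELECT over the whole table. The sketch below only computes the sub-ranges and prints the matching CQL. It assumes the default Murmur3Partitioner (tokens span the signed 64-bit range); the table name doc.mytable and the partition key column k1 are placeholders, and actually running the queries through the driver is left out.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

class TokenRangeScan {
    // Murmur3Partitioner tokens cover the signed 64-bit range (assumption:
    // the cluster uses the default partitioner).
    static final BigInteger MIN = BigInteger.valueOf(Long.MIN_VALUE);
    static final BigInteger MAX = BigInteger.valueOf(Long.MAX_VALUE);

    /** Splits the whole ring into `parts` non-overlapping inclusive ranges {start, end}. */
    static List<long[]> split(int parts) {
        if (parts < 1) throw new IllegalArgumentException("parts must be >= 1");
        BigInteger width = MAX.subtract(MIN);
        BigInteger n = BigInteger.valueOf(parts);
        List<long[]> ranges = new ArrayList<>();
        BigInteger start = MIN;
        for (int i = 1; i <= parts; i++) {
            // Upper boundary of the i-th slice; the last one lands exactly on MAX.
            BigInteger end = MIN.add(width.multiply(BigInteger.valueOf(i)).divide(n));
            ranges.add(new long[] {start.longValueExact(), end.longValueExact()});
            start = end.add(BigInteger.ONE); // next slice begins right after this one
        }
        return ranges;
    }

    /** CQL for one slice; the two bind markers take the slice's start and end token. */
    static String query(String table, String partitionKey) {
        return "SELECT * FROM " + table
                + " WHERE token(" + partitionKey + ") >= ? AND token(" + partitionKey + ") <= ?";
    }

    public static void main(String[] args) {
        // Placeholder table and column names, for illustration only.
        System.out.println(query("doc.mytable", "k1"));
        for (long[] r : split(4)) {
            System.out.println("bind: " + r[0] + " .. " + r[1]);
        }
    }
}
```

With a composite partition key, token() takes all of the partition key columns, e.g. token(k1, k2). Each sub-range query can still return many rows, so the driver's paging still applies within a slice; the point of splitting is only to keep each individual request small and to allow a few slices to be read in parallel.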
On 15/03/2021 19:33, Joe Obernberger wrote:
Thank you.
What is the best way to iterate over a very large number of rows in
Cassandra? I know the DataStax driver lets Java fetch blocks of n
records, but is that the best way?
-joe
On 3/15/2021 1:42 PM, Bowen Song wrote:
I personally try to avoid using secondary indexes, especially in
large clusters.
SI is not scalable: because a SI query doesn't carry the partition key
information, Cassandra must send it to nearly all nodes in a DC to
get the answer. Thus, the more nodes you have in a cluster, the
slower and more expensive it is to run a SI query. Creating a SI on a
table can also indirectly create large partitions in the index tables.
On 15/03/2021 17:27, Joe Obernberger wrote:
Great stuff - thank you. I've spent the morning here redesigning
with smaller partitions.
If I have a large number of unique IDs that I want to regularly 'do
something' with, would it make sense to have a table where a UUID is
the partition key, and create a secondary index on a field (call it
source) that I want to select from where the number of UUIDs per
source might be very large (billions)?
So - select * from table where source=?
The number of unique source values is small - maybe 1000
Whereas each source may have billions of UUIDs.
-Joe
On 3/15/2021 11:18 AM, Bowen Song wrote:
To be clear, this
CREATE TABLE ... PRIMARY KEY (k1, k2);
is the same as:
CREATE TABLE ... PRIMARY KEY ((k1), k2);
but they are NOT the same as:
CREATE TABLE ... PRIMARY KEY ((k1, k2));
The first two statements create a table with a partition key k1
and a clustering key k2. The 3rd statement creates a composite
partition key from k1 and k2, so both k1 and k2 are partition key
columns for this table and there is no clustering key.
Your example "create table xyz (uuid text, source text, primary key
(source, uuid));" uses the same syntax as the first statement,
which creates the table xyz with a partition key source, and a
clustering key uuid (which, BTW, is a non-reserved keyword).
A partition in Cassandra is solely determined by the partition
key(s), and the clustering key(s) have nothing to do with it. The
size of a compacted partition is determined by the number of rows
in the partition and the size of each row. If the table doesn't
have a clustering key, each partition will have at most one row.
The row size is the serialized size of all data in that row,
including tombstones.
You can reduce the partition size for a table by either reducing
the serialized data size or adding more columns to the (composite)
partition keys. But please be aware, you will have to provide ALL
partition key values when you read from or write to this table
(other than range, SI or MV queries), therefore you will need to
consider the queries before designing the table schema. For
scalability, you will need predictable partition size that does not
grow over time, or have an actionable plan to re-partition the
table when the partition size exceeds a certain threshold. Picking
the threshold is more of an art than a science; generally speaking, it
should stay below a few hundred MB, and often no more than 100 MB.
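As a rough illustration of adding a column to the partition key, the sketch below sizes and assigns a synthetic bucket column for a hypothetical table with PRIMARY KEY ((source, bucket), uuid). The 60-byte row size and the 100 MB target are assumptions used only for the arithmetic, and the CRC32-based hash is just one stable choice, not something Cassandra requires.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

class PartitionBuckets {
    /** Number of buckets so that rows * bytesPerRow / buckets stays at or under targetBytes. */
    static long bucketsNeeded(long rows, long bytesPerRow, long targetBytes) {
        long total = Math.multiplyExact(rows, bytesPerRow);
        return Math.max(1, (total + targetBytes - 1) / targetBytes); // ceiling division
    }

    /** Deterministic bucket for an id, so writers and readers always agree on it. */
    static int bucketFor(String id, int buckets) {
        CRC32 crc = new CRC32();
        crc.update(id.getBytes(StandardCharsets.UTF_8));
        return (int) (crc.getValue() % buckets); // getValue() is never negative
    }

    public static void main(String[] args) {
        // Assumed figures: 2 billion rows per source, ~60 bytes per serialized row,
        // and a 100 MB partition size target.
        long buckets = bucketsNeeded(2_000_000_000L, 60, 100_000_000L);
        System.out.println("buckets per source: " + buckets);
        System.out.println("bucket for one id: "
                + bucketFor("123e4567-e89b-12d3-a456-426614174000", (int) buckets));
    }
}
```

The trade-off is the one described above: reading everything for one source then means one query per bucket value, because every partition key column has to be supplied, and changing the bucket count later amounts to re-partitioning the table.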
On 15/03/2021 14:36, Joe Obernberger wrote:
Thank you Bowen - I'm redesigning the tables now. When you give
Cassandra two parts to the primary key like
create table xyz (uuid text, source text, primary key (source, uuid));
How is the second part of the primary key used to determine
partition size?
-Joe
On 3/12/2021 5:27 PM, Bowen Song wrote:
The partition size min/avg/max of 8409008/15096925/25109160 bytes
looks fine for the table fieldcounts, but the number of
partitions is a bit worrying. Only 3 partitions? Are you
expecting the partition size (instead of number of partitions) to
grow in the future? That can lead to a lot of headaches.
Forget about the fieldcounts table for now; the doc table looks
really bad. It has min/avg/max partition size of
24602/7052951452/63771372175 bytes, the partition sizes are
severely unevenly distributed, and the over 60GB partition is way
too big.
You really need to redesign your table schemas, and avoid
creating large or uneven partitions.
On 12/03/2021 18:52, Joe Obernberger wrote:
Thank you very much for helping me out on this! The table
fieldcounts is currently pretty small - 6.4 million rows.
cfstats are:
Total number of tables: 81
----------------
Keyspace : doc
Read Count: 3713134
Read Latency: 0.2664131157130338 ms
Write Count: 47513045
Write Latency: 1.0725477948634947 ms
Pending Flushes: 0
Table: fieldcounts
SSTable count: 3
Space used (live): 16010248
Space used (total): 16010248
Space used by snapshots (total): 0
Off heap memory used (total): 4947
SSTable Compression Ratio: 0.3994304032360534
Number of partitions (estimate): 3
Memtable cell count: 0
Memtable data size: 0
Memtable off heap memory used: 0
Memtable switch count: 0
Local read count: 379
Local read latency: NaN ms
Local write count: 0
Local write latency: NaN ms
Pending flushes: 0
Percent repaired: 100.0
Bloom filter false positives: 0
Bloom filter false ratio: 0.00000
Bloom filter space used: 48
Bloom filter off heap memory used: 24
Index summary off heap memory used: 51
Compression metadata off heap memory used: 4872
Compacted partition minimum bytes: 8409008
Compacted partition maximum bytes: 25109160
Compacted partition mean bytes: 15096925
Average live cells per slice (last five minutes): NaN
Maximum live cells per slice (last five minutes): 0
Average tombstones per slice (last five minutes): NaN
Maximum tombstones per slice (last five minutes): 0
Dropped Mutations: 0
Commitlog is on a separate spindle on the 7 node cluster. All
disks are SATA (spinning rust as they say!). This is an R&D
platform, but I will switch to NetworkTopologyStrategy. I'm
using Prometheus and Grafana to monitor Cassandra and the CPU
load is typically 100 to 200% on most of the nodes. Disk IO is
typically pretty low.
Performance - in general Async is about 10x faster.
ExecuteAsync:
35mSec for 364 rows.
8120mSec for 205001 rows.
14788mSec for 345001 rows.
4117mSec for 86400 rows.
23,330 rows per second on average
Execute:
232mSec for 364 rows.
584869mSec for 1263283 rows
46290mSec for 86400 rows
2,160 rows per second on average
Curious - our largest table (doc) has the following stats - is
it not partitioned well?
Total number of tables: 81
----------------
Keyspace : doc
Read Count: 3713134
Read Latency: 0.2664131157130338 ms
Write Count: 47513045
Write Latency: 1.0725477948634947 ms
Pending Flushes: 0
Table: doc
SSTable count: 26
Space used (live): 57124641753
Space used (total): 57124641753
Space used by snapshots (total): 113012646218
Off heap memory used (total): 27331913
SSTable Compression Ratio: 0.2531585373184219
Number of partitions (estimate): 12
Memtable cell count: 0
Memtable data size: 0
Memtable off heap memory used: 0
Memtable switch count: 0
Local read count: 27169
Local read latency: NaN ms
Local write count: 0
Local write latency: NaN ms
Pending flushes: 0
Percent repaired: 0.0
Bloom filter false positives: 0
Bloom filter false ratio: 0.00000
Bloom filter space used: 576
Bloom filter off heap memory used: 368
Index summary off heap memory used: 425
Compression metadata off heap memory used: 27331120
Compacted partition minimum bytes: 24602
Compacted partition maximum bytes: 63771372175
Compacted partition mean bytes: 7052951452
Average live cells per slice (last five minutes): NaN
Maximum live cells per slice (last five minutes): 0
Average tombstones per slice (last five minutes): NaN
Maximum tombstones per slice (last five minutes): 0
Dropped Mutations: 0
Thanks again!
-Joe
On 3/12/2021 11:01 AM, Bowen Song wrote:
The fact that sleep-then-retry works is just another indicator that
it's likely a GC pause related issue. I'd recommend checking
your Cassandra servers' GC logs first.
Do you know what's the maximum partition size for the
doc.fieldcounts table? (Try the "nodetool cfstats
doc.fieldcounts" command) I suspect this table has large
partitions, which usually leads to GC issues.
As for your failed executeAsync() insert issue, do you know how
many concurrent in-flight queries you have? The Cassandra
driver limits the number of in-flight requests, and new
executeAsync() calls will fail when the limit is reached.
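One way to stay under such a limit is to cap the number of in-flight requests on the client side. The sketch below is a minimal illustration using a Semaphore; it does not touch the driver at all. ThrottledAsync, runAll and the fake async call in main are hypothetical names, and asyncCall stands in for something like a lambda that calls session.executeAsync(...) on a bound statement, which in the 4.x Java driver returns a CompletionStage.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

class ThrottledAsync {
    /**
     * Calls asyncCall for every item while never allowing more than
     * maxInFlight calls to be outstanding, then waits for all results.
     */
    static <T, R> List<R> runAll(List<T> items, int maxInFlight,
                                 Function<T, CompletionStage<R>> asyncCall) {
        Semaphore permits = new Semaphore(maxInFlight);
        List<CompletableFuture<R>> futures = new ArrayList<>();
        for (T item : items) {
            permits.acquireUninterruptibly(); // blocks the producer when the cap is hit
            CompletableFuture<R> f;
            try {
                f = asyncCall.apply(item).toCompletableFuture();
            } catch (RuntimeException e) {
                permits.release(); // the call never started, so give the permit back
                throw e;
            }
            // Free the permit whether the request succeeded or failed.
            f.whenComplete((result, error) -> permits.release());
            futures.add(f);
        }
        List<R> results = new ArrayList<>();
        for (CompletableFuture<R> f : futures) {
            results.add(f.join()); // rethrows a failure as CompletionException
        }
        return results;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < 20; i++) items.add(i);
        // Fake async work in place of a real query.
        List<Integer> doubled = runAll(items, 4,
                n -> CompletableFuture.supplyAsync(() -> n * 2));
        System.out.println(doubled.size() + " requests completed");
    }
}
```

The cap itself (4 here) is arbitrary; a sensible value depends on the driver's configuration and on what the cluster can absorb, so it is something to measure rather than guess.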
I'm also a bit concerned about your "significantly" slower
inserts. Inserts (excluding "INSERT IF NOT EXISTS") should be
very fast in Cassandra. How slow are they? Are they always slow
like that, or usually fast but some are much slower than
others? What does the CPU usage & disk IO look like on the
Cassandra server? Do you have commitlog on the same disk as the
data? Is it a spinning disk, SATA SSD or NVMe?
BTW, you really shouldn't use SimpleStrategy for production
environments.
On 12/03/2021 15:18, Joe Obernberger wrote:
The queries that are failing are:
select fieldvalue, count from doc.ordered_fieldcounts where
source=? and fieldname=? limit 10
Created with:
CREATE TABLE doc.ordered_fieldcounts (
source text,
fieldname text,
count bigint,
fieldvalue text,
PRIMARY KEY ((source, fieldname), count, fieldvalue)
) WITH CLUSTERING ORDER BY (count DESC, fieldvalue ASC)
and:
select fieldvalue, count from doc.fieldcounts where source=?
and fieldname=?
Created with:
CREATE TABLE doc.fieldcounts (
source text,
fieldname text,
fieldvalue text,
count bigint,
PRIMARY KEY (source, fieldname, fieldvalue)
)
This really seems like a driver issue. I put retry logic
around the calls and now those queries work. Basically if it
throws an exception, I Thread.sleep(500) and then retry. This
seems to be a continuing theme with Cassandra in general. Is
this common practice?
After doing this retry logic, an insert statement started
failing with an illegal state exception when I retried it
(which makes sense). This insert was using
session.executeAsync(boundStatement). I changed that to just
execute (instead of async) and now I get no errors, no retries
anywhere. The insert is *significantly* slower when running
execute vs executeAsync. When using executeAsync:
com.datastax.oss.driver.api.core.NoNodeAvailableException: No node was available to execute the query
    at com.datastax.oss.driver.api.core.NoNodeAvailableException.copy(NoNodeAvailableException.java:40)
    at com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures.getUninterruptibly(CompletableFutures.java:149)
    at com.datastax.oss.driver.internal.core.cql.MultiPageResultSet$RowIterator.maybeMoveToNextPage(MultiPageResultSet.java:99)
    at com.datastax.oss.driver.internal.core.cql.MultiPageResultSet$RowIterator.computeNext(MultiPageResultSet.java:91)
    at com.datastax.oss.driver.internal.core.cql.MultiPageResultSet$RowIterator.computeNext(MultiPageResultSet.java:79)
    at com.datastax.oss.driver.internal.core.util.CountingIterator.tryToComputeNext(CountingIterator.java:91)
    at com.datastax.oss.driver.internal.core.util.CountingIterator.hasNext(CountingIterator.java:86)
    at com.ngc.helios.fieldanalyzer.FTAProcess.handleOrderedFieldCounts(FTAProcess.java:684)
    at com.ngc.helios.fieldanalyzer.FTAProcess.storeResults(FTAProcess.java:214)
    at com.ngc.helios.fieldanalyzer.FTAProcess.startProcess(FTAProcess.java:190)
    at com.ngc.helios.fieldanalyzer.Main.main(Main.java:20)
The interesting part here is that the line that is now failing
(line 684 in FTAProcess) is:
if (itRs.hasNext())
where itRs is an Iterator<Row> over a select query from
another table. I'm iterating over a result set from a select
and inserting those results via executeAsync.
-Joe
On 3/12/2021 9:07 AM, Bowen Song wrote:
Millions of rows in a single query? That sounds like a bad idea
to me. Your "NoNodeAvailableException" could be caused by
stop-the-world GC pauses, and the GC pauses are likely caused
by the query itself.
On 12/03/2021 13:39, Joe Obernberger wrote:
Thank you Paul and Erick. The keyspace is defined like this:
CREATE KEYSPACE doc WITH replication = {'class':
'SimpleStrategy', 'replication_factor': '3'} AND
durable_writes = true;
Would that cause this?
The program that is having the problem selects data,
calculates stuff, and inserts. It works with smaller
selects, but when the number of rows is in the millions, I
start to get this error. Since it works with smaller sets,
I don't believe it to be a network error. All the nodes are
definitely up as other processes are working OK, it's just
this one program that fails.
The full stack trace:
Error:
com.datastax.oss.driver.api.core.NoNodeAvailableException: No node was available to execute the query
com.datastax.oss.driver.api.core.NoNodeAvailableException: No node was available to execute the query
    at com.datastax.oss.driver.api.core.NoNodeAvailableException.copy(NoNodeAvailableException.java:40)
    at com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures.getUninterruptibly(CompletableFutures.java:149)
    at com.datastax.oss.driver.internal.core.cql.CqlRequestSyncProcessor.process(CqlRequestSyncProcessor.java:53)
    at com.datastax.oss.driver.internal.core.cql.CqlRequestSyncProcessor.process(CqlRequestSyncProcessor.java:30)
    at com.datastax.oss.driver.internal.core.session.DefaultSession.execute(DefaultSession.java:230)
    at com.datastax.oss.driver.api.core.cql.SyncCqlSession.execute(SyncCqlSession.java:54)
    at com.abc.xxxx.fieldanalyzer.FTAProcess.udpateCassandraFTAMetrics(FTAProcess.java:275)
    at com.abc.xxxx.fieldanalyzer.FTAProcess.storeResults(FTAProcess.java:216)
    at com.abc.xxxx.fieldanalyzer.FTAProcess.startProcess(FTAProcess.java:199)
    at com.abc.xxxx.fieldanalyzer.Main.main(Main.java:20)
FTAProcess line 275 is:
ResultSet rs = session.execute(getFieldCounts.bind().setString(0, rb.getSource()).setString(1, rb.getFieldName()));
-Joe
On 3/12/2021 8:30 AM, Paul Chandler wrote:
Hi Joe
This could also be caused by the replication factor of the
keyspace: if you use NetworkTopologyStrategy and it
doesn’t list a replication factor for the
datacenter datacenter1, then you will get this error message
too.
Paul
On 12 Mar 2021, at 13:07, Erick Ramirez <erick.rami...@datastax.com> wrote:
Does it get returned by the driver every single time? The
NoNodeAvailableException gets thrown when (1) all nodes are
down, or (2) all the contact points are invalid from the
driver's perspective.
Is it possible there's no route/connectivity from your app
server(s) to the 172.16.x.x network? If you post the full
error message + full stacktrace, it might provide clues.
Cheers!