[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15413016#comment-15413016 ] Stefania commented on CASSANDRA-11521: -- Rebase completed, tests are fine: ||trunk|[patch|https://github.com/stef1927/cassandra/commits/11521]|[testall|http://cassci.datastax.com/view/Dev/view/stef1927/job/stef1927-11521-testall/]|[dtest|http://cassci.datastax.com/view/Dev/view/stef1927/job/stef1927-11521-dtest/]| I've also linked this new feature to version 5 of the native protocol and added support for it to the Java driver. > Implement streaming for bulk read requests > -- > > Key: CASSANDRA-11521 > URL: https://issues.apache.org/jira/browse/CASSANDRA-11521 > Project: Cassandra > Issue Type: Sub-task > Components: Local Write-Read Paths >Reporter: Stefania >Assignee: Stefania > Labels: client-impacting, protocolv5 > Fix For: 3.x > > Attachments: final-patch-jfr-profiles-1.zip > > > Allow clients to stream data from a C* host, bypassing the coordination layer > and eliminating the need to query individual pages one by one. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411398#comment-15411398 ] Stefania commented on CASSANDRA-11521: -- Thank you!
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411391#comment-15411391 ] DOAN DuyHai commented on CASSANDRA-11521: - Thanks Stefania, great work!
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15400395#comment-15400395 ] Stefania commented on CASSANDRA-11521: -- The patch is ready for review: ||trunk|[patch|https://github.com/stef1927/cassandra/commits/11521]|[testall|http://cassci.datastax.com/view/Dev/view/stef1927/job/stef1927-11521-testall/]|[dtest|http://cassci.datastax.com/view/Dev/view/stef1927/job/stef1927-11521-dtest/]| There are also the [driver patch|https://github.com/stef1927/java-driver/commits/11521] and the [spark connector patch|https://github.com/stef1927/spark-cassandra-connector/commits/11521]. For these I plan to create tickets for the respective projects once the native protocol changes have been finalized. A [design document|https://docs.google.com/document/d/1YqKGSU1P8EJIfMrO--29VaSoCy5mUu-ePfAiIOLsY7o/edit] is also available. The Spark benchmark results are available in [this comment|https://issues.apache.org/jira/browse/CASSANDRA-9259?focusedCommentId=15400394=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15400394] on the parent ticket. The final patch is slightly better than the proof-of-concept, and the asynchronous paging mechanism significantly outperforms the existing mechanism for large data sets. 
I've also repeated some cstar_perf tests to rule out performance regressions with ordinary queries, which are not in the optimized path:

* Single partition queries (default cassandra-stress read command) at CL.LOCAL_ONE (the cassandra-stress default): [first run|http://cstar.datastax.com/graph?command=one_job=8b1f1d54-53e4-11e6-85af-0256e416528f=99th_latency=2_read=1_aggregates=true=0=276.98=0=22.33], [second run with the revision order swapped|http://cstar.datastax.com/graph?command=one_job=1abd3fe4-545e-11e6-8920-0256e416528f=op_rate=2_read=1_aggregates=true=0=277.86=0=243951.4], [an old run|http://cstar.datastax.com/graph?command=one_job=16cef080-53dc-11e6-b967-0256e416528f=op_rate=2_read=1_aggregates=true=0=282.92=0=249571.3] done before enabling token aware routing in cassandra-stress.
* Single partition queries at CL.ALL: [unique run|http://cstar.datastax.com/graph?command=one_job=e2155410-5462-11e6-9cd7-0256e416528f=op_rate=2_read=1_aggregates=true=0=277.75=0=246123.9]

There is a gap of 3.6K ops/second without token aware routing and 1K with CL=ALL. With token aware routing the patch is instead 1K ops/second faster. These differences must arise from the refactoring of the select statement. The differences are very small (the test error seems to be around 0.5K), but I can look into it further if there are concerns.
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15379166#comment-15379166 ] Stefania commented on CASSANDRA-11521: -- Here is a quick update. Progress has been somewhat slow in the last week due to other tasks. Nonetheless, the unit tests and dtests are now passing. I've repeated the benchmark and unfortunately there is a performance degradation of about 25% compared to the proof of concept. I've started investigating this but I haven't identified the exact reason for it. I'm attaching the JFR profiles for client and server in case anyone is interested in taking a look: [^final-patch-jfr-profiles-1.zip].
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15365643#comment-15365643 ] Stefania commented on CASSANDRA-11521: -- Adding links to patch and CI results, not yet ready for review but in case someone wants to take an early look: |[patch|https://github.com/stef1927/cassandra/commits/11521]|[testall|http://cassci.datastax.com/view/Dev/view/stef1927/job/stef1927-11521-testall/]|[dtest|http://cassci.datastax.com/view/Dev/view/stef1927/job/stef1927-11521-dtest/]|
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15365639#comment-15365639 ] Stefania commented on CASSANDRA-11521: -- Right. In this case it gets emulated over multiple pages, see the distributed case [here|https://github.com/stef1927/cassandra/blob/11521/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java#L450] vs. the local case [here|https://github.com/stef1927/cassandra/blob/11521/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java#L546]. It's actually {{Selection.RowBuilder}}, implemented by [{{AsyncPagingService.PageBuilder}}|https://github.com/stef1927/cassandra/blob/11521/src/java/org/apache/cassandra/cql3/async/paging/AsyncPagingService.java#L132] that monitors pages and sends them to the client when they are available.
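As a rough illustration of a row builder that monitors pages and hands each one off as soon as it is complete, here is a minimal sketch. It is not the {{AsyncPagingService.PageBuilder}} from the patch: the class name {{PageBuilderSketch}}, its methods, and the use of a {{Consumer}} as a stand-in for "send to the client" are all assumptions made for this example, and pages are counted in rows only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical sketch of a builder that accumulates rows into pages and
 * pushes each page to a sink as soon as it is full. Names are illustrative
 * and do not come from the Cassandra code base.
 */
public class PageBuilderSketch<R> {
    private final int pageSizeRows;
    // Stand-in for "send this page to the client".
    private final Consumer<List<R>> pageSink;
    private List<R> current = new ArrayList<>();

    public PageBuilderSketch(int pageSizeRows, Consumer<List<R>> pageSink) {
        if (pageSizeRows <= 0)
            throw new IllegalArgumentException("pageSizeRows must be positive");
        this.pageSizeRows = pageSizeRows;
        this.pageSink = pageSink;
    }

    /** Called for every row produced by the query. */
    public void onRow(R row) {
        current.add(row);
        if (current.size() >= pageSizeRows)
            flush();
    }

    /** Called once the query is exhausted, to send any partial last page. */
    public void onComplete() {
        if (!current.isEmpty())
            flush();
    }

    private void flush() {
        pageSink.accept(current);
        current = new ArrayList<>();
    }
}
```

With a page size of 2 and five rows, the sink would receive two full pages followed by one page holding the last row.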
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15365637#comment-15365637 ] Stefania commented on CASSANDRA-11521: -- Yes it will be delivered together with this patch.
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15365627#comment-15365627 ] Jonathan Ellis commented on CASSANDRA-11521: Excellent writeup, thanks!
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15365625#comment-15365625 ] Jonathan Ellis commented on CASSANDRA-11521:

bq. The first thought was to have two new commands, a STREAM request and a STREAM response but I chose against this so that moving forward we could phase out the existing request-response mechanism, clients could simply use streaming for everything and we could set the maximum number of pages to one by default

But we still need the storageproxy path for CL > ONE, right? So how do we get to a "streaming for everything" world?
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15365623#comment-15365623 ] Jonathan Ellis commented on CASSANDRA-11521:

bq. Optimize local range reads at CL.ONE by keeping the iterators open across pages and avoiding the storage proxy layer

Does this subsume CASSANDRA-11520 then?
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15365479#comment-15365479 ] Stefania commented on CASSANDRA-11521: -- We bypass StorageProxy for local range queries that use no index, at CL.ONE. The decision is taken by SelectStatement. I've drafted a quick design doc [here|https://docs.google.com/document/d/1YqKGSU1P8EJIfMrO--29VaSoCy5mUu-ePfAiIOLsY7o/edit?usp=sharing].
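The bypass rule stated in this comment can be written down as a simple predicate. The following is only a sketch of the three conditions named above (a local range query, no index, consistency level ONE); the class, enum, and parameter names are hypothetical and are not taken from SelectStatement. How LOCAL_ONE is treated is not stated in the comment, so the sketch leaves it on the regular path.

```java
/**
 * Hypothetical sketch of the decision described above. It is not code from
 * SelectStatement; all names here are illustrative assumptions.
 */
public class LocalReadPathSketch {
    public enum ConsistencyLevel { ONE, LOCAL_ONE, QUORUM, ALL }

    /**
     * Returns true only when every condition from the comment holds: the
     * query is a range query over locally owned data, it uses no index,
     * and it runs at consistency level ONE.
     */
    public static boolean canBypassStorageProxy(boolean isRangeQuery,
                                                boolean isLocal,
                                                boolean usesIndex,
                                                ConsistencyLevel cl) {
        return isRangeQuery && isLocal && !usesIndex && cl == ConsistencyLevel.ONE;
    }
}
```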
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15365172#comment-15365172 ] Jonathan Ellis commented on CASSANDRA-11521: If this bypasses the coordinator, does that mean we're limited to unfiltered seq scan, or do we support all of SELECT still?
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358245#comment-15358245 ] Stefania commented on CASSANDRA-11521: -- Thank you!
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15357172#comment-15357172 ] Adam Holmberg commented on CASSANDRA-11521: --- Thanks for the heads up. I created driver ticket [PYTHON-597|https://datastax-oss.atlassian.net/browse/PYTHON-597] for this.
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15356834#comment-15356834 ] Stefania commented on CASSANDRA-11521: --

Just a quick update since I haven't had a chance to update this ticket in over a month. I have an almost final implementation of the patch; I still need to fix a few more dtests, implement new unit tests to test some corner cases, and repeat the benchmark to rule out performance regressions. I hope to have something ready for review in the next few weeks, depending on other tasks and the results of the benchmark and new unit tests.

The new delivery mechanism has been renamed from _streaming_ to _asynchronous paging_. In terms of implementation, I ended up with something somewhat more generic than what was originally stated. Specifically, optimized local reads will be available regardless of the delivery mechanism to the client (page-by-page or asynchronous paging). This means that CASSANDRA-11520 will be included in this patch. Also, asynchronous paging will be available for all queries at all consistency levels, not just optimized load queries. This does mean however that we will need to implement the protocol changes in the python driver as well, in order to write dtests, cc [~aholmber] for a heads up.

When using the new asynchronous paging delivery mechanism, clients will be able to page by rows and by bytes. If paging by bytes, the page size returned to the client will be between:
* the requested size minus one avg row size
* the requested size plus \[the current CQL row size - an avg row size\].

The avg row size is refined for every page sent. At CL > 1, intra-node, we are still sending requests using page sizes in rows; to calculate the number of rows we divide the page size in bytes by an average row size.

As for the optimized local reads, for now they are simply done in SelectStatement by releasing the op order periodically.
Later, we may be able to do something fancier if required, such as using only sstable references, avoiding polluting the chunk cache, using a larger read ahead and so forth. For now, I wanted to focus on keeping delivery and local reads orthogonal and implementing the new delivery mechanism.

h3. Client protocol changes

A new flag has been added to query options:

{code}
+0x80: With extended flags. If set, should be present.
+- is a [byte] whose bits define additional options for this query and,
+ like , may influence the remainder of the message. Supported extended flags:
+0x01: With paging options. If set, will be present and the
+ query result will be pushed to the client asynchronously, and according to
+ the paging options. Asynchronous paging can be interrupted via a CANCEL request.
+ contains the following:
+- , a [UUID] that uniquely identifies an asynchronous paging session.
+- , an [Int] indicating the size of each page in the unit defined below.
+ This is a mandatory parameter that takes precedence over ,
+ which is obsoleted by this parameter.
+- , an [Int] indicating the page unit: 1 indicates bytes and 2 indicates rows.
+- , an [Int] indicating the maximum number of pages to
+ receive in total, set this to zero to indicate no limit.
+- , an [Int] indicating the maximum number of pages to receive
+ per second, set this to zero to indicate no limit.
+
{code}

A new cancel message has been added:

{code}
+4.1.9. CANCEL
+
+ Request to cancel an asynchronous operation. The body of the message is:
+ - an [int] identifying the operation type:
+ - 1 for cancelling an async paging session
+ - a [uuid] that contains the unique identifier of the operation to cancel.
+
{code}

A new flag has been added to the response metadata:

{code}
+0x0005 With_asynchronous_paging: if set, this result is part of an asynchronous
+ paging session and the will be present. [...]
+- contains:
+ - a [uuid] that uniquely identifies the asynchronous paging session
+ - an [int] that identifies the sequential number of this result in the session
+ - a [boolean] that is true for the last message in the session
{code}

The paging state will be returned as usual, so clients will be able to resume paging on any node using the old or new paging mechanism.
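To make the layout of the paging options more concrete, here is a minimal sketch of how a client might serialize them, assuming the fields are written in the order listed above: a [uuid] (16 bytes), then four [int] values (4 bytes each, big-endian, as in the native protocol's notation). The field names did not survive in the spec excerpt, so {{sessionId}}, {{pageSize}}, {{pageUnit}}, {{maxPages}} and {{maxPagesPerSecond}} are hypothetical, as is the class itself. The second helper illustrates the bytes-to-rows conversion mentioned in the comment; clamping the result to at least one row is an added assumption, not something the comment states.

```java
import java.nio.ByteBuffer;
import java.util.UUID;

/**
 * Hypothetical sketch of encoding the asynchronous paging options.
 * Field names and the class are illustrative; only the field order,
 * types and unit values (1 = bytes, 2 = rows) come from the comment.
 */
public class PagingOptionsSketch {
    public static final int UNIT_BYTES = 1;
    public static final int UNIT_ROWS = 2;

    public static ByteBuffer encode(UUID sessionId, int pageSize, int pageUnit,
                                    int maxPages, int maxPagesPerSecond) {
        if (pageUnit != UNIT_BYTES && pageUnit != UNIT_ROWS)
            throw new IllegalArgumentException("pageUnit must be 1 (bytes) or 2 (rows)");
        // 16 bytes for the uuid plus four 4-byte ints; ByteBuffer is big-endian by default.
        ByteBuffer buf = ByteBuffer.allocate(16 + 4 * 4);
        buf.putLong(sessionId.getMostSignificantBits());
        buf.putLong(sessionId.getLeastSignificantBits());
        buf.putInt(pageSize);
        buf.putInt(pageUnit);
        buf.putInt(maxPages);          // zero means no limit
        buf.putInt(maxPagesPerSecond); // zero means no limit
        buf.flip();
        return buf;
    }

    /**
     * Converts a page size in bytes to a row count by dividing by the
     * average row size. Returning at least 1 is an assumption of this sketch.
     */
    public static int estimateRowsPerPage(int pageSizeBytes, int avgRowSizeBytes) {
        if (avgRowSizeBytes <= 0)
            throw new IllegalArgumentException("avgRowSizeBytes must be positive");
        return Math.max(1, pageSizeBytes / avgRowSizeBytes);
    }
}
```

For example, a 64 KB page with an average row size of 100 bytes would translate to a request for 655 rows under this sketch.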
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15297595#comment-15297595 ] Stefania commented on CASSANDRA-11521: --

Thank you for following up Benedict.

Regarding holding {{OpOrder}} and isolation, it would be nice to offer isolation at the partition level. I don't think we can offer total isolation if we release sstables periodically but, if we release them only at partition boundaries, then isolation at the partition level should be possible. To recap, one option is to copy the entire memtable sub-maps initially, but this increases memory used and we may hold partitions that are no longer relevant if in the meantime the memtable is flushed and gets picked up when we periodically refresh sstables. Another option is to copy a partition, or reference it but this is quite hard, or hold the {{OpOrder}} but only when a specific partition is about to be iterated by the sstables merge iterator.

Regarding referencing sstables, the ticket you are referring to is probably CASSANDRA-11552 and the problem is clear now. I don't yet know how to reproduce it or exactly where the bug is, but I understand that if we call {{CFS.selectAndReference()}} rather than {{CFS.select()}} (because we no longer hold the {{OpOrder}}), then we might spin trying to reference sstables due to a bug that is causing sstables to be released when they are still visible. I will try and debug further if I can reproduce it.
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15296244#comment-15296244 ] Benedict commented on CASSANDRA-11521: --

Language is imprecise - I didn't mean a blanket ban on OpOrder, just that it should not be used for the long-life cursor's sstable access, only held during any active service of the query by the server. If we were to offer isolation, the memtable contents applicable to the query would have to be copied anyway (it could be referenced directly until flush, but given the current on/off heap hybrid situation that probably isn't desirable), which actually might be a point against offering isolation.

I don't have time to remind myself of the intricacies of the issue with referencing sstables, but I did see in passing a ticket that [~snazy] filed/fixed for suppressing a very spammy log message with respect to failing to reference sstables, and he may be able to point you to it. That's what I'm referring to - the log messages are the symptom (and were deliberately not made spam proof to make it obvious it was happening, since it's a bad thing). As far as I recall, there's a period during the replacement of sstables after compaction where the expired versions are visible and cannot possibly be referenced and so the selection loop spins. It's probably quite easily fixed, but you will have to investigate for yourself.
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15293001#comment-15293001 ] Stefania commented on CASSANDRA-11521: --

Thank you for your input [~snazy] and [~benedict].

bq. However these streams can be arbitrarily large, so certainly we don't want to evaluate the entire query to permit releasing the sstables.

Noted, I will keep only a small number of buffers in memory. I kind of came to this conclusion after reading Robert's comment. The time-bound lifespan is an excellent idea, thank you for suggesting it.

bq. Note, that the OpOrder should not be used by these queries - actual references should be taken so that long lifespans have no impact.

You mean actual references to the sstables via something like CFS.selectAndReference? I don't understand how we can read from off-heap memtables without using OpOrder since their memory would be invalidated once they are flushed. Above I mentioned releasing sstables but I actually meant both sstables and memtables. Do you think it's a bad idea to block memtable flushing?

bq. The code that takes these references really needs to be fixed, also, so that the races to update the data tracker don't cause temporary "infinite" loops - like we see for range queries today.

Sorry but I really cannot understand how PartitionRangeReadCommand.queryStorage may cause races when updating the data tracker view atomic reference (I presume this is what you meant).
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291428#comment-15291428 ] Benedict commented on CASSANDRA-11521: -- I would also like to voice my support for a separate path. The two needs are really quite distinct, and while optimising the normal read path is definitely something we should be exploring in general, complicating it with harder to reason about system behaviour on the normal path (wrt memory usage, reclaim, abort detection etc) _and_ implementation details (leading to bugs around those things, for more critical use cases), and yet still unlikely yielding the same performance suggests it isn't the best approach for this goal. However I would caveat that the idea of evaluating the entire query to an off-heap memory region is not what I would have in mind - there's a sliding scale starting from a small buffer (or pair of buffers) kept just ahead of the client, refilled from a persistent server-side cursor that just avoids repeating work to seek into files. The ideal would be as close to this as possible, with a potential time-bound on the lifespan of the cursor, after which it can be reinitialised to permit cleanup of sstables. A configurable time limit on isolation could be provided as an option to define this period. However these streams can be arbitrarily large, so certainly we don't want to evaluate the entire query to permit releasing the sstables. Note, that the OpOrder should not be used by these queries - actual references should be taken so that long lifespans have no impact. The code that takes these references really needs to be fixed, also, so that the races to update the data tracker don't cause temporary "infinite" loops - like we see for range queries today. 
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15289246#comment-15289246 ] Robert Stupp commented on CASSANDRA-11521: --

bq. pages are stored in Netty direct byte buffers

Just to note that direct buffers are also constrained by {{-XX:MaxDirectMemorySize}}. This affects both message buffers (in Netty) as well as direct buffers allocated by C* itself. Putting too much pressure on this (basically exhausting the direct memory size limit) may result in OOMs as well. (I didn't look at the whole ticket - but this jumped out)
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15288861#comment-15288861 ] Stefania commented on CASSANDRA-11521: --

h3. Benchmark results

These are the original results of the benchmark defined in CASSANDRA-11542, measurements are in seconds:

||15M rows||SCHEMA 1|| ||SCHEMA 3|| ||
||Test||Time||Std. Dev||Time||Std. Dev||
|parquet_rdd|4.74|0.23|9.04|0.28|
|parquet_df|2.02|0.82|4.86|0.50|
|csv_rdd|11.36|1.99|10.14|0.64|
|csv_df|13.17|0.45|15.93|1.61|
|cassandra_rdd|*40.81*|0.80|*23.58*|0.53|
|cassandra_rdd_stream|33.24|0.53|19.34|0.29|
|cassandra_df|*26.07*|0.73|*16.75*|0.88|
|cassandra_df_stream|19.39|2.18|13.19|1.71|

And these are the results with the initial streaming proof of concept and client optimization patches applied:

||15M rows||SCHEMA 1|| ||SCHEMA 3|| ||
||Test||Time||Std. Dev||Time||Std. Dev||
|parquet_rdd|4.58|0.23|8.85|0.63|
|parquet_df|2.69|2.23|4.94|0.27|
|csv_rdd|10.70|0.43|11.04|1.00|
|csv_df|14.02|1.01|14.75|0.43|
|cassandra_rdd|*26.60*|2.50|*16.14*|0.28|
|cassandra_rdd_stream|*15.91*|0.33|*13.06*|0.72|
|cassandra_df|21.20|0.86|15.15|1.27|
|cassandra_df_stream|13.04|0.87|11.18|0.54|

These are the measurements with this ticket [patch|https://github.com/apache/cassandra/compare/trunk...stef1927:11521] applied:

||15M rows||SCHEMA 1|| ||SCHEMA 3|| ||
||Test||Time||Std. Dev||Time||Std. Dev||
|parquet_rdd|5.36|2.26|8.46|0.58|
|parquet_df|1.36|0.32|4.79|0.61|
|csv_rdd|9.61|1.01|10.10|0.59|
|csv_df|12.51|0.50|14.31|0.38|
|cassandra_rdd|*18.73*|0.68|*14.74*|0.92|
|cassandra_rdd_stream|*17.50*|0.72|*13.55*|0.96|
|cassandra_df|15.68|1.15|13.57|2.40|
|cassandra_df_stream|14.73|0.87|13.00|3.05|

Please refer to this [comment|https://issues.apache.org/jira/browse/CASSANDRA-11542?focusedCommentId=15238919=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15238919] and the following one for a description of the schema and RDD/DF test types.
The streaming results show a slight degradation in performance compared to the proof of concept, because I've modified the streaming proof of concept to use the exact [same code|https://github.com/apache/cassandra/compare/trunk...stef1927:11521#diff-468c74b80d7a1d5b21948217659af747R1] as the optimized paging approach. The only difference is that in one case we send each page immediately, whilst in the other case we store each page in a bounded queue and let the client take it when it is ready. I've also added a new pager that keeps the iterators open; this is used in both approaches.

The bounded queue currently contains a maximum of 3 pages. If the queue is full the reader's thread is blocked, but in the final patch we would have to interrupt and release resources if the client is too slow. Also, each page is pre-encoded into a Netty byte buffer, so there isn't any additional GC overhead, but there is increased memory usage. Currently, this byte buffer is copied when the message is sent; in a final patch we could try to avoid this.

h3. Unit tests results

Further, below are the results of some [unit tests|https://github.com/apache/cassandra/compare/trunk...stef1927:11521#diff-04e8835163e2a326515d61f448a8ebbcR1] that create a socket connection in process and retrieve full tables, measurements are in milliseconds:

||Part. size||Tot. rows in table||Num. clustering columns||Page size||Streaming||Optimized page-by-page||
|1 KB|1000|1|100|3|3|
|10 KB|1000|1|100|9|13|
|64 KB|1000|1|100|57|81|
|10 KB|10|100|5000|86|87|

h3. Observations

The worst degradation of performance of optimized paging vs. streaming (approximately 40%) is seen in the unit tests above, with large partitions and small page sizes. It should be noted that in the unit tests we retrieve the full table and we don't do much row processing, whilst in the benchmark we retrieve multiple token ranges in parallel, and there is significant row processing done client side.

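For readers who want to check the "approximately 40%" figure, the relative slowdown can be recomputed from the unit test numbers quoted in this comment. This is only a small illustrative sketch; the class and method names are made up for the example and are not part of the patch.

```java
class PagingOverhead {
    /** Relative slowdown of optimized page-by-page versus streaming, in percent. */
    static double slowdownPercent(double streamingMs, double pagingMs) {
        return (pagingMs - streamingMs) / streamingMs * 100.0;
    }

    public static void main(String[] args) {
        // {streaming ms, optimized page-by-page ms}, copied from the unit test table.
        String[] labels = {"1 KB, page 100", "10 KB, page 100", "64 KB, page 100", "10 KB, page 5000"};
        double[][] millis = {{3, 3}, {9, 13}, {57, 81}, {86, 87}};
        for (int i = 0; i < labels.length; i++) {
            System.out.printf("%-18s %5.1f%%%n", labels[i], slowdownPercent(millis[i][0], millis[i][1]));
        }
    }
}
```

The 10 KB and 64 KB rows with a page size of 100 come out at roughly 44% and 42%, while the 1 KB row and the row with a page size of 5000 show almost no difference.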
In the benchmark results, although there is a slight degradation in performance for optimized paging, it is my opinion that this is too close to the standard deviation to matter _at least right now_.

[~tjake] raises very valid points:

bq. I'm concerned this feature will cause a lot of heap pressure since it's basically subverting paging. If we added a global bulk request memory space perhaps OOM could be avoided that way (similar to our new page cache).

As mentioned above, there is no heap pressure in terms of GC activity because the pages are stored in Netty direct byte buffers but the total memory used increases. We most likely need a mechanism to limit the total amount of memory used by these optimized queries, and to evict old pages that have not been claimed.

bq. As for queuing pages, if you are always going to fetch up to N pages why not just make the page size N times larger for bulk requests?

There is a limit of 256 MB on the message size
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15287116#comment-15287116 ] T Jake Luciani commented on CASSANDRA-11521:

I'm concerned this feature will cause a lot of heap pressure since it's basically subverting paging. If we added a global bulk request memory space perhaps OOM could be avoided that way (similar to our new page cache).

As for queuing pages, if you are always going to fetch up to N pages why not just make the page size N times larger for bulk requests?

In order to detect the speed of the client you can use {{Channel.isWritable}} to see if the client isn't able to keep up with the write watermark; see https://issues.apache.org/jira/browse/CASSANDRA-11082
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15281320#comment-15281320 ] Stefania commented on CASSANDRA-11521: --

Just a quick update. I have a tentative implementation [here|https://github.com/apache/cassandra/compare/trunk...stef1927:11521#diff-468c74b80d7a1d5b21948217659af747R49] based on a queue where a worker thread puts at most 3 pages and, if the queue stays full for a specified number of seconds, then the worker thread releases the resources and gives up. If the client keeps up, the worker continues to put pages on the queue without releasing resources (well, the pagers still create a new partition iterator for each page, but we can fix that later if it shows up in JFR).

Unfortunately, the initial results from the [unit tests|https://github.com/apache/cassandra/compare/trunk...stef1927:11521#diff-04e8835163e2a326515d61f448a8ebbcR75] show that pure streaming is still about 3 times faster. If I've read the profiles correctly, this is due to the fact that the encoding of a {{ResultSet}} into a Netty {{ByteBuffer}} is still done synchronously when the client requests a page. Therefore, I plan to encode the results directly into a Netty BB and put this, not a {{ResultSet}}, in the queue. Once this is done, the difference between the two approaches should just be the client request messages and the use of the pagers.
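To make the queue behaviour described in this update concrete, here is a minimal sketch using only {{java.util.concurrent}}. It is not the code from the linked branch: the class name {{BoundedPageQueue}}, the use of {{byte[]}} for a pre-encoded page, and the millisecond timeout are all assumptions made for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class BoundedPageQueue {
    static final int MAX_PAGES = 3; // the limit mentioned in the comment

    private final BlockingQueue<byte[]> pages = new ArrayBlockingQueue<>(MAX_PAGES);
    private final long fullTimeoutMillis;
    private volatile boolean abandoned;

    BoundedPageQueue(long fullTimeoutMillis) {
        this.fullTimeoutMillis = fullTimeoutMillis;
    }

    /**
     * Called by the worker thread. Waits while the queue is full; if it stays
     * full for the whole timeout (or the thread is interrupted), the queued
     * pages are dropped and false is returned so the worker can release its
     * iterators and give up.
     */
    boolean publish(byte[] encodedPage) {
        if (abandoned) {
            return false;
        }
        try {
            if (pages.offer(encodedPage, fullTimeoutMillis, TimeUnit.MILLISECONDS)) {
                return true;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        abandoned = true;
        pages.clear();
        return false;
    }

    /** Called when the client asks for the next page; null if none is ready or the query was abandoned. */
    byte[] poll() {
        return abandoned ? null : pages.poll();
    }

    boolean isAbandoned() {
        return abandoned;
    }
}
```

A worker loop would call {{publish}} for each encoded page and stop iterating as soon as it returns false; a client that keeps calling {{poll}} fast enough never triggers the timeout.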
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15250217#comment-15250217 ] Sylvain Lebresne commented on CASSANDRA-11521: --

bq. the protocol is very wasteful for the cases where you stream all the data

While I agree that it's probably time to think about optimizing this further, I don't think it's specific to streaming so I'm in favor of just optimizing the format itself in general, and I've created CASSANDRA-11622 for that. I acknowledge that there may be some possible optimizations that would only provide gains when you're guaranteed to send large amounts of data, but optimizing the format in general feels like a better first step in any case since it's more generally useful.

bq. there is so much more we can do, in general, to make streaming faster, if we go for something purpose-built instead

Making something purpose-built almost always allows for more optimization. But it also means more complexity, a completely new mechanism for driver authors and more code to maintain in general. I'm also not entirely convinced there is _that_ much it would allow over the "hint" idea (of course, how you value trade-offs between performance versus complexity is always somewhat subjective). In particular, I want to note that the "hint" would clearly mean that you intend to read it all and so we can still do a bunch of optimizations on that assumption. Like having those queries not pollute our future user-space page cache, and maybe having the server start serializing at least one page in advance optimistically. I also want to note that reusing the paging mechanism gives us fail-over pretty much for free (as in, almost no additional work from drivers), which is nice. And adding cancellation (which I agree would be nice) is also pretty simple.

Anyway, all this to say that I feel this "hint" idea would give a lot of the benefits for a lot less complexity (especially factoring in the work required for all drivers). So while I'm curious to see some of the numbers Stefania is still working on, I (for what it's worth) really like the idea of starting with that simple idea and then focusing on other (non strictly protocol related) ideas like CASSANDRA-11622 and CASSANDRA-11520. And only then re-evaluate if more complexity is justified/desirable.
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15235653#comment-15235653 ] Brian Hess commented on CASSANDRA-11521: - This is configurable, but the default for reads is LOCAL_ONE and for writes is LOCAL_QUORUM.
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15235605#comment-15235605 ] Aleksey Yeschenko commented on CASSANDRA-11521: ---

bq. I had this remark a long time ago back in 2014 and people told me that thanks to network compression there is no much wasted bandwidth indeed.

Not much wasted bandwidth, no. But a lot of wasted work on both ser and deser.
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15235601#comment-15235601 ] DOAN DuyHai commented on CASSANDRA-11521: -

bq. with each row, we both repeat all the clustering columns - even if many rows share them - and the partition key columns. Could get rid of it, and all related redundant serialisation, if not building on top of ResultSet.

I made this remark a long time ago, back in 2014, and people told me that thanks to network compression there is not much wasted bandwidth. What I had in mind back then was to send **raw** data to the driver, and the driver would be responsible for deserializing and reformatting the data into a proper _CQL row representation_. But it means putting a bunch of extra logic and overhead on the client side, and I'm not sure the core team agrees on this point.
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15235496#comment-15235496 ] Aleksey Yeschenko commented on CASSANDRA-11521: ---

[~brianmhess] Does C*-Spark integration use CL.LOCAL_ONE for reads? I know we do use QUORUM for writes, as a method for overload control.

A small hint on top of regular {{SELECT}} is a decent first step, but there is so much more we can do, in general, to make streaming faster, if we go for something purpose-built instead (even if built on top of Native protocol) - with proper support from the driver.

Among other things, the protocol is very wasteful for the cases where you stream all the data, especially if you have big partitions and a few clustering columns. While clustering column repetition as part of cell names is now fully gone from sstables and in-memory representation, in the protocol itself, with each row, we both repeat all the clustering columns - even if many rows share them - and the partition key columns. Could get rid of it, and all related redundant serialisation, if not building on top of ResultSet.

Secondly, it's not common at all to multiplex a single session between transactional and analytical workloads. So a single Spark java driver session is going to only be dealing with streaming itself (maybe even only a single stream at a time?). We could add a new command ({{STREAM}}), with query and, say, throughput limit, or maximum # of unacknowledged rows/bytes, and just server-side push as much as we can without violating the limits. The stream would be cancellable.

Also, ideally, once we switch to the user-space page cache, these queries should not be polluting it.
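The "maximum # of unacknowledged rows/bytes" idea is essentially a credit window. The sketch below shows one possible shape of the server-side bookkeeping, under stated assumptions: no {{STREAM}} command exists, and {{StreamWindow}}, {{trySend}}, {{ack}} and {{cancel}} are hypothetical names invented for this illustration.

```java
class StreamWindow {
    private final long maxUnackedBytes;
    private long unackedBytes;
    private boolean cancelled;

    StreamWindow(long maxUnackedBytes) {
        this.maxUnackedBytes = maxUnackedBytes;
    }

    /**
     * Returns true if the server may push a page of the given size now.
     * A page is always allowed when nothing is in flight, so that a single
     * page larger than the window cannot stall the stream forever.
     */
    synchronized boolean trySend(long pageBytes) {
        if (cancelled) {
            return false;
        }
        if (unackedBytes > 0 && unackedBytes + pageBytes > maxUnackedBytes) {
            return false; // client has not acknowledged enough yet
        }
        unackedBytes += pageBytes;
        return true;
    }

    /** Called when the client acknowledges having consumed some bytes. */
    synchronized void ack(long bytes) {
        unackedBytes = Math.max(0, unackedBytes - bytes);
    }

    /** Called when the client cancels the stream; nothing more is sent. */
    synchronized void cancel() {
        cancelled = true;
    }
}
```

With a window of 100 bytes, a 60-byte page is sent, a second 60-byte page is held back until the first is acknowledged, and nothing is sent after {{cancel}}.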
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15234461#comment-15234461 ] Stefania commented on CASSANDRA-11521: --

I like this suggested approach; it has many advantages, as already pointed out above. I will prototype it and then compare performance with the existing "streaming" prototype.

I have created CASSANDRA-11542, so we can actually compare with HDFS performance as well. Once this benchmark is available, and the new approach has been prototyped, we will then have 4 measurements:

* HDFS
* Cassandra trunk
* Cassandra with current "streaming" approach
* Cassandra with this new approach.
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15232771#comment-15232771 ] Brian Hess commented on CASSANDRA-11521: - Ah - that's a good point (about internal things for other CLs).
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15232728#comment-15232728 ] Sylvain Lebresne commented on CASSANDRA-11521: --

bq. WRT CL, with this approach, I don't quite see why you would have to stick to CL_ONE here

It's more a matter of CL.ONE being the case where we know we can get great benefits. Because in that case we'll "keep the query open", which saves tons of work that is done for every page otherwise. For other CLs, because we ask other nodes, we'd kind of have to add some intra-node streaming of results to get substantial gains. And that's a lot more involved, hence the "that's an optimization for another day".
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15232715#comment-15232715 ] Brian Hess commented on CASSANDRA-11521: -

[~slebresne] - I like this new approach better. I think it simplifies things a bit and I'm worried about the server easily overpowering the client, which I think could be really easy to do (then we'd have to think about things like back-pressure, etc).

There could be a way to tell the server that the client is going to ask for all (or a lot) of the pages, so keep this stuff ready to flow, etc. Additionally we could have a setting that will tell the server "if you haven't heard me ask for the next page (or given some heartbeat) in X long, then feel free to clean things up and throw an error if I ask for the next page later", or something, so that we don't have resources tied up even if the client dies.

WRT CL, with this approach, I don't quite see why you would have to stick to CL_ONE here. That said, starting with CL_ONE and "growing" to other CLs is probably okay. Just not entirely sure what it gains given this new approach.
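The "clean things up if you haven't heard from me in X long" setting could look roughly like the sketch below. Everything here is an assumption for illustration: {{PagingSession}} is a hypothetical name, time is passed in explicitly so the behaviour is easy to follow, and a real server would release iterators and other resources where this sketch only flips a flag.

```java
class PagingSession {
    private final long idleTimeoutMillis;
    private long lastRequestMillis;
    private int pagesServed;
    private boolean released;

    PagingSession(long idleTimeoutMillis, long nowMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.lastRequestMillis = nowMillis;
    }

    /**
     * Meant to be called periodically by some reaper task: marks the session
     * as released once the client has been silent for longer than the timeout.
     */
    synchronized boolean releaseIfIdle(long nowMillis) {
        if (!released && nowMillis - lastRequestMillis > idleTimeoutMillis) {
            released = true; // a real server would free the open iterators here
        }
        return released;
    }

    /** Serves the next page number, or fails if the session already expired. */
    synchronized int nextPage(long nowMillis) {
        if (releaseIfIdle(nowMillis)) {
            throw new IllegalStateException("paging session expired, restart the query");
        }
        lastRequestMillis = nowMillis;
        return ++pagesServed;
    }
}
```

A heartbeat from the client could simply update the last-request time without fetching a page; that variant is omitted to keep the sketch short.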
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15232701#comment-15232701 ] Sylvain Lebresne commented on CASSANDRA-11521: --

The first thing that I think should be answered here is how do we "expose" this "externally". My initial thought was more or less what I think your proof of concept is doing, that is having a different "paging mode" where the server sends pages "as fast as possible" rather than waiting for the client to ask for them. But I'm starting to wonder if that's the best approach. Because one of the questions in that case is "how to make sure we don't overwhelm the client?".

And taking a step back, I strongly suspect that by far the majority of the gain of "streaming" in the numbers on CASSANDRA-9259 is due to not having to re-start a new query server side for each page. Because other than that, the difference between clients requesting pages as-fast-as-they-can versus server sending them as-fast-as-they-can (without waiting on the client to ask) is really just the latency of 2 client-server messages per page, which should be fairly small (and probably not even noticeable if the server can send data faster than the client can process).

So an alternative could be to not change how current paging works in general, but simply allow users to provide a "hint" when they know that they intend to consume the whole result set no matter what (and do so rapidly). That hint would be used by the driver and server to optimize based on that assumption, which would mean for the driver to try to request all pages from the same replica and for the server to, at CL.ONE at least, maintain the ongoing query iterator in memory. My reasoning is that this would trade some hopefully negligible amount of latency between pages for:

# a simple solution to the problem of rate limiting for the client's sake (since the client will still control how fast things come).
# almost no change to the native protocol. We only need to pass the new "hint" flag, which would really only mean "please optimize if you can". In particular, we could actually introduce this _without_ a bump of the native protocol since we have flags available for query/execute messages. Given that so far we have no plan on doing the protocol v5 before 4.0, this would let us deliver this earlier which is nice.
# very few changes for the drivers: all they probably have to do is make sure they reuse the same replica for all pages if the "hint" is set by users but that should be pretty trivial to implement.
# it makes the question of what CL is supported moot: the "hint" flag will be just that, a hint, so users will be able to use it whenever. It just happens that we'll only optimize CL.ONE initially.

Overall, assuming the loss in latency (compared to having the server send pages as fast as it can) is indeed very small (which we should certainly validate), this would appear to be a pretty good tradeoff to me.

But anyway, that's my initial brain dump on that first question of "how we expose this?". There are other questions too that need to be discussed (and the sooner the better). For instance, how do we concretely handle the long running queries that this will allow? Holding an OpOrder for too long feels problematic, to name just one problem.
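As a rough illustration of how small the protocol change for such a "hint" could be, the sketch below sets and tests one extra bit in a flags byte. The two existing flag values are quoted from memory of the native protocol v4 specification and should be checked against it; the hint bit {{BULK_READ_HINT}} and its value are purely hypothetical and were never part of any released protocol as far as this sketch is concerned.

```java
class QueryFlags {
    // Existing query flags as recalled from the v4 spec (verify before relying on them).
    static final int PAGE_SIZE = 0x04;
    static final int WITH_PAGING_STATE = 0x08;

    // Hypothetical bit meaning "I intend to read the whole result set, please optimize if you can".
    static final int BULK_READ_HINT = 0x80;

    /** Returns the flags with the hypothetical hint bit set; other bits are untouched. */
    static int withHint(int flags) {
        return flags | BULK_READ_HINT;
    }

    /** A server that does not care about the hint can simply never call this. */
    static boolean hasHint(int flags) {
        return (flags & BULK_READ_HINT) != 0;
    }

    public static void main(String[] args) {
        int flags = withHint(PAGE_SIZE | WITH_PAGING_STATE);
        System.out.println("hint set: " + hasHint(flags));
        System.out.println("paging bits preserved: " + ((flags & PAGE_SIZE) != 0));
    }
}
```

Because the bit is only advisory, a request carrying it would be served exactly like a normal paged query by a server that chooses not to optimize, which matches the "please optimize if you can" semantics described in the comment.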
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15231896#comment-15231896 ] Stefania commented on CASSANDRA-11521: --

bq. I'd like us to tackle this and CASSANDRA-11520 as independently as possible, since as much as they both are meant to help the same scenario, they are independent optimisations.

In the proof of concept they are not independent; you can take a look at [this code|https://github.com/stef1927/cassandra/blob/9259/src/java/org/apache/cassandra/service/BulkReadService.java] when you get a chance. It should give you a very good idea of what I had in mind: we basically stream results without even stopping the iteration.

I can see how they can be independent, and I'll be sure to share the design upfront if that's the path we choose, but wouldn't this mean that internally we still process each page independently, which means creating sstable iterators for every single page, for example? Do you think the approach for CL.ONE that I chose in the proof of concept would be too problematic for resource management?
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15231869#comment-15231869 ] Sylvain Lebresne commented on CASSANDRA-11521: -- I'd like us to tackle this and CASSANDRA-11520 as independently as possible, since as much as they both are meant to help the same scenario, they are independent optimisations. For this, it's not immediately clear to me that it'd help in any way to limit to CL.ONE and so I wouldn't do it unless there is good reason to. But that could boil down to us not having the same idea of how this should work in the end, or me missing some of the challenges, so we can discuss more precisely once you've provided more details on how you plan on tackling this. But please, do provide some reasonably precise design we can discuss upfront.
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15231845#comment-15231845 ] Stefania commented on CASSANDRA-11521: -- The initial implementation I have in mind is based on the special case for local reads at CL.ONE, CASSANDRA-11520. I was planning on modifying only this read path. We can add streaming to normal reads as well, that's true. I haven't considered it for this ticket because it was not part of the proof of concept, hence I am not sure how helpful it would be, but it can be done if there is sufficient interest for it.
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15231811#comment-15231811 ] Sylvain Lebresne commented on CASSANDRA-11521: --

bq. CL ONE only, at least in the initial implementation

Is there any reason to have this limitation for that specific ticket? I mean, if restricting to CL.ONE makes this initially simpler then I'd be fine with that, but this specific issue seems completely orthogonal to the CL to me at face value so I'm curious where you think CL.ONE will make this easier.
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15231414#comment-15231414 ] Stefania commented on CASSANDRA-11521: -- CL ONE only, at least in the initial implementation.
[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests
[ https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15230346#comment-15230346 ] DOAN DuyHai commented on CASSANDRA-11521: - [~Stefania] For this use case, does it mean that the client should stick to CL ONE, or can it use CL > ONE?