[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-08-09 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15413016#comment-15413016
 ] 

Stefania commented on CASSANDRA-11521:
--

Rebase completed, tests are fine:

||trunk|[patch|https://github.com/stef1927/cassandra/commits/11521]|[testall|http://cassci.datastax.com/view/Dev/view/stef1927/job/stef1927-11521-testall/]|[dtest|http://cassci.datastax.com/view/Dev/view/stef1927/job/stef1927-11521-dtest/]|

I've also linked this new feature to version 5 of the native protocol and added 
support for it to the Java driver.

> Implement streaming for bulk read requests
> --
>
> Key: CASSANDRA-11521
> URL: https://issues.apache.org/jira/browse/CASSANDRA-11521
> Project: Cassandra
>  Issue Type: Sub-task
>  Components: Local Write-Read Paths
>Reporter: Stefania
>Assignee: Stefania
>  Labels: client-impacting, protocolv5
> Fix For: 3.x
>
> Attachments: final-patch-jfr-profiles-1.zip
>
>
> Allow clients to stream data from a C* host, bypassing the coordination layer 
> and eliminating the need to query individual pages one by one.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-08-08 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411398#comment-15411398
 ] 

Stefania commented on CASSANDRA-11521:
--

Thank you!



[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-08-08 Thread DOAN DuyHai (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411391#comment-15411391
 ] 

DOAN DuyHai commented on CASSANDRA-11521:
-

Thanks Stefania, great work!






[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-07-29 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15400395#comment-15400395
 ] 

Stefania commented on CASSANDRA-11521:
--

The patch is ready for review:

||trunk|[patch|https://github.com/stef1927/cassandra/commits/11521]|[testall|http://cassci.datastax.com/view/Dev/view/stef1927/job/stef1927-11521-testall/]|[dtest|http://cassci.datastax.com/view/Dev/view/stef1927/job/stef1927-11521-dtest/]|

There are also the [driver 
patch|https://github.com/stef1927/java-driver/commits/11521] and the [spark 
connector 
patch|https://github.com/stef1927/spark-cassandra-connector/commits/11521]. For 
these I plan to create tickets for the respective projects once the native 
protocol changes have been finalized.

A [design 
document|https://docs.google.com/document/d/1YqKGSU1P8EJIfMrO--29VaSoCy5mUu-ePfAiIOLsY7o/edit]
 is also available.

The Spark benchmark results are available in [this 
comment|https://issues.apache.org/jira/browse/CASSANDRA-9259?focusedCommentId=15400394=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15400394]
 on the parent ticket. The final patch is slightly better than the 
proof-of-concept, and the asynchronous paging mechanism significantly 
outperforms the existing mechanism for large data sets.

I've also repeated some cstar_perf tests to rule out performance regressions 
with ordinary queries, which are not in the optimized path:

* Single partition queries (default cassandra-stress read command) at 
CL.LOCAL_ONE (the cassandra-stress default): [first 
run|http://cstar.datastax.com/graph?command=one_job=8b1f1d54-53e4-11e6-85af-0256e416528f=99th_latency=2_read=1_aggregates=true=0=276.98=0=22.33],
 [second run with swapped revision's 
order|http://cstar.datastax.com/graph?command=one_job=1abd3fe4-545e-11e6-8920-0256e416528f=op_rate=2_read=1_aggregates=true=0=277.86=0=243951.4],
 [an old 
run|http://cstar.datastax.com/graph?command=one_job=16cef080-53dc-11e6-b967-0256e416528f=op_rate=2_read=1_aggregates=true=0=282.92=0=249571.3]
 done before enabling token aware routing in cassandra stress.

* Single partition queries at CL.ALL: [unique 
run|http://cstar.datastax.com/graph?command=one_job=e2155410-5462-11e6-9cd7-0256e416528f=op_rate=2_read=1_aggregates=true=0=277.75=0=246123.9]

The patch is about 3.6K ops/second slower without token aware routing and about 
1K ops/second slower at CL.ALL. With token aware routing the patch is instead 
about 1K ops/second faster. These differences must arise from the refactoring 
of the select statement. They are very small, given that the test error seems 
to be around 0.5K ops/second, but I can look into them further if there are 
concerns.



[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-07-15 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15379166#comment-15379166
 ] 

Stefania commented on CASSANDRA-11521:
--

Here is a quick update. Progress has been somewhat slow in the last week due to 
other tasks. Nonetheless, the unit and dtests are now passing. I've repeated 
the benchmark and unfortunately there is a performance degradation of about 25% 
compared to the proof of concept. I've started investigating this but I haven't 
identified the exact reason for it. I'm attaching the JFR profiles for client 
and server in case anyone is interested in taking a look: 
[^final-patch-jfr-profiles-1.zip].



[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-07-06 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15365643#comment-15365643
 ] 

Stefania commented on CASSANDRA-11521:
--

Adding links to the patch and CI results. The patch is not yet ready for review, 
but the links are here in case someone wants to take an early look:

|[patch|https://github.com/stef1927/cassandra/commits/11521]|[testall|http://cassci.datastax.com/view/Dev/view/stef1927/job/stef1927-11521-testall/]|[dtest|http://cassci.datastax.com/view/Dev/view/stef1927/job/stef1927-11521-dtest/]|




[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-07-06 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15365639#comment-15365639
 ] 

Stefania commented on CASSANDRA-11521:
--

Right. In this case it gets emulated over multiple pages, see the distributed 
case 
[here|https://github.com/stef1927/cassandra/blob/11521/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java#L450]
 vs. the local case 
[here|https://github.com/stef1927/cassandra/blob/11521/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java#L546].
 It's actually {{Selection.RowBuilder}}, implemented by 
[{{AsyncPagingService.PageBuilder}}|https://github.com/stef1927/cassandra/blob/11521/src/java/org/apache/cassandra/cql3/async/paging/AsyncPagingService.java#L132]
 that monitors pages and sends them to the client when they are available.



[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-07-06 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15365637#comment-15365637
 ] 

Stefania commented on CASSANDRA-11521:
--

Yes, it will be delivered together with this patch.



[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-07-06 Thread Jonathan Ellis (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15365627#comment-15365627
 ] 

Jonathan Ellis commented on CASSANDRA-11521:


Excellent writeup, thanks!



[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-07-06 Thread Jonathan Ellis (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15365625#comment-15365625
 ] 

Jonathan Ellis commented on CASSANDRA-11521:


bq. The first thought was to have two new commands, a STREAM request and a 
STREAM response but I chose against this so that moving forward we could phase 
out the existing request-response mechanism, clients could simply use streaming 
for everything and we could set the maximum number of pages to one by default

But we still need the storageproxy path for CL > ONE, right?  So how do we get 
to a "streaming for everything" world?



[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-07-06 Thread Jonathan Ellis (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15365623#comment-15365623
 ] 

Jonathan Ellis commented on CASSANDRA-11521:


bq. Optimize local range reads at CL.ONE by keeping the iterators open across 
pages and avoiding the storage proxy layer

Does this subsume CASSANDRA-11520 then?



[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-07-06 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15365479#comment-15365479
 ] 

Stefania commented on CASSANDRA-11521:
--

We bypass StorageProxy for local range queries that use no index, at CL.ONE. 
The decision is taken by SelectStatement.
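
As a rough illustration of that decision, here is a minimal sketch. The `ReadRequest` class and its fields are hypothetical stand-ins for what SelectStatement knows about a query; they are not Cassandra types, and the real check may involve more conditions.

```python
from dataclasses import dataclass


@dataclass
class ReadRequest:
    """Hypothetical summary of a SELECT; not an actual Cassandra class."""
    is_range_query: bool   # partition range read rather than single partition
    is_local: bool         # the requested range is owned by this node
    uses_index: bool       # the query goes through a secondary index
    consistency: str       # e.g. "ONE", "QUORUM", "ALL"


def bypasses_storage_proxy(req: ReadRequest) -> bool:
    """Return True only for local, index-free range queries at CL.ONE."""
    return (req.is_range_query
            and req.is_local
            and not req.uses_index
            and req.consistency == "ONE")
```

Any query failing one of the four conditions would go through the usual StorageProxy path.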

I've drafted a quick design doc 
[here|https://docs.google.com/document/d/1YqKGSU1P8EJIfMrO--29VaSoCy5mUu-ePfAiIOLsY7o/edit?usp=sharing].



[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-07-06 Thread Jonathan Ellis (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15365172#comment-15365172
 ] 

Jonathan Ellis commented on CASSANDRA-11521:


If this bypasses the coordinator does that mean we're limited to unfiltered seq 
scan, or do we support all of SELECT still?



[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-06-30 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358245#comment-15358245
 ] 

Stefania commented on CASSANDRA-11521:
--

Thank you!



[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-06-30 Thread Adam Holmberg (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15357172#comment-15357172
 ] 

Adam Holmberg commented on CASSANDRA-11521:
---

Thanks for the heads up. I created driver ticket 
[PYTHON-597|https://datastax-oss.atlassian.net/browse/PYTHON-597] for this.



[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-06-30 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15356834#comment-15356834
 ] 

Stefania commented on CASSANDRA-11521:
--

Just a quick update since I haven't had a chance to update this ticket in over 
a month.

I have an almost final implementation of the patch. I still need to fix a few 
more dtests, implement new unit tests for some corner cases, and repeat the 
benchmark to rule out performance regressions.

I hope to have something ready for review in the next few weeks, depending on 
other tasks and the results of the benchmark and new unit tests.

The new delivery mechanism has been renamed from _streaming_ to _asynchronous 
paging_.

In terms of implementation, I ended up having something somewhat more generic 
than what was originally stated. Specifically, optimized local reads will be 
available regardless of the delivery mechanism to the client (page-by-page or 
asynchronous paging). This means that CASSANDRA-11520 will be included in this 
patch. 

Also, asynchronous paging will be available for all queries at all consistency 
levels, not just optimized load queries. This does mean however that we will 
need to implement the protocol changes in the python driver as well, in order 
to write dtests, cc [~aholmber] for a heads up. 

When using the new asynchronous paging delivery mechanism, clients will be able 
to page by rows and by bytes. If paging by bytes, the page size returned to the 
client will be between:
* the requested size minus one avg row size 
* the requested size plus \[the current CQL row size - an avg row size\].

The avg row size is refined for every page sent. At CL > 1, intra-node, we are 
still sending requests using page sizes in rows. To calculate the number of 
rows, we divide the page size in bytes by the average row size.
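
The bounds above follow from a simple closing rule, sketched here under stated assumptions: a page is closed as soon as less than one average row of space remains, and the average is recomputed from all rows sent so far. The function names are hypothetical, the initial average must be positive, and the actual patch may compute these values differently.

```python
def split_into_pages(row_sizes, page_size_bytes, avg_row_size):
    """Group row sizes (in bytes) into pages.

    A page is closed when adding one more average-sized row would exceed
    the requested size, so every full page is larger than
    (requested - avg) and at most (requested + last_row - avg).
    Returns (pages, refined_avg_row_size).
    """
    pages, current, current_bytes = [], [], 0
    rows_seen, bytes_seen = 0, 0
    for size in row_sizes:
        current.append(size)
        current_bytes += size
        if current_bytes + avg_row_size > page_size_bytes:
            pages.append(current)
            rows_seen += len(current)
            bytes_seen += current_bytes
            avg_row_size = bytes_seen / rows_seen  # refine after every page
            current, current_bytes = [], 0
    if current:
        pages.append(current)  # final, possibly short, page
    return pages, avg_row_size


def rows_per_page(page_size_bytes, avg_row_size):
    """Convert a page size in bytes to a page size in rows (at least one)."""
    return max(1, int(page_size_bytes // avg_row_size))
```

For example, with a requested size of 300 bytes, an initial average of 100 bytes and rows of 50, 50, 400 and 100 bytes, the first page holds the first three rows (500 bytes, within the upper bound of 300 + 400 - 100 = 600) and the last row goes into a final short page.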

As for the optimized local reads, for now they are simply done in 
SelectStatement by releasing the op order periodically. Later, we may be able 
to do something fancier if required, such as using only sstable references, 
avoiding polluting the chunk cache, using a larger read ahead and so forth. For 
now, I wanted to focus on keeping delivery and local reads orthogonal and 
implementing the new delivery mechanism.

h3. Client protocol changes

A new flag has been added to query options:

{code}
+0x80: With extended flags. If set,  should be present.
+-  is a [byte] whose bits define additional options for this query and,
+  like , may influence the remainder of the message. Supported extended flags:
+0x01: With paging options. If set,  will be present and the
+  query result will be pushed to the client asynchronously, and according to
+  the paging options. Asynchronous paging can be interrupted via a CANCEL request.
+   contains the following:
+- , a [UUID] that uniquely identifies an asynchronous paging session.
+- , an [Int] indicating the size of each page in the unit defined below.
+  This is a mandatory parameter that takes precedence over ,
+  which is obsoleted by this parameter.
+- , an [Int] indicating the page unit: 1 indicates bytes and 2 indicates rows.
+- , an [Int] indicating the maximum number of pages to
+  receive in total, set this to zero to indicate no limit.
+- , an [Int] indicating the maximum number of pages to receive
+  per second, set this to zero to indicate no limit.
+
{code}
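
To make the layout of the paging options concrete, here is a minimal encoding sketch. The field names are missing from the text quoted above, so `session_id`, `page_size`, `page_unit`, `max_pages` and `max_pages_per_second` are hypothetical names chosen for illustration. The sketch assumes the usual native protocol encodings: a 16-byte [uuid] and a 4-byte big-endian signed [int].

```python
import struct
import uuid

QUERY_FLAG_EXTENDED = 0x80   # "With extended flags" bit in the query flags
EXT_FLAG_PAGING = 0x01       # "With paging options" bit in the extended flags

PAGE_UNIT_BYTES = 1
PAGE_UNIT_ROWS = 2


def encode_paging_options(session_id, page_size, page_unit,
                          max_pages=0, max_pages_per_second=0):
    """Serialize the paging options: one uuid followed by four ints.

    Zero for either maximum means "no limit", as described above.
    """
    if page_unit not in (PAGE_UNIT_BYTES, PAGE_UNIT_ROWS):
        raise ValueError("page_unit must be 1 (bytes) or 2 (rows)")
    return session_id.bytes + struct.pack(
        ">iiii", page_size, page_unit, max_pages, max_pages_per_second)
```

A client would set `QUERY_FLAG_EXTENDED` in the query flags, write an extended flags byte containing `EXT_FLAG_PAGING`, and then append these 32 bytes at the position the protocol specification defines.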

A new cancel message has been added:

{code}
+4.1.9. CANCEL
+
+  Request to cancel an asynchronous operation. The body of the message is:
+  - an [int] identifying the operation type:
+  - 1 for cancelling an async paging session
+  - a [uuid] that contains the unique identifier of the operation to cancel.
+
{code}
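
The CANCEL body is small enough to sketch in a few lines. This assumes the usual native protocol encodings (4-byte big-endian [int], 16-byte [uuid]); the function and constant names are hypothetical, and the frame header and opcode are left out because they are not given here.

```python
import struct
import uuid

CANCEL_ASYNC_PAGING = 1  # operation type for an async paging session


def encode_cancel_body(session_id, operation_type=CANCEL_ASYNC_PAGING):
    """Serialize the CANCEL body: an int operation type, then the uuid."""
    return struct.pack(">i", operation_type) + session_id.bytes
```

The uuid passed here would be the same one the client sent in the paging options when it started the session.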

A new flag has been added to the response metadata:

{code}
+0x0005 With_asynchronous_paging: if set, this result is part of an asynchronous
+  paging session and the  will be present.
[...]
+-  contains:
+   - a [uuid] that uniquely identifies the asynchronous paging session
+   - an [int] that identifies the sequential number of this result in the session
+   - a [boolean] that is true for the last message in the session
{code}
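
On the client side, these three fields are enough to check that pages arrive in order and to detect the end of a session. The sketch below is only an illustration: the class and function names are hypothetical, the [boolean] is assumed to be encoded as a single byte, and the first sequence number is assumed to be 1 (it is a parameter because the text above does not say).

```python
import struct
import uuid
from dataclasses import dataclass


@dataclass
class AsyncPagingInfo:
    session_id: uuid.UUID
    seq_no: int
    last: bool


def decode_async_paging_info(buf):
    """Decode a uuid (16 bytes), an int (4 bytes) and a boolean (1 byte)."""
    session_id = uuid.UUID(bytes=bytes(buf[:16]))
    seq_no, last = struct.unpack(">i?", bytes(buf[16:21]))
    return AsyncPagingInfo(session_id, seq_no, last)


class SessionTracker:
    """Checks that the results of one session arrive in sequence."""

    def __init__(self, session_id, first_seq_no=1):
        self.session_id = session_id
        self.expected = first_seq_no
        self.finished = False

    def accept(self, info):
        """Record one result; return True once the last one has arrived."""
        if info.session_id != self.session_id:
            raise ValueError("result belongs to a different session")
        if info.seq_no != self.expected:
            raise ValueError("expected page %d, got %d"
                             % (self.expected, info.seq_no))
        self.expected += 1
        self.finished = info.last
        return self.finished
```

A gap in the sequence numbers would tell the client that a page was lost, at which point it could fall back to the paging state of the last page it did receive.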

The paging state will be returned as usual, so clients will be able to resume 
paging on any node using the old or new paging mechanism.


[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-05-23 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15297595#comment-15297595
 ] 

Stefania commented on CASSANDRA-11521:
--

Thank you for following up Benedict.

Regarding holding {{OpOrder}} and isolation, it would be nice to offer 
isolation at the partition level. I don't think we can offer total isolation if 
we release sstables periodically but, if we release them only at partition 
boundaries, then isolation at the partition level should be possible. To recap, 
one option is to copy the entire memtable sub-maps initially, but this 
increases memory used and we may hold partitions that are no longer relevant if 
in the meantime the memtable is flushed and gets picked up when we periodically 
refresh sstables. Another option is to copy a partition, or reference it but 
this is quite hard, or hold the OpOrder but only when a specific partition is 
about to be iterated by the sstables merge iterator.

Regarding referencing sstables, the ticket you are referring to is probably 
CASSANDRA-11552 and the problem is clear now. I don't know how to reproduce it 
or where the bug exactly is yet, but I understand that if we call 
{{CFS.selectAndReference()}} rather than {{CFS.select()}} (because we no longer 
hold the {{OpOrder}}), then we might spin trying to reference sstables due to a 
bug that is causing sstables to be released when they are still visible. I will 
try and debug further if I can reproduce it.



[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-05-23 Thread Benedict (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15296244#comment-15296244
 ] 

Benedict commented on CASSANDRA-11521:
--

Language is imprecise - I didn't mean a blanket ban on OpOrder, just that it 
should not be used for the long-life cursor's sstable access, only held during 
any active service of the query by the server.  If we were to offer isolation, 
the memtable contents applicable to the query would have to be copied anyway 
(it could be referenced directly until flush, but given the current on/off heap 
hybrid situation that probably isn't desirable), which actually might be a 
point against offering isolation.

I don't have time to remind myself of the intricacies of the issue with 
referencing sstables, but I did see in passing a ticket that [~snazy] 
filed/fixed for suppressing a very spamming log message with respect to failing 
to reference sstables, and he may be able to point you to it.  That's what I'm 
referring to - the log messages are the symptom (and were deliberately not made 
spam proof to make it obvious it was happening, since it's a bad thing).  As 
far as I recall, there's a period during the replacement of sstables after 
compaction where the expired versions are visible and cannot possibly be 
referenced and so the selection loop spins.  It's probably quite easily fixed, 
but you will have to investigate for yourself.



[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-05-20 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15293001#comment-15293001
 ] 

Stefania commented on CASSANDRA-11521:
--

Thank you for your input [~snazy] and [~benedict].

bq. However these streams can be arbitrarily large, so certainly we don't want 
to evaluate the entire query to permit releasing the sstables.

Noted, I will keep only a small number of buffers in memory. I came to this 
conclusion after reading Robert's comment. The time-bound lifespan is an 
excellent idea, thank you for suggesting it.

bq. Note, that the OpOrder should not be used by these queries - actual 
references should be taken so that long lifespans have no impact.

You mean actual references to the sstables via something like 
CFS.selectAndReference? I don't understand how we can read from off-heap 
memtables without using OpOrder since their memory would be invalidated once 
they are flushed. Above I mentioned releasing sstables but I actually meant 
both sstables and memtables, do you think it's a bad idea to block memtable 
flushing?

bq. The code that takes these references really needs to be fixed, also, so 
that the races to update the data tracker don't cause temporary "infinite" 
loops - like we see for range queries today.

Sorry but I really cannot understand how PartitionRangeReadCommand.queryStorage 
may cause races when updating the data tracker view atomic reference (I presume 
this is what you meant).







[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-05-19 Thread Benedict (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291428#comment-15291428
 ] 

Benedict commented on CASSANDRA-11521:
--

I would also like to voice my support for a separate path.  The two needs are 
really quite distinct. Optimising the normal read path is definitely something 
we should be exploring in general, but this approach would complicate the 
normal path with system behaviour that is harder to reason about (wrt memory 
usage, reclaim, abort detection etc) _and_ with implementation details (leading 
to bugs around those things, for more critical use cases). It would still be 
unlikely to yield the same performance, which suggests it isn't the best 
approach for this goal. 

However I would caveat that the idea of evaluating the entire query to an 
off-heap memory region is not what I would have in mind - there's a sliding 
scale starting from a small buffer (or pair of buffers) kept just ahead of the 
client, refilled from a persistent server-side cursor that just avoids 
repeating work to seek into files.  The ideal would be as close to this as 
possible, with a potential time-bound on the lifespan of the cursor, after 
which it can be reinitialised to permit cleanup of sstables.  A configurable 
time limit on isolation could be provided as an option to define this period.
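
The cursor idea in the previous paragraph can be sketched as follows. This is an illustration of the concept only, not of any Cassandra code: `open_at` is a hypothetical callable that opens an iterator of `(position, row)` pairs strictly after a given position, and dropping the old iterator stands in for releasing the sstables it was holding.

```python
import time


class TimeBoundCursor:
    """A persistent cursor that is reinitialised after a maximum lifespan."""

    def __init__(self, open_at, max_lifespan_s, clock=time.monotonic):
        self._open_at = open_at            # position (or None) -> iterator
        self._max_lifespan_s = max_lifespan_s
        self._clock = clock
        self._position = None              # last position handed to the client
        self._it = None
        self._opened_at = 0.0
        self.open_count = 0

    def _ensure_open(self):
        now = self._clock()
        expired = now - self._opened_at >= self._max_lifespan_s
        if self._it is None or expired:
            # Reopening lets the resources of the old iterator be cleaned up.
            self._it = self._open_at(self._position)
            self._opened_at = now
            self.open_count += 1

    def next_page(self, max_rows):
        """Return up to max_rows rows, resuming where the last page ended."""
        self._ensure_open()
        page = []
        for position, row in self._it:
            page.append(row)
            self._position = position
            if len(page) >= max_rows:
                break
        return page
```

Between reinitialisations the cursor avoids repeating the work of seeking into files; the lifespan limit bounds how long any one set of resources stays pinned.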

However these streams can be arbitrarily large, so certainly we don't want to 
evaluate the entire query to permit releasing the sstables.

Note, that the OpOrder should not be used by these queries - actual references 
should be taken so that long lifespans have no impact.

The code that takes these references really needs to be fixed, also, so that 
the races to update the data tracker don't cause temporary "infinite" loops - 
like we see for range queries today.



[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-05-18 Thread Robert Stupp (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15289246#comment-15289246
 ] 

Robert Stupp commented on CASSANDRA-11521:
--

bq. pages are stored in Netty direct byte buffers

Just to note that direct buffers are also constrained by 
{{-XX:MaxDirectMemorySize}}. This affects both message buffers (in Netty) and 
direct buffers allocated by C* itself. Putting too much pressure on 
this (basically exhausting the direct memory size limit) may result in OOMs 
as well.

(I didn't look at the whole ticket - but this jumped out)



[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-05-18 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15288861#comment-15288861
 ] 

Stefania commented on CASSANDRA-11521:
--

h3. Benchmark results

These are the original results of the benchmark defined in CASSANDRA-11542, 
measurements are in seconds:

||15M rows||SCHEMA 1|| ||SCHEMA 3|| ||
||Test||Time||Std. Dev||Time||Std. Dev||
|parquet_rdd|4.74|0.23|9.04|0.28|
|parquet_df|2.02|0.82|4.86|0.50|
|csv_rdd|11.36|1.99|10.14|0.64|
|csv_df|13.17|0.45|15.93|1.61|
|cassandra_rdd|*40.81*|0.80|*23.58*|0.53|
|cassandra_rdd_stream|33.24|0.53|19.34|0.29|
|cassandra_df|*26.07*|0.73|*16.75*|0.88|
|cassandra_df_stream|19.39|2.18|13.19|1.71|

And these are the results with the initial streaming proof of concept and 
client optimization patches applied:

||15M rows||SCHEMA 1|| ||SCHEMA 3|| ||
||Test||Time||Std. Dev||Time||Std. Dev||
|parquet_rdd|4.58|0.23|8.85|0.63|
|parquet_df|2.69|2.23|4.94|0.27|
|csv_rdd|10.70|0.43|11.04|1.00|
|csv_df|14.02|1.01|14.75|0.43|
|cassandra_rdd|*26.60*|2.50|*16.14*|0.28|
|cassandra_rdd_stream|*15.91*|0.33|*13.06*|0.72|
|cassandra_df|21.20|0.86|15.15|1.27|
|cassandra_df_stream|13.04|0.87|11.18|0.54|

These are the measurements with this ticket 
[patch|https://github.com/apache/cassandra/compare/trunk...stef1927:11521] 
applied:

||15M rows||SCHEMA 1|| ||SCHEMA 3|| ||
||Test||Time||Std. Dev||Time||Std. Dev||
|parquet_rdd|5.36|2.26|8.46|0.58|
|parquet_df|1.36|0.32|4.79|0.61|
|csv_rdd|9.61|1.01|10.10|0.59|
|csv_df|12.51|0.50|14.31|0.38|
|cassandra_rdd|*18.73*|0.68|*14.74*|0.92|
|cassandra_rdd_stream|*17.50*|0.72|*13.55*|0.96|
|cassandra_df|15.68|1.15|13.57|2.40|
|cassandra_df_stream|14.73|0.87|13.00|3.05|

Please refer to this 
[comment|https://issues.apache.org/jira/browse/CASSANDRA-11542?focusedCommentId=15238919=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15238919]
 and the following one for a description of the schema and RDD/DF test types.

The streaming results show a slight degradation in performance compared to the 
proof of concept, because I've modified the streaming proof of concept to use 
the exact [same 
code|https://github.com/apache/cassandra/compare/trunk...stef1927:11521#diff-468c74b80d7a1d5b21948217659af747R1]
 as the optimized paging approach. The only difference is that in one case we 
send each page immediately whilst in the other case we store each page in a 
bounded queue and let the client take it when it is ready. 

I've also added a new pager that keeps the iterators open; this is used in both 
approaches. The bounded queue currently contains a maximum of 3 pages. If the 
queue is full the reader's thread is blocked, but in the final patch we would 
have to interrupt and release resources if the client is too slow. Also, each 
page is pre-encoded into a Netty byte buffer so there isn't any additional GC 
overhead, but there is increased memory usage. Currently, this byte buffer is 
copied when the message is sent; in a final patch we could try to avoid this.

h3. Unit tests results

Further, below are the results of some [unit 
tests|https://github.com/apache/cassandra/compare/trunk...stef1927:11521#diff-04e8835163e2a326515d61f448a8ebbcR1]
 that create a socket connection in process and retrieve full tables, 
measurements are in milliseconds:

||Part. size||Tot. rows in table||Num. clustering columns||Page size||Streaming (ms)||Optimized page-by-page (ms)||
|1 KB|1000|1|100|3|3|
|10 KB|1000|1|100|9|13|
|64 KB|1000|1|100|57|81|
|10 KB|10|100|5000|86|87|

h3. Observations

The worst degradation of performance of optimized paging vs. streaming 
(approximately 40%) is seen in the unit tests above, with large partitions and 
small page sizes. It should be noted that in the unit tests we retrieve the 
full table and we don't do much row processing, whilst in the benchmark we 
retrieve multiple token ranges in parallel, and there is significant row 
processing done client side.
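
To make the "approximately 40%" figure concrete, the relative slowdown can be recomputed from the unit test table above. This is only a small arithmetic sketch; the class and method names are made up for illustration and are not part of the patch.

```java
import java.util.Locale;

// Relative slowdown of optimized page-by-page reads vs. streaming,
// using the unit test timings (milliseconds) from the table above.
class PagingSlowdown {
    // Returns how much slower `paged` is than `streaming`, in percent.
    static double slowdownPercent(double streaming, double paged) {
        return (paged - streaming) / streaming * 100.0;
    }

    public static void main(String[] args) {
        // {streaming ms, optimized page-by-page ms}, one pair per table row
        double[][] rows = { {3, 3}, {9, 13}, {57, 81}, {86, 87} };
        for (double[] r : rows) {
            System.out.println(String.format(Locale.ROOT,
                "%.0f ms vs %.0f ms: %.1f%% slower",
                r[0], r[1], slowdownPercent(r[0], r[1])));
        }
    }
}
```

The 10 KB and 64 KB rows with a page size of 100 come out at roughly 44% and 42%, while the row with a page size of 5000 is close to 1%, which matches the observation that small pages over large partitions are the worst case.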

In the benchmark results, although there is a slight degradation in performance 
for optimized paging, it is my opinion that this is too close to the standard 
deviation to matter _at least right now_.

[~tjake] raises very valid points:

bq. I'm concerned this feature will cause a lot of heap pressure since it's 
basically subverting paging. If we added a global bulk request memory space 
perhaps OOM could be avoided that way (similar to our new page cache).

As mentioned above, there is no heap pressure in terms of GC activity because 
the pages are stored in Netty direct byte buffers but the total memory used 
increases. We most likely need a mechanism to limit the total amount of memory 
used by these optimized queries, and to evict old pages that have not been 
claimed.
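
One possible shape for such a limit is a shared byte budget that every pre-encoded page must reserve against before it is queued. The sketch below is an assumption about how this could look, not what the patch does; {{BulkReadMemoryBudget}} and its methods are hypothetical names, and eviction of unclaimed pages is left out.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical global budget for memory held by pre-encoded pages.
// Not part of the patch; names and limits are illustrative only.
class BulkReadMemoryBudget {
    private final long maxBytes;
    private final AtomicLong usedBytes = new AtomicLong();

    BulkReadMemoryBudget(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    // Try to reserve space for a page; returns false if the budget would be exceeded.
    boolean tryReserve(long bytes) {
        while (true) {
            long current = usedBytes.get();
            long next = current + bytes;
            if (next > maxBytes) return false;
            if (usedBytes.compareAndSet(current, next)) return true;
        }
    }

    // Give the space back once a page has been sent or dropped.
    void release(long bytes) {
        usedBytes.addAndGet(-bytes);
    }

    long used() {
        return usedBytes.get();
    }
}
```

A reader that fails to reserve space would have to wait or give up, which is the same decision it already faces when the per-query queue is full.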

bq. As for queuing pages, if you are always going to fetch up to N pages why 
not just make the page size N times larger for bulk requests?

There is a limit of 256 MB on the message size 

[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-05-17 Thread T Jake Luciani (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15287116#comment-15287116
 ] 

T Jake Luciani commented on CASSANDRA-11521:


I'm concerned this feature will cause a lot of heap pressure since it's 
basically subverting paging.  If we added a global bulk request memory space 
perhaps OOM could be avoided that way (similar to our new page cache).  

As for queuing pages, if you are always going to fetch up to N pages why not 
just make the page size N times larger for bulk requests?

In order to detect the speed of the client you can use the 
{{Channel.isWritable}} to see if the client isn't able to keep up with the 
write watermark see https://issues.apache.org/jira/browse/CASSANDRA-11082
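
A rough sketch of that idea follows. To keep it self-contained, a {{BooleanSupplier}} stands in for Netty's {{Channel.isWritable()}}; the {{WritabilityGatedSender}} class and the use of strings as pages are assumptions made purely for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.BooleanSupplier;

// Sketch of pausing page pushes while the client cannot keep up.
// `writable` stands in for Netty's Channel.isWritable(); the rest is hypothetical.
class WritabilityGatedSender {
    private final BooleanSupplier writable;
    private final Queue<String> pending = new ArrayDeque<>();
    private int sent = 0;

    WritabilityGatedSender(BooleanSupplier writable) {
        this.writable = writable;
    }

    void enqueue(String page) {
        pending.add(page);
    }

    // Send queued pages only while the channel reports it is writable.
    // Returns the number of pages sent by this call.
    int flush() {
        int n = 0;
        while (!pending.isEmpty() && writable.getAsBoolean()) {
            pending.poll(); // a real implementation would write the page to the channel here
            n++;
        }
        sent += n;
        return n;
    }

    int pendingCount() {
        return pending.size();
    }

    int sentCount() {
        return sent;
    }
}
```

With Netty itself, {{flush()}} would presumably be re-triggered from a handler's {{channelWritabilityChanged}} callback once the outbound buffer drops below the low watermark.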



[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-05-12 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15281320#comment-15281320
 ] 

Stefania commented on CASSANDRA-11521:
--

Just a quick update. 

I have a tentative implementation 
[here|https://github.com/apache/cassandra/compare/trunk...stef1927:11521#diff-468c74b80d7a1d5b21948217659af747R49]
 based on a queue where a worker thread puts at most 3 pages and, if the queue 
stays full for a specified number of seconds, then the worker thread releases 
the resources and gives up. If the client keeps up, the worker continues to put 
pages on the queue without releasing resources (well the pagers still create a 
new partition iterator for each page but we can fix that later if it shows up 
in JFR).
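
The queueing behaviour described above can be sketched with a standard {{ArrayBlockingQueue}}. This is a minimal illustration and not the code in the linked branch: {{BoundedPageQueue}} is a hypothetical name, pages are plain byte arrays, and resource release is reduced to a flag.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch of a worker putting at most 3 pages on a queue and giving up
// if the queue stays full for longer than a timeout.
class BoundedPageQueue {
    static final int MAX_PAGES = 3; // the limit mentioned in the comment above

    private final BlockingQueue<byte[]> pages = new ArrayBlockingQueue<>(MAX_PAGES);
    private volatile boolean abandoned = false;

    // Worker side: returns false if the queue stayed full for the whole timeout
    // (or the thread was interrupted), in which case the caller should release
    // its iterators and stop producing.
    boolean offerPage(byte[] page, long timeout, TimeUnit unit) {
        try {
            if (pages.offer(page, timeout, unit)) return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        abandoned = true;
        return false;
    }

    // Client request side: returns the next ready page, or null if none is queued.
    byte[] pollPage() {
        return pages.poll();
    }

    boolean isAbandoned() {
        return abandoned;
    }
}
```

As long as the client polls faster than the timeout, the worker never observes a full queue for long and keeps its resources open.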

Unfortunately, the initial results from the [unit 
tests|https://github.com/apache/cassandra/compare/trunk...stef1927:11521#diff-04e8835163e2a326515d61f448a8ebbcR75]
 show that pure streaming is still about 3 times faster.

If I've read the profiles correctly, this is due to the fact that the encoding 
of a {{ResultSet}} into a Netty {{ByteBuffer}} is still done synchronously when 
the client requests a page. Therefore, I plan to encode the results directly 
into a Netty BB and put this, not a {{ResultSet}}, in the queue. Once this is 
done, the difference between the two approaches should just be the client 
request messages and the use of the pagers.



[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-04-20 Thread Sylvain Lebresne (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15250217#comment-15250217
 ] 

Sylvain Lebresne commented on CASSANDRA-11521:
--

bq. the protocol is very wasteful for the cases where you stream all the data

While I agree that it's probably time to think about optimizing this further, I 
don't think it's specific to streaming so I'm in favor of just optimizing the 
format itself in general, and I've created CASSANDRA-11622 for that. I 
acknowledge that there may be some possible optimizations that would only 
provide gains when you're guaranteed to send large amounts of data, but 
optimizing the format in general feels like a better first step in any case 
since it's more generally useful.

bq. there is so much more we can do, in general, to make streaming faster, if 
we go for something purpose-built instead

Making something purpose-built almost always allows for more optimization. But 
it also means more complexity, a completely new mechanism for driver authors 
and more code to maintain in general. I'm also not entirely convinced there is 
_that_ much it would allow over the "hint" idea (of course, how you value 
trade-offs between performance versus complexity is always somewhat 
subjective). In particular, I want to note that the "hint" would clearly mean 
that you intend to read it all and so we can still do a bunch of optimizations 
on that assumption, like having those queries not pollute our future user-space 
page cache, and maybe having the server start serializing at least one page in 
advance optimistically.

I also want to note that reusing the paging mechanism gives us fail-over for 
pretty much free (as in, almost no additional work from drivers) which is nice. 
And adding cancellation (which I agree would be nice) is also pretty simple.

Anyway, all this to say that I feel this "hint" idea would give a lot of the 
benefits for a lot less complexity (especially factoring the work required for 
all drivers). So while I'm curious to see some of the numbers Stefania is still 
working on, I (for what it's worth) really like the idea of starting with that 
simple idea and then focusing on other (non strictly protocol related) ideas 
like CASSANDRA-11622 and CASSANDRA-11520. And only then re-evaluate if more 
complexity is justified/desirable.



[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-04-11 Thread Brian Hess (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15235653#comment-15235653
 ] 

 Brian Hess commented on CASSANDRA-11521:
-

This is configurable, but the default for reads is LOCAL_ONE and on writes is 
LOCAL_QUORUM. 



[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-04-11 Thread Aleksey Yeschenko (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15235605#comment-15235605
 ] 

Aleksey Yeschenko commented on CASSANDRA-11521:
---

bq. I had this remark a long time ago back in 2014 and people told me that 
thanks to network compression there is not much wasted bandwidth indeed.

Not much wasted bandwidth, no. But a lot of wasted work on both ser and deser.



[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-04-11 Thread DOAN DuyHai (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15235601#comment-15235601
 ] 

DOAN DuyHai commented on CASSANDRA-11521:
-

bq. with each row, we both repeat all the clustering columns - even if many 
rows share them - and the partition key columns. Could get rid of it, and all 
related redundant serialisation, if not building on top of ResultSet.

I had this remark a long time ago back in 2014 and people told me that thanks 
to network compression there is not much wasted bandwidth indeed.

What I had in mind back then was to send **raw** data to the driver and the 
driver will be responsible to de-serialize and re-format the data to have a 
proper _CQL row representation_

But it means putting a bunch of extra logic and overhead on the client side; 
not sure the core team agrees on this point.



[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-04-11 Thread Aleksey Yeschenko (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15235496#comment-15235496
 ] 

Aleksey Yeschenko commented on CASSANDRA-11521:
---

[~brianmhess] Does C*-Spark integration use CL.LOCAL_ONE for reads? I know we 
do use QUORUM for writes, as a method for overload control.

A small hint on top of regular {{SELECT}} is a decent first step, but there is 
so much more we can do, in general, to make streaming faster, if we go for 
something purpose-built instead (even if built on top of Native protocol) - 
with proper support from the driver.

Among other things, the protocol is very wasteful for the cases where you 
stream all the data, especially if you have big partitions and a few clustering 
columns. While clustering column repetition as part of cell names is now fully 
gone from sstables and in-memory representation, in the protocol itself, with 
each row, we both repeat all the clustering columns - even if many rows share 
them - and the partition key columns. Could get rid of it, and all related 
redundant serialisation, if not building on top of ResultSet.
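
To give a feel for the redundancy, the sketch below compares the size of rows that repeat every key column with a hypothetical encoding that only sends a key column when it changes from the previous row. It assumes each value is framed as a 4-byte length plus its bytes, ignores whatever marker a real format would need to signal "same as previous row", and uses invented names throughout.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

// Compares "repeat every column in every row" with "send shared key columns once".
// The framing (4-byte length + bytes per value) is a simplifying assumption.
class RowEncodingSize {
    static int valueSize(String v) {
        return 4 + v.getBytes(StandardCharsets.UTF_8).length;
    }

    // Every row carries all of its columns, key columns included.
    static int repeatedSize(List<String[]> rows) {
        int total = 0;
        for (String[] row : rows)
            for (String v : row)
                total += valueSize(v);
        return total;
    }

    // The first `keyColumns` columns are only sent when they (or an earlier
    // key column) differ from the previous row; other columns are always sent.
    static int dedupedSize(List<String[]> rows, int keyColumns) {
        int total = 0;
        String[] prev = null;
        for (String[] row : rows) {
            boolean changed = (prev == null);
            for (int i = 0; i < row.length; i++) {
                if (i >= keyColumns || changed || !row[i].equals(prev[i])) {
                    changed = true;
                    total += valueSize(row[i]);
                }
            }
            prev = row;
        }
        return total;
    }
}
```

For four rows of the form (partition key, first clustering column, second clustering column, value) spread over two partitions, the saving is modest, but it grows with the number of rows per partition and the width of the key columns.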

Secondly, it's not common at all to multiplex a single session between 
transactional and analytical workloads. So a single Spark java driver session 
is going to only be dealing with streaming itself (maybe even only single 
stream at a time?). We could add a new command ({{STREAM}}), with query and, 
say, throughput limit, or maximum # of unacknowledged rows/bytes, and just 
server-side push as much as we can without violating the limits. The stream 
would be cancellable.
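
The "maximum # of unacknowledged rows" limit could be modelled as a credit window, sketched below with a {{Semaphore}}. No {{STREAM}} command exists in the protocol; {{UnackedWindow}} and its methods are hypothetical and only illustrate the flow control idea.

```java
import java.util.concurrent.Semaphore;

// Hypothetical flow control for a server-side push stream: at most `maxUnacked`
// rows may be in flight until the client acknowledges them.
class UnackedWindow {
    private final Semaphore credits;
    private volatile boolean cancelled = false;

    UnackedWindow(int maxUnacked) {
        this.credits = new Semaphore(maxUnacked);
    }

    // Server side: true if `rows` more rows may be pushed without exceeding the window.
    boolean tryPush(int rows) {
        return !cancelled && credits.tryAcquire(rows);
    }

    // Client acknowledged `rows` rows, freeing that much of the window.
    void ack(int rows) {
        credits.release(rows);
    }

    // Client cancelled the stream; no further pushes are allowed.
    void cancel() {
        cancelled = true;
    }

    int available() {
        return credits.availablePermits();
    }
}
```

A throughput limit would need a time-based limiter instead, but the push/ack shape stays the same.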

Also, ideally, once we switch to the user-space page cache, these queries 
should not be polluting it.



[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-04-10 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15234461#comment-15234461
 ] 

Stefania commented on CASSANDRA-11521:
--

I like this suggested approach; it has many advantages, as already pointed out 
above. I will prototype it and then compare performance with the existing 
"streaming" prototype. 

I have created CASSANDRA-11542, so we can actually compare with HDFS 
performance as well. Once this benchmark is available, and the new approach has 
been prototyped, we will then have 4 measurements: 

* HDFS
* Cassandra trunk
* Cassandra with current "streaming" approach
* Cassandra with this new approach.



[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-04-08 Thread Brian Hess (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15232771#comment-15232771
 ] 

 Brian Hess commented on CASSANDRA-11521:
-

Ah - that's a good point (about internal things for other CLs).



[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-04-08 Thread Sylvain Lebresne (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15232728#comment-15232728
 ] 

Sylvain Lebresne commented on CASSANDRA-11521:
--

bq. WRT CL, with this approach, I don't quite see why you would have to stick 
to CL_ONE here

It's more a matter of CL.ONE being the case where we know we can get great 
benefits. Because in that case we'll "keep the query open", which saves tons of 
work that is done for every page otherwise. For other CLs, because we ask 
other nodes, we'd kind of have to add some intra-node streaming of results to 
get substantial gains. And that's a lot more involved, hence the "that's an 
optimization for another day".



[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-04-08 Thread Brian Hess (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15232715#comment-15232715
 ] 

 Brian Hess commented on CASSANDRA-11521:
-

[~slebresne] - I like this new approach better.  I think it simplifies things a 
bit and I'm worried about the server easily overpowering the client, which I 
think could be really easy to do (then we'd have to think about things like 
back-pressure, etc).  There could be a way to tell the server that the client 
is going to ask for all (or a lot) of the pages, so keep this stuff ready to 
flow, etc. Additionally we could have a setting that will tell the server "if 
you haven't heard me ask for the next page (or given some heartbeat) for X 
long, then feel free to clean things up and throw an error if I ask for the 
next page later", or something, so that we don't have resources tied up even if 
the client dies.
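
The idle timeout described above could look roughly like the following. It is a sketch under assumptions: {{PagingSessions}} is a hypothetical registry keyed by a query id, time is passed in explicitly to keep it testable, and "clean things up" is reduced to forgetting the session.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry of open server-side paging sessions with an idle timeout.
class PagingSessions {
    private final long idleTimeoutMillis;
    private final Map<String, Long> lastSeen = new ConcurrentHashMap<>();

    PagingSessions(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    // Register a session, or record a heartbeat for it.
    void touch(String queryId, long nowMillis) {
        lastSeen.put(queryId, nowMillis);
    }

    // Called periodically: drops sessions that have been idle for longer than
    // the timeout and returns how many were dropped.
    int expire(long nowMillis) {
        int removed = 0;
        for (Iterator<Map.Entry<String, Long>> it = lastSeen.entrySet().iterator(); it.hasNext();) {
            if (nowMillis - it.next().getValue() > idleTimeoutMillis) {
                it.remove(); // a real implementation would also close iterators here
                removed++;
            }
        }
        return removed;
    }

    // Next-page request: fails if the session has already been cleaned up.
    void nextPage(String queryId, long nowMillis) {
        if (lastSeen.replace(queryId, nowMillis) == null)
            throw new IllegalStateException("paging session expired: " + queryId);
    }
}
```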

WRT CL, with this approach, I don't quite see why you would have to stick to 
CL_ONE here.  That said, starting with CL_ONE and "growing" to other CL's is 
probably okay.  Just not entirely sure what it gains given this new approach.



[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-04-08 Thread Sylvain Lebresne (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15232701#comment-15232701
 ] 

Sylvain Lebresne commented on CASSANDRA-11521:
--

The first thing that I think should be answered here is how do we "expose" this 
"externally". My initial thought was more or less what I think your proof of 
concept is doing, that is having a different "paging mode" where the server 
sends pages "as fast as possible" rather than waiting for the client to ask for 
them.

But I'm starting to wonder if that's the best approach. Because one of the 
questions in that case is "how to make sure we don't overwhelm the client?". And 
taking a step back, I strongly suspect that by far the majority of the gain of 
"streaming" in the numbers on CASSANDRA-9259 is due to not having to re-start a 
new query server side for each page. Because other than that, the difference 
between clients requesting pages as-fast-as-they-can versus server sending them 
as-fast-as-they-can (without waiting on the client to ask) is really just the 
latency of 2 client-server messages per page, which should be fairly small (and 
probably not even noticeable if the server can send data faster than the client 
can process).

So an alternative could be to not change how current paging works in general, 
but simply allow user to provide a "hint" when they know that they intend to 
consume the whole result set no matter what (and do so rapidly).  That hint 
would be used by the driver and server to optimize based on that assumption, 
which would mean for the driver to try to ask all pages to the same replica and 
for the server to, at CL.ONE at least, maintain the ongoing query iterator in 
memory.

My reasoning is that this would trade some hopefully negligible amount of 
latency between pages for:
# a simple solution to the problem of rate limiting for the client's sake (since 
the client will still control how fast things come).
# almost no change to the native protocol. We only need to pass the new "hint" 
flag, which would really only mean "please optimize if you can". In particular, 
we could actually introduce this _without_ a bump of the native protocol since 
we have flags available for query/execute messages. Given that so far we have 
no plan on doing the protocol v5 before 4.0, this would let us deliver this 
earlier which is nice.
# very few changes for the drivers: all they probably have to do is make 
sure they reuse the same replica for all pages if the "hint" is set by users 
but that should be pretty trivial to implement.
# it makes the question of what CL is supported moot: the "hint" flag will be 
just that, a hint, so users will be able to use it whenever. It just happens 
that we'll only optimize CL.ONE initially.
Overall, assuming the loss in latency (compared to having the server send pages 
as fast as it can) is indeed very small (which we should certainly validate), 
this would appear to be a pretty good tradeoff to me.
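
On the driver side, the change in point 3 might amount to little more than the routing sketch below. It is illustrative only: {{PagedQueryRouting}} is a hypothetical per-query object, replicas are plain strings, and the non-hinted case is shown as simple round-robin rather than any real load balancing policy.

```java
import java.util.List;

// Hypothetical per-query routing: with the "hint" set, every page request goes
// to the same replica so the server can keep its query iterator in memory.
class PagedQueryRouting {
    private final List<String> replicas;
    private final boolean bulkHint;
    private int requests = 0;

    PagedQueryRouting(List<String> replicas, boolean bulkHint) {
        this.replicas = replicas;
        this.bulkHint = bulkHint;
    }

    // Picks the replica for the next page request.
    String replicaForNextPage() {
        int index = bulkHint ? 0 : requests % replicas.size();
        requests++;
        return replicas.get(index);
    }
}
```

Fail-over would still work as with normal paging: if the pinned replica goes away, the driver can send the paging state to another replica, which simply starts without the cached iterator.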

But anyway, that's my initial brain dump on that first question of "how we 
expose this?". There are other questions too that need to be discussed (and 
the sooner the better). For instance, how do we concretely handle the long 
running queries that this will allow? Holding an OpOrder for too long feels 
problematic to name just one problem.




[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-04-08 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15231896#comment-15231896
 ] 

Stefania commented on CASSANDRA-11521:
--

bq. I'd like us to tackle this and CASSANDRA-11520 as independently as 
possible, since as much as they both are meant to help the same scenario, they 
are independent optimisations.

In the proof of concept they are not independent, you can take a look at [this 
code|https://github.com/stef1927/cassandra/blob/9259/src/java/org/apache/cassandra/service/BulkReadService.java]
 when you get a chance. It should give you a very good idea of what I had in 
mind, we basically stream results without even stopping the iteration.

I can see how they can be independent, and I'll be sure to share the design 
upfront if that's the path we choose, but wouldn't this mean that internally we 
still process each page independently, which means creating sstable iterators 
for every single page, for example?

Do you think the approach for CL.ONE that I chose in the proof of concept would 
be too problematic for resource management?



[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-04-08 Thread Sylvain Lebresne (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15231869#comment-15231869
 ] 

Sylvain Lebresne commented on CASSANDRA-11521:
--

I'd like us to tackle this and CASSANDRA-11520 as independently as possible, 
since as much as they both are meant to help the same scenario, they are 
independent optimisations. For this, it's not immediately clear to me that it'd 
help in any way to limit to CL.ONE and so I wouldn't do it unless there is good 
reason to. But that could boil down to us not having the same idea of how this 
should work in the end, or me missing some of the challenges, so we can discuss 
more precisely once you've provided more details on how you plan on tackling 
this. But please, do provide some reasonably precise design we can discuss 
upfront.



[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-04-08 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15231845#comment-15231845
 ] 

Stefania commented on CASSANDRA-11521:
--

The initial implementation I have in mind is based on the special case for 
local reads at CL.ONE, CASSANDRA-11520. I was planning on modifying only this 
read path.

We can add streaming to normal reads as well, that's true. I haven't considered 
it for this ticket because it was not part of the proof of concept, hence I am 
not sure how helpful it would be, but it can be done if there is sufficient 
interest for it.








[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-04-08 Thread Sylvain Lebresne (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15231811#comment-15231811
 ] 

Sylvain Lebresne commented on CASSANDRA-11521:
--

bq. CL ONE only, at least in the initial implementation

Is there any reason to have this limitation for this specific ticket? If 
restricting to CL.ONE makes the initial implementation simpler, I'd be fine 
with that, but at face value this issue seems completely orthogonal to the CL, 
so I'm curious where you think CL.ONE will make this easier.



[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-04-07 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15231414#comment-15231414
 ] 

Stefania commented on CASSANDRA-11521:
--

CL ONE only, at least in the initial implementation.



[jira] [Commented] (CASSANDRA-11521) Implement streaming for bulk read requests

2016-04-07 Thread DOAN DuyHai (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15230346#comment-15230346
 ] 

DOAN DuyHai commented on CASSANDRA-11521:
-

[~Stefania] For this use case, does it mean that the client should stick to CL 
ONE, or can it use CL > ONE?
