[jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times

2016-05-25 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15299675#comment-15299675
 ] 

Stefania commented on CASSANDRA-11542:
--

Thank you for the review. I will increase the number of rows from 15M to at 
least 50M next time I re-run the benchmark. I think we can close this ticket 
for now.

> Create a benchmark to compare HDFS and Cassandra bulk read times
> 
>
> Key: CASSANDRA-11542
> URL: https://issues.apache.org/jira/browse/CASSANDRA-11542
> Project: Cassandra
>  Issue Type: Sub-task
>  Components: Testing
>Reporter: Stefania
>Assignee: Stefania
> Fix For: 3.x
>
> Attachments: jfr_recordings.zip, spark-load-perf-results-001.zip, 
> spark-load-perf-results-002.zip, spark-load-perf-results-003.zip
>
>
> I propose creating a benchmark for comparing Cassandra and HDFS bulk reading 
> performance. Simple Spark queries will be performed on data stored in HDFS or 
> Cassandra, and the entire duration will be measured. An example query would 
> be the max or min of a column or a count\(*\).
> This benchmark should allow determining the impact of:
> * partition size
> * number of clustering columns
> * number of value columns (cells)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times

2016-05-24 Thread Russell Alexander Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15299326#comment-15299326
 ] 

Russell Alexander Spitzer commented on CASSANDRA-11542:
---

The benchmark looks good to me. I would only suggest increasing the volume of 
data in the run so that the fixed cost of setting up the Spark work is smaller 
relative to the time spent pulling data from C*.



[jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times

2016-05-19 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15290765#comment-15290765
 ] 

Stefania commented on CASSANDRA-11542:
--

Great, thank you! Let's continue the discussion on the connector changes there.



[jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times

2016-05-19 Thread Artem Aliev (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15290743#comment-15290743
 ] 

Artem Aliev commented on CASSANDRA-11542:
-

[~Stefania], I have created 
https://datastax-oss.atlassian.net/browse/SPARKC-383 for your connector findings.
If you have new code for these findings, please comment there. I will discuss the 
design of the change with the team and improve your proposal.



[jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times

2016-05-06 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15273884#comment-15273884
 ] 

Stefania commented on CASSANDRA-11542:
--

I've fixed the benchmark to ensure all jobs have a similar number of Spark 
tasks. This was accomplished by reducing the number of HDFS blocks and 
decreasing the Cassandra split size in the connector. All jobs now have 
approximately 35 Spark tasks, of which 10 execute in parallel (5 nodes with 2 
executors each).

Here are the results for schemas 1 and 3:

||15M rows||SCHEMA 1|| ||SCHEMA 3|| ||
||Test||Time||Std. Dev||Time||Std. Dev||
|parquet_rdd|4.74|0.23|9.04|0.28|
|parquet_df|2.02|0.82|4.86|0.50|
|csv_rdd|11.36|1.99|10.14|0.64|
|csv_df|13.17|0.45|15.93|1.61|
|cassandra_rdd|*40.81*|0.80|*23.58*|0.53|
|cassandra_rdd_stream|33.24|0.53|19.34|0.29|
|cassandra_df|*26.07*|0.73|*16.75*|0.88|
|cassandra_df_stream|19.39|2.18|13.19|1.71|

As expected, the Cassandra numbers have improved significantly due to the 
increased parallelism. 

I've also performed some client-side optimizations, identified by analyzing 
the JFR files recorded during the last run; the results are as follows:

||15M rows||SCHEMA 1|| ||SCHEMA 3|| ||
||Test||Time||Std. Dev||Time||Std. Dev||
|parquet_rdd|4.58|0.23|8.85|0.63|
|parquet_df|2.69|2.23|4.94|0.27|
|csv_rdd|10.70|0.43|11.04|1.00|
|csv_df|14.02|1.01|14.75|0.43|
|cassandra_rdd|26.60|2.50|16.14|0.28|
|cassandra_rdd_stream|*15.91*|0.33|*13.06*|0.72|
|cassandra_df|21.20|0.86|15.15|1.27|
|cassandra_df_stream|*13.04*|0.87|*11.18*|0.54|

When both streaming and client optimizations are in place, the performance is 
considerably better. The overall percentage improvements by test and schema are 
as follows:

||Percentage reduction in time after optimization||SCHEMA 1|| ||SCHEMA 3|| ||
|| ||RDD||DF||RDD||DF||
|Total|61.01%|49.98%|44.62%|33.25%|
|Client optimizations|52.13%|32.75%|32.48%|15.27%|
|Streaming (after client optimizations)|40.17%|38.51%|19.10%|26.22%|

Raw result data is attached as [^spark-load-perf-results-003.zip].
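These percentages follow directly from the two timing tables above; as a small sketch using the schema 1 figures (40.81 s for the unoptimized cassandra_rdd, 15.91 s for cassandra_rdd_stream with client optimizations):

```python
def pct_reduction(before: float, after: float) -> float:
    """Percentage reduction in elapsed time going from `before` to `after`."""
    return (before - after) / before * 100.0

# Schema 1, RDD: 40.81 s before any optimization, 15.91 s with both
# streaming and client optimizations applied.
total = pct_reduction(40.81, 15.91)
print(f"{total:.2f}%")  # → 61.01%
```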

The client improvements are available 
[here|https://github.com/stef1927/spark-cassandra-connector/commit/1490178b9c166dc9e6c38f63be5eb7232e73ddd8],
 this is a quick summary:

* The cache of type decoders in the driver is extremely slow; saving the 
decoders in an array for each RDD computation is by far the most significant 
factor.
* {{GettableData}} creates a map, 
[{{_indexOf}}|https://github.com/stef1927/spark-cassandra-connector/blob/9259/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/GettableData.scala#L16],
 for every single row, even though this map does not change across rows. In the 
optimizations I've replaced {{CassandraRow}} with {{GettableByIndexData}} for 
case classes, but this needs further work.
* For the non-streaming case, the query may not necessarily end up at a replica 
(token-aware routing only works with partition keys, not with tokens, and only 
the first set of tasks lands on the preferred Spark location; subsequent tasks 
may not).
* The Spark metrics have been disabled as they contributed somewhat to the 
total decoding time; we need to calculate the binary row size without iterating 
over the row column ByteBuffers again.
* Scala for loops are implemented as {{range.foreach}}, and as far as I 
understand the closure passed to the foreach is not inlined, so I think we need 
to be careful with for loops in the critical path. They definitely show up 
quite high in JFR, but I cannot say that replacing a single for loop has a 
measurable impact on the total time.
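As a hedged illustration of the first point (names here are hypothetical and the real connector resolves driver {{TypeCodec}}s, not Python callables): resolving each column's decoder once per computation into a position-indexed array avoids a cache lookup for every single cell.

```python
# Hypothetical decoder registry; a stand-in for the driver's codec cache.
DECODERS = {"int": int, "text": str}

def decode_rows_cached(rows, column_types):
    # Resolve each column's decoder once, into a position-indexed list,
    # instead of hitting the type-keyed cache for every cell of every row.
    decoders = [DECODERS[t] for t in column_types]
    return [tuple(d(cell) for d, cell in zip(decoders, row)) for row in rows]

rows = [("1", "a"), ("2", "b")]
print(decode_rows_cached(rows, ["int", "text"]))  # → [(1, 'a'), (2, 'b')]
```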

Further client-side optimization is possible; I would suggest integrating the 
connector with the driver at a very low level in order to convert 
{{ByteBuffers}} directly into Scala types. In addition, the improvements I've 
already implemented will need further refinement. However, in the JFR recordings 
taken after these optimizations, the Spark executors now spend 66% of their time 
waiting and 33% processing rows, whilst the NIO workers spend 95% of their time 
waiting. Therefore I suggest moving on to server-side optimizations, starting 
with CASSANDRA-11521.

This [benchmark|https://github.com/stef1927/spark-load-perf] is ready for 
review as it will be used to measure any future improvement. [~rspitzer] would 
you be able to quickly review it or find someone interested?

[jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times

2016-05-02 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15266239#comment-15266239
 ] 

Stefania commented on CASSANDRA-11542:
--

bq. Saw that we are always doing the conversion to CassandraRow with RDDs, 
dataframes go directly to the internal SQL Type. 

In the DataFrame tests we also only retrieve the two columns needed for the 
calculation rather than all columns. I described this above; sorry if it 
wasn't clear.

bq.The code you presented looks good to me, there is the potential issue of 
blocking on resultsets that take a long time to complete while other 
result-sets are already on the driver but i'm not sure if this is a big deal. 
Do you have any idea of the parallelization in these test? How many partitions 
are the different runs generating?

Thanks for checking the code. The result set futures should not block because 
the driver completes them as soon as they are transferred to the iterator's 
thread. I'm actually using futures as a lazy way to also transfer error 
conditions rather than just results.

In terms of parallelism, each C* node receives 256 token range queries per RDD 
iteration. This should be fine since each node has 256 tokens. I've also 
checked the Spark tasks by connecting to the web UI at port 4040: initially 
I could see 10 tasks per Cassandra RDD operation, which increased to 20 
when I raised the number of executor cores to 4. I have 5 nodes with 2 
executors each, so the initial number of 10 makes sense as by default there is 
one core per executor; however, I don't understand why I ended up with 20 
rather than 40 when I increased the number of cores to 4. {{spark-env.sh}} is 
[here|https://github.com/stef1927/spark-load-perf/blob/master/bin/install_spark.sh#L34]
 if you want to check it out, but there's not much to it other than the number 
of executor cores. I also note that the CSV and Parquet RDD operations have as 
many tasks as there are HDFS partitions, so 1000 tasks. This would give them a 
big advantage if we had idle cores, but I don't know how to reliably increase 
the number of tasks for C* RDDs.
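For reference, the expected number of concurrently running tasks is just nodes × executors per node × cores per executor; a trivial sketch of that arithmetic:

```python
def spark_slots(nodes: int, executors_per_node: int, cores_per_executor: int) -> int:
    """Tasks that can run concurrently: one per executor core."""
    return nodes * executors_per_node * cores_per_executor

print(spark_slots(5, 2, 1))  # → 10, matching the initially observed task count
print(spark_slots(5, 2, 4))  # → 40 expected, yet only 20 tasks were observed
```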

I've collected JFR files for both Cassandra and the Spark executors: 
[^jfr_recordings.zip]. I still need to analyze them, but from a quick look there 
are at least two interesting things client side (plus maybe a third one): we 
seem to spend a lot of time in {{CassandraRow._indexOfOrThrow()}} and in 
selecting the codecs in the driver. As for the C* JFR recordings, we spend 80% 
of the time in the new bulk read code, but we also still spend 15% in 
{{ReadCommandVerbHandler}}, which I don't understand.

I will post another update when I have more details on the JFR analysis and any 
optimizations that might follow.




[jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times

2016-04-29 Thread vincent.poncet (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15264867#comment-15264867
 ] 

vincent.poncet commented on CASSANDRA-11542:


The Cassandra tests have a much higher normalized standard deviation (std dev / 
mean) than the HDFS tests, and looking at your previous tests, the first 
Cassandra run is much slower than subsequent runs. So it really looks like there 
is a cache somewhere that is biasing your Cassandra numbers.



[jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times

2016-04-29 Thread Russell Alexander Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15264698#comment-15264698
 ] 

Russell Alexander Spitzer commented on CASSANDRA-11542:
---

Hmm, I'm a little confused that case classes don't help but DataFrames do... The 
code you presented looks good to me. There is the potential issue of blocking 
on result sets that take a long time to complete while other result sets are 
already on the driver, but I'm not sure if this is a big deal. Do you have any 
idea of the parallelization in these tests? How many partitions are the 
different runs generating?



[jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times

2016-04-29 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15263830#comment-15263830
 ] 

Stefania commented on CASSANDRA-11542:
--

Thank you for the suggestions [~rspitzer]. Switching to case classes for the 
RDD tests has actually made things much worse for streaming. 

For example for schema 1:

||Test||Time||Std. Dev||
|parquet_rdd|2.67|0.50|
|parquet_df|2.69|0.66|
|csv_rdd|5.20|0.58|
|csv_df|12.37|0.53|
|cassandra_rdd|49.03|5.46|
|cassandra_rdd_stream|47.49|3.84|
|cassandra_df|27.78|0.74|
|cassandra_df_stream|19.35|1.51|

I've also fixed a bug with the benchmark: previously sstables were only 
compacted and flushed on the spark master, not on all nodes. Further, the OS 
page cache might also not necessarily have been flushed on all nodes. Here is a 
repeat of data for schema 1 with this problem addressed:

||Test||Time||Std. Dev||
|parquet_rdd|4.81|0.31|
|parquet_df|4.77|0.78|
|csv_rdd|7.59|0.22|
|csv_df|13.09|0.41|
|cassandra_rdd|45.13|0.55|
|cassandra_rdd_stream|41.64|0.37|
|cassandra_df|36.15|11.70|
|cassandra_df_stream|22.55|3.36|

In terms of IO usage, dstat shows less than 10 MB/s for Cassandra compared 
to 110 MB/s for HDFS.

Regarding the degradation of RDD streaming with case classes, I suspect the way 
I implemented the RDD iterator might be inefficient; could you take a look 
[here|https://github.com/datastax/spark-cassandra-connector/compare/master...stef1927:9259#diff-fecec01aca0cc6ed91526423b292eceaR12]?
 We receive an iterator of futures from the driver, one per page. As soon as a 
future completes we return its rows, which then get converted to case classes. 
I think we should somehow convert page rows in parallel to speed things up and 
take advantage of streaming.
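A minimal sketch of that idea in Python rather than the connector's Scala (the page layout and all names here are assumptions, not the actual API): completed pages are handed to a thread pool so their rows are converted in parallel while row order is preserved.

```python
from concurrent.futures import ThreadPoolExecutor

def convert_row(row):
    # Stand-in for the CassandraRow -> case class conversion.
    return tuple(row)

def convert_pages_in_parallel(pages, workers=4):
    # Each element of `pages` is one page of raw rows (already materialized
    # here; in the connector each would come from a completed future).
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # pool.map preserves the order of the input pages.
        converted = pool.map(lambda page: [convert_row(r) for r in page], pages)
        for page in converted:
            yield from page

pages = [[["a", 1], ["b", 2]], [["c", 3]]]
print(list(convert_pages_in_parallel(pages)))
# → [('a', 1), ('b', 2), ('c', 3)]
```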

[~slebresne] : noted, thank you! I should be able to start CASSANDRA-11521 next 
week, and continue this investigation in parallel. 




[jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times

2016-04-29 Thread Sylvain Lebresne (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15263751#comment-15263751
 ] 

Sylvain Lebresne commented on CASSANDRA-11542:
--

bq. I still think we need CASSANDRA-11520 and CASSANDRA-11521, but I just want 
to make sure we tackle the bigger "bang for the buck" first.

Just to say that I generally agree with that statement, but adding that a 60% 
improvement (or even 30% for RDD) is not too shabby either, and both 
CASSANDRA-11520 and CASSANDRA-11521 seem doable without too much 
effort/disruption. That is, as far as I'm concerned, you've gathered enough 
evidence to show that they are worth doing, and I'd be happy to start there. 
This doesn't exclude continuing to dig in parallel, of course. Anyway, great 
work so far, thanks.



[jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times

2016-04-28 Thread Russell Alexander Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15263561#comment-15263561
 ] 

Russell Alexander Spitzer commented on CASSANDRA-11542:
---

One other thing: when we were testing before, we noticed that our reads were 
not IO bound. With streaming, are we now IO bound on the C* side? I.e., is 
drive usage at full?



[jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times

2016-04-28 Thread Russell Alexander Spitzer (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15263553#comment-15263553
 ] 

Russell Alexander Spitzer commented on CASSANDRA-11542:
---

You may also want to do tests reading into case classes rather than 
{{CassandraRow}}s:

{code}
case class RowName(col1: ColType1, col2: ColType2)
sc.cassandraTable[RowName]("keyspace", "table")
{code}

This may explain some of the difference between RDD and DataFrame read times, 
as DataFrames read into a different format than RDDs by default ({{SqlRow}} vs 
{{CassandraRow}}), and case classes should be much more efficient than the 
map-based {{CassandraRow}}s. In addition, I think the Parquet versions are able 
to skip full counts (because of the metadata), but I'm not really sure about 
that; it may give them an advantage over CSV. Then again, it could just be the 
compression of repeated values.



[jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times

2016-04-28 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15263494#comment-15263494
 ] 

Stefania commented on CASSANDRA-11542:
--

These are the results with the Spark Connector modified to support the 
streaming proof of concept. Results are in seconds and represent the average of 
5 different runs, see [^spark-load-perf-results-002.zip] for the raw data.

The improvement is approximately 30% for the RDD tests and 60% for the DF 
tests. Further, the data for schema 3 does not match the data observed in the 
previous run and the high variance continues to be observed.

|| ||SCHEMA 1|| ||SCHEMA 2|| ||SCHEMA 3|| ||SCHEMA 4|| ||
||Test||Time||Std. Dev||Time||Std. Dev||Time||Std. Dev||Time||Std. Dev||
| parquet_rdd|2.73|0.23|2.90|0.30|6.09|0.21|6.33|0.21|
|  parquet_df|2.87|0.72|2.68|0.62|4.65|0.78|4.40|0.32|
| csv_rdd|5.31|0.21|5.18|0.24|6.58|0.11|6.50|0.12|
|  csv_df|12.26|1.00|12.31|0.28|13.03|0.25|13.04|0.19|
|   cassandra_rdd|49.72|2.80|46.57|2.77|19.75|0.58|39.83|18.72|
|cassandra_rdd_stream|35.20|3.61|32.45|1.13|15.47|1.32|27.78|8.20|
|cassandra_df|33.32|5.40|35.75|1.90|19.84|8.43|35.82|17.67|
|cassandra_df_stream|20.76|2.91|21.06|0.72|12.80|0.47|22.70|9.00|

I think there may be another dominating factor that explains these results, 
aside from the time it takes to receive data from Cassandra. The fact that the 
streaming improvement is more noticeable for DF than for RDD tests, and 
significantly smaller than that seen in the [cassandra-stress 
benchmarks|https://issues.apache.org/jira/browse/CASSANDRA-9259?focusedCommentId=15228054=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15228054],
 may indicate that client-side data decoding plays a bigger role than streaming 
in the final performance results. I am going to attach Flight Recorder to a 
Spark worker to see if this assumption is correct. I still think we need 
CASSANDRA-11520 and CASSANDRA-11521, but I just want to make sure we tackle the 
biggest "bang for the buck" first.



[jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times

2016-04-19 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15249213#comment-15249213
 ] 

Stefania commented on CASSANDRA-11542:
--

I've run [the 
benchmark|https://github.com/stef1927/spark-load-perf/tree/master] described 
above on a 5-node GCE {{n1-standard-8}} cluster (30 GB RAM and 8 virtual cores 
per node, HDDs).

The following schemas were tested:

* {{CREATE TABLE ks.schema1 (id TEXT, timestamp BIGINT, val1 INT, val2 INT, 
val3 INT, val4 INT, val5 INT, val6 INT, val7 INT, val8 INT, val9 INT, val10 
INT, PRIMARY KEY (id, timestamp))}}

* {{CREATE TABLE ks.schema2 (id TEXT, timestamp BIGINT, val1 INT, val2 INT, 
val3 INT, val4 INT, val5 INT, val6 INT, val7 INT, val8 INT, val9 INT, val10 
INT, PRIMARY KEY ((id, timestamp)))}}

* {{CREATE TABLE ks.schema3 (id TEXT, timestamp BIGINT, data TEXT, PRIMARY KEY 
(id, timestamp))}}

* {{CREATE TABLE ks.schema4 (id TEXT, timestamp BIGINT, data TEXT, PRIMARY KEY 
((id, timestamp)))}}

The first two schemas are identical except that the second uses a composite 
partition key whilst the first uses a clustering key. The same is true for the 
third and fourth schemas. The difference between the first two schemas and the 
last two is that the 10 integer values are encoded into a single string in the 
last two. This was done to measure the impact of reading multiple values from 
Cassandra, whilst the impact of clustering rows can be determined by comparing 
schemas one and two, or three and four.

15 million rows of random data were generated and stored in the following 
sources:

* Cassandra
* A CSV file stored in HDFS
* A Parquet file stored in HDFS

After generating the data, the Cassandra tables were flushed and compacted. The 
OS page cache was also flushed after generating the data, and after every test 
run, via {{sync && echo 3 | sudo tee /proc/sys/vm/drop_caches}}. The HDFS files 
were divided into 1000 partitions due to how the data was generated.

The benchmark retrieves either a Spark RDD (Resilient Distributed Dataset) or 
a DF (DataFrame). The difference between the two is that the RDD contains the 
entire table or file data, whilst the DataFrame only contains the two columns 
that are used to produce the final result. The following tests were performed 
in random order:

* *Cassandra RDD:* the entire Cassandra table is loaded into an RDD via 
{{sc.cassandraTable}};
* *CSV RDD:* the CSV data is loaded into an RDD via {{sc.textFile}};
* *Parquet RDD:* the Parquet data is loaded into an RDD via 
{{sqlContext.read.parquet}}
* *Cassandra DF:* a SELECT predicate is pushed to the server via 
{{CassandraSQLContext}} to retrieve two columns that are saved into a data 
frame;
* *CSV DF:* the CSV data is loaded into a DF via the spark SQL context using 
{{com.databricks.spark.csv}} as the format, and two columns are saved in a data 
frame; 
* *Parquet DF:* a SELECT predicate is used via {{SQLContext}} to retrieve two 
columns that are saved into a data frame.

The RDD or DF is then iterated, and the result is calculated by taking, for 
each row, the maximum of two columns, and then the global maximum across all 
rows. The time taken to create the RDD or DF and to iterate it is measured.
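As a plain-Python sketch (not the Spark code the benchmark actually runs), with made-up row values, the reduction looks like this:

```python
# Illustrative sketch of the benchmark's reduction: take
# max(col_a, col_b) per row, then the global maximum across rows.
# The (col_a, col_b) pairs below are hypothetical sample data.
rows = [(3, 7), (10, 2), (5, 5)]

global_max = max(max(a, b) for a, b in rows)
print(global_max)  # prints 10
```

In the benchmark the same reduction is expressed as an RDD/DataFrame aggregation so Spark can distribute it across partitions.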

h3. RDD Results

*Schema1*

||15M records||Parquet||CSV||Cassandra||Cassandra / CSV||Cassandra / Parquet||
|Run 1|3.494837|5.478472|43.423967|8|12|
|Run 2|2.845326|5.167405|47.170665|9|17|
|Run 3|2.613721|4.904634|48.451015|10|19|
|Average|2.98|5.18|46.35|9|16|
|Std. Dev.|0.46|0.29|2.61| | |


*Schema2*

||15M records||Parquet||CSV||Cassandra||Cassandra / CSV||Cassandra / Parquet||
|Run 1|3.486563|5.635907|46.00437|8|13|
|Run 2|2.68518|5.13979|46.108184|9|17|
|Run 3|2.673291|5.035654|46.076284|9|17|
|Average|2.95|5.27|46.06|9|16|
|Std. Dev.|0.47|0.32|0.05| | |

*Schema3*

||15M records||Parquet||CSV||Cassandra||Cassandra / CSV||Cassandra / Parquet||
|Run 1|6.122885|6.79348|29.643609|4|5|
|Run 2|5.826286|6.563861|32.900336|5|6|
|Run 3|5.751427|6.41375|33.176358|5|6|
|Average|5.90|6.59|31.91|5|5|
|Std. Dev.|0.20|0.19|1.96| | |

*Schema4*

||15M records||Parquet||CSV||Cassandra||Cassandra / CSV||Cassandra / Parquet||
|Run 1|6.137645|7.511649|29.518883|4|5|
|Run 2|5.984526|6.569239|30.723268|5|5|
|Run 3|5.763102|6.590789|30.789137|5|5|
|Average|5.96|6.89|30.34|4|5|
|Std. Dev.|0.19|0.54|0.72| | |


h3. DF Results

*Schema1*

||15M records||Parquet||CSV||Cassandra||Cassandra / CSV||Cassandra / Parquet||
|Run 1|2.843182|15.651141|37.997299|2|13|
|Run 2|2.357436|11.582413|30.836383|3|13|
|Run 3|2.386732|11.75583|30.061433|3|13|
|Average|2.53|13.00|32.97|3|13|
|Std. Dev.|0.27|2.30|4.38| | |


*Schema2*

||15M records||Parquet||CSV||Cassandra||Cassandra / CSV||Cassandra / Parquet||
|Run 1|3.016107|12.484605|95.12199|8|32|
|Run 2|2.455694|12.13422|37.583736|3|15|
|Run 3|2.329835|12.007215|34.966389|3|15|
|Average|2.60|12.21|55.89|5|21|
|Std. Dev.|0.37|0.25|34.00| | |


*Schema3*

||15M 

[jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times

2016-04-13 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15238919#comment-15238919
 ] 

Stefania commented on CASSANDRA-11542:
--

Thanks for all the suggestions Vincent. 

I've attached a [small benchmark|https://github.com/stef1927/spark-load-perf] 
that is still a work in progress, but the basic idea is there:

- it generates a schema and populates it with random data;
- the same data is saved to HDFS as CSV and as Parquet files;
- it then measures how long it takes to do a computation on an RDD backed by 
Cassandra, HDFS CSV or HDFS Parquet;
- the computation picks the maximum of two columns for each record, and then 
the maximum across records.

I still need to add a few minor things (disable filter pushdown, flush the OS 
cache after each test) and to scale up to multiple nodes in order to support 
bigger datasets. I will also add multiple schemas. Finally, I need to modify 
the Spark connector to support the streaming POC that was delivered by 
CASSANDRA-9259.



> Create a benchmark to compare HDFS and Cassandra bulk read times
> 
>
> Key: CASSANDRA-11542
> URL: https://issues.apache.org/jira/browse/CASSANDRA-11542
> Project: Cassandra
>  Issue Type: Sub-task
>  Components: Testing
>Reporter: Stefania
>Assignee: Stefania
> Fix For: 3.x
>
>
> I propose creating a benchmark for comparing Cassandra and HDFS bulk reading 
> performance. Simple Spark queries will be performed on data stored in HDFS or 
> Cassandra, and the entire duration will be measured. An example query would 
> be the max or min of a column or a count\(*\).
> This benchmark should allow determining the impact of:
> * partition size
> * number of clustering columns
> * number of value columns (cells)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times

2016-04-12 Thread vincent.poncet (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236941#comment-15236941
 ] 

vincent.poncet commented on CASSANDRA-11542:


Another point: in a typical HDFS landscape, datanodes have heavy storage, 
large but also fast. Commonly that is 10 to 14 drives of 4TB each per node, 
which gives roughly 1 to 1.5 GB/s of sequential bandwidth.
With Cassandra we usually have a single hard drive for data, but if you use a 
PCIe flash device you get 1 to 2 GB/s of sequential bandwidth.

On the storage-capacity side, SQL-on-Hadoop benchmarks are usually run with 
1TB of usable data per node.
Keep in mind that Parquet is columnar and compressed, so compression ratios 
are high; I assume much higher than a row-oriented compression scheme like 
Cassandra's.



[jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times

2016-04-12 Thread vincent.poncet (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236935#comment-15236935
 ] 

vincent.poncet commented on CASSANDRA-11542:


Parquet is a structured format, so you need a schema. Two possibilities:
1) A CSV with a header.
The easy way is to use the DataFrame API and the Spark-CSV package, which has 
schema inference: https://github.com/databricks/spark-csv
Then:
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", 
"true").load("people.txt")
df.write.parquet("people.parquet")

2) A CSV without a header: you need to apply a schema to create a DataFrame 
from it.
That's easier via an RDD, with no need for the external Spark-CSV package:
import sqlContext.implicits._
case class Person(name: String, age: Int)
val people = sc.textFile("people.txt").map(_.split(",")).map(p => Person(p(0), 
p(1).trim.toInt)).toDF()
people.write.parquet("people.parquet")



[jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times

2016-04-12 Thread Stefania (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236774#comment-15236774
 ] 

Stefania commented on CASSANDRA-11542:
--

I haven't considered Parquet so far but, if we wanted to add Parquet data to 
this benchmark, is there an easy way to convert a CSV or text file to Parquet?

I had a look at both TPC-DS and spark-sql-perf. I prefer something simpler for 
this benchmark, since we are measuring data loading times, not SQL query 
performance. Specifically, I would prefer to limit queries to a single table at 
a time, so that changing the columns of the table under test allows measuring 
the impact of clustering and value columns when reading from Cassandra.



[jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times

2016-04-11 Thread vincent.poncet (JIRA)

[ 
https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15235701#comment-15235701
 ] 

vincent.poncet commented on CASSANDRA-11542:


If you plan to run Parquet tests, you have to make sure you are not only doing 
count or min/max tests.
Parquet is columnar, so it reads only the fields relevant to the query, and it 
keeps statistics (min, max) on groups of rows, which Spark uses to skip a 
group of rows entirely if the statistics rule out the where predicates.
https://mail-archives.apache.org/mod_mbox/spark-user/201508.mbox/%3c55cc562c.6050...@gmail.com%3E

DataBricks delivered a TPC-DS performance test for Spark SQL.
https://github.com/databricks/spark-sql-perf
TPC-DS is meant to provide realistic data-warehouse-style queries, and all 
SQL-on-Hadoop articles are built around it.

If you want to measure pure bulk reading, make sure to disable predicate 
pushdown via the {{spark.sql.parquet.filterPushdown}} setting.
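The row-group skipping described above can be illustrated with a small plain-Python sketch. This is not Parquet's actual implementation, just the mechanism that {{filterPushdown}} exploits: each row group carries (min, max) statistics, so a predicate such as "value greater than a threshold" can skip any group whose maximum rules out a match. The data and function names are hypothetical.

```python
# Hypothetical sketch of min/max-statistics-based row-group skipping.
# Each "row group" stores its values plus min/max statistics, as a
# columnar format like Parquet does per column chunk.
row_groups = [
    {"min": 1,  "max": 9,  "values": [1, 5, 9]},
    {"min": 10, "max": 19, "values": [10, 14, 19]},
    {"min": 20, "max": 29, "values": [20, 25, 29]},
]

def scan_greater_than(groups, threshold):
    """Return values > threshold, skipping groups whose stats prove no match."""
    hits = []
    for g in groups:
        if g["max"] <= threshold:
            continue  # no row in this group can satisfy the predicate
        hits.extend(v for v in g["values"] if v > threshold)
    return hits

print(scan_greater_than(row_groups, 18))  # [19, 20, 25, 29]
```

With pushdown disabled, a scan would read every group regardless of statistics, which is what a pure bulk-read benchmark wants to measure.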

