[jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times
[ https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15299675#comment-15299675 ]

Stefania commented on CASSANDRA-11542:
--------------------------------------

Thank you for the review. I will increase the number of rows from 15M to at least 50M the next time I re-run the benchmark. I think we can close this ticket for now.

> Create a benchmark to compare HDFS and Cassandra bulk read times
> ----------------------------------------------------------------
>
>                 Key: CASSANDRA-11542
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-11542
>             Project: Cassandra
>          Issue Type: Sub-task
>          Components: Testing
>            Reporter: Stefania
>            Assignee: Stefania
>             Fix For: 3.x
>
>         Attachments: jfr_recordings.zip, spark-load-perf-results-001.zip,
> spark-load-perf-results-002.zip, spark-load-perf-results-003.zip
>
>
> I propose creating a benchmark for comparing Cassandra and HDFS bulk reading
> performance. Simple Spark queries will be performed on data stored in HDFS or
> Cassandra, and the entire duration will be measured. An example query would
> be the max or min of a column or a count\(*\).
> This benchmark should allow determining the impact of:
> * partition size
> * number of clustering columns
> * number of value columns (cells)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times
[ https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15299326#comment-15299326 ]

Russell Alexander Spitzer commented on CASSANDRA-11542:
-------------------------------------------------------

The benchmark looks good to me. I would only suggest you increase the volume of data in the run so that the ratio of pulling data from C* to setting up Spark work is lower.
[jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times
[ https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15290765#comment-15290765 ]

Stefania commented on CASSANDRA-11542:
--------------------------------------

Great, thank you! Let's continue the discussion on the connector changes there.
[jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times
[ https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15290743#comment-15290743 ]

Artem Aliev commented on CASSANDRA-11542:
-----------------------------------------

[~Stefania], I have created https://datastax-oss.atlassian.net/browse/SPARKC-383 for your connector findings. If you have new code for them, please comment there. I will discuss the design of the change with the team and improve your proposal.
[jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times
[ https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15273884#comment-15273884 ]

Stefania commented on CASSANDRA-11542:
--------------------------------------

I've fixed the benchmark to ensure all jobs have a similar number of Spark tasks. This was accomplished by reducing the number of HDFS blocks and decreasing the Cassandra split size in the connector. All jobs now have approximately 35 Spark tasks, of which 10 execute in parallel (5 nodes with 2 executors each). Here are the results for schemas 1 and 3:

||15M rows||SCHEMA 1|| ||SCHEMA 3|| ||
||Test||Time||Std. Dev||Time||Std. Dev||
|parquet_rdd|4.74|0.23|9.04|0.28|
|parquet_df|2.02|0.82|4.86|0.50|
|csv_rdd|11.36|1.99|10.14|0.64|
|csv_df|13.17|0.45|15.93|1.61|
|cassandra_rdd|*40.81*|0.80|*23.58*|0.53|
|cassandra_rdd_stream|33.24|0.53|19.34|0.29|
|cassandra_df|*26.07*|0.73|*16.75*|0.88|
|cassandra_df_stream|19.39|2.18|13.19|1.71|

As expected, the Cassandra numbers have improved significantly due to the increased parallelism. I've also performed some client-side optimizations, identified by analyzing the JFR files recorded during the last run; the results are as follows:

||15M rows||SCHEMA 1|| ||SCHEMA 3|| ||
||Test||Time||Std. Dev||Time||Std. Dev||
|parquet_rdd|4.58|0.23|8.85|0.63|
|parquet_df|2.69|2.23|4.94|0.27|
|csv_rdd|10.70|0.43|11.04|1.00|
|csv_df|14.02|1.01|14.75|0.43|
|cassandra_rdd|26.60|2.50|16.14|0.28|
|cassandra_rdd_stream|*15.91*|0.33|*13.06*|0.72|
|cassandra_df|21.20|0.86|15.15|1.27|
|cassandra_df_stream|*13.04*|0.87|*11.18*|0.54|

When both streaming and client optimizations are in place, the performance is considerably better.
The overall percentage improvements by test and schema are as follows:

||Percentage reduction in time after optimization||SCHEMA 1|| ||SCHEMA 3|| ||
|| ||RDD||DF||RDD||DF||
|Total|61.01%|49.98%|44.62%|33.25%|
|Client optimizations|52.13%|32.75%|32.48%|15.27%|
|Streaming (after client optimizations)|40.17%|38.51%|19.10%|26.22%|

Raw result data is attached as [^spark-load-perf-results-003.zip]. The client improvements are available [here|https://github.com/stef1927/spark-cassandra-connector/commit/1490178b9c166dc9e6c38f63be5eb7232e73ddd8]; a quick summary:
* The cache of type decoders in the driver is extremely slow; saving the decoders in an array for each RDD computation is by far the most significant factor.
* {{GettableData}} creates a map, [{{_indexOf}}|https://github.com/stef1927/spark-cassandra-connector/blob/9259/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/GettableData.scala#L16], for every single row, even though this map does not change across rows. In the optimizations I've replaced {{CassandraRow}} with {{GettableByIndexData}} for case classes, but this needs further work.
* For the non-streaming case, the query may not necessarily end up at a replica (token-aware routing only works with partition keys, not with tokens, and only the first set of tasks ends up at the preferred Spark location; following tasks may not).
* The Spark metrics have been disabled, as they contributed somewhat to the total decoding time; we need to calculate the binary row size without iterating over the row's column ByteBuffers again.
* Scala for loops are implemented as {{range.foreach}}, and as far as I understand the closure passed to the foreach is not inlined, so I think we need to be careful with for loops in the critical path. They definitely show up quite high in JFR, but I cannot say that replacing a single for loop has a measurable impact on the total time.
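The decoder-caching point above can be sketched in isolation (the names here are hypothetical and this is not the actual connector code): resolve each column's converter once per RDD computation and store the converters in an array, instead of going through the slow codec cache once per column for every row.

```scala
// Minimal sketch of the decoder-caching idea: resolve per-column
// converters once per partition/RDD computation, then reuse them for
// every row. Names and types are illustrative, not the connector's.
object DecoderCacheSketch {
  type Converter = String => Any

  // Stands in for the expensive codec lookup; called once per column.
  def resolveConverter(columnType: String): Converter = columnType match {
    case "int"  => s => s.toInt
    case _      => s => s
  }

  def decodePartition(columnTypes: Seq[String],
                      rows: Iterator[Array[String]]): Iterator[Array[Any]] = {
    // One lookup per column, instead of one per column *per row*.
    val converters: Array[Converter] = columnTypes.map(resolveConverter).toArray
    rows.map { raw =>
      val out = new Array[Any](raw.length)
      var i = 0
      while (i < raw.length) { out(i) = converters(i)(raw(i)); i += 1 }
      out
    }
  }
}
```

The same shape applies to the {{_indexOf}} map: built once per partition, it amortises to nothing; built once per row, it dominates.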
Further client-side optimization is possible; I would suggest integrating the connector with the driver at a very low level in order to convert {{ByteBuffer}}s directly into Scala types. In addition, the improvements I've already implemented will need further refinement. However, in the JFR recordings taken after these optimizations, the Spark executors now spend 66% of the time waiting and 33% of the time processing rows, whilst the NIO workers spend 95% of their time waiting. Therefore I suggest moving on to server-side optimizations, starting with CASSANDRA-11521. This [benchmark|https://github.com/stef1927/spark-load-perf] is ready for review, as it will be used to measure any future improvement. [~rspitzer], would you be able to quickly review it or find someone interested?
[jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times
[ https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15266239#comment-15266239 ]

Stefania commented on CASSANDRA-11542:
--------------------------------------

bq. Saw that we are always doing the conversion to CassandraRow with RDDs, dataframes go directly to the internal SQL Type. In the dataframe tests we also only retrieve the two columns for the calculation rather than all columns.

I described this above, sorry if it wasn't clear.

bq. The code you presented looks good to me, there is the potential issue of blocking on result sets that take a long time to complete while other result sets are already on the driver but I'm not sure if this is a big deal. Do you have any idea of the parallelization in these tests? How many partitions are the different runs generating?

Thanks for checking the code. The result set futures should not block, because the driver completes them as soon as they are transferred to the iterator's thread. I'm actually using futures as a lazy way to transfer error conditions as well as results. In terms of parallelism, each C* node receives 256 token range queries per RDD iteration, which should be fine since each node has 256 tokens. I've also checked the Spark tasks by connecting to the web UI at port 4040: initially I could see 10 tasks per Cassandra RDD operation, which then increased to 20 when I raised the number of executor cores to 4. I have 5 nodes with 2 executors each, so the initial 10 makes sense because by default there is one core per executor; however, I don't understand why I ended up with 20 rather than 40 when I increased the number of cores to 4. {{spark-env.sh}} is [here|https://github.com/stef1927/spark-load-perf/blob/master/bin/install_spark.sh#L34] if you want to check it out, but there's not much to it other than the number of executor cores. I also note that the CSV and Parquet RDD operations have as many tasks as there are HDFS partitions, i.e. 1000 tasks.
This would give them a big advantage if we have idle cores, but I don't know how to reliably increase the number of tasks for C* RDDs. I've collected JFR files for both Cassandra and the Spark executors: [^jfr_recordings.zip]. I still need to analyze them, but from a quick look there are at least two interesting things client side (plus maybe a third one): we seem to spend a lot of time in {{CassandraRow._indexOfOrThrow()}} and in selecting the codecs in the driver. As for the C* JFR recording, we spend 80% of the time in the new bulk read code but we also still spend 15% in {{ReadCommandVerbHandler}}, which I don't understand. I will post another update when I have more details on the JFR analysis and any optimizations that might follow.
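The futures-per-page scheme described in this message, where each page of a result set arrives as a future that carries error conditions as well as rows, can be modelled with a small standalone sketch (illustrative names, not the connector's classes): the consuming iterator awaits one page at a time, and a failed future surfaces its error at the point of consumption.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

// Standalone sketch of consuming pages delivered as futures: the
// iterator blocks only until the next page completes; a failed
// future propagates its exception to the consumer when awaited.
object PagedFutureIterator {
  def flatten[T](pages: Iterator[Future[Seq[T]]]): Iterator[T] =
    pages.flatMap(page => Await.result(page, Duration.Inf))
}
```

The blocking Russell mentions shows up here as the `Await.result` call: a slow page stalls the consumer even if later pages have already arrived.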
[jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times
[ https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15264867#comment-15264867 ]

vincent.poncet commented on CASSANDRA-11542:
--------------------------------------------

The Cassandra tests have a much higher normalized standard deviation (std dev / mean) than the HDFS tests, and looking at your previous tests, the first Cassandra run is much higher than subsequent runs. So it really looks like there is a cache somewhere that is biasing your Cassandra numbers.
[jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times
[ https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15264698#comment-15264698 ]

Russell Alexander Spitzer commented on CASSANDRA-11542:
-------------------------------------------------------

Hmm, I'm a little confused that case classes don't help but DataFrames do... The code you presented looks good to me; there is the potential issue of blocking on result sets that take a long time to complete while other result sets are already on the driver, but I'm not sure if this is a big deal. Do you have any idea of the parallelization in these tests? How many partitions are the different runs generating?
[jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times
[ https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263830#comment-15263830 ]

Stefania commented on CASSANDRA-11542:
--------------------------------------

Thank you for the suggestions [~rspitzer]. Switching to case classes for the RDD tests has actually made things much worse for streaming. For example, for schema 1:

||Test||Time||Std. Dev||
|parquet_rdd|2.67|0.50|
|parquet_df|2.69|0.66|
|csv_rdd|5.20|0.58|
|csv_df|12.37|0.53|
|cassandra_rdd|49.03|5.46|
|cassandra_rdd_stream|47.49|3.84|
|cassandra_df|27.78|0.74|
|cassandra_df_stream|19.35|1.51|

I've also fixed a bug with the benchmark: previously sstables were only compacted and flushed on the Spark master, not on all nodes. Further, the OS page cache might not have been flushed on all nodes either. Here is a repeat of the data for schema 1 with this problem addressed:

||Test||Time||Std. Dev||
|parquet_rdd|4.81|0.31|
|parquet_df|4.77|0.78|
|csv_rdd|7.59|0.22|
|csv_df|13.09|0.41|
|cassandra_rdd|45.13|0.55|
|cassandra_rdd_stream|41.64|0.37|
|cassandra_df|36.15|11.70|
|cassandra_df_stream|22.55|3.36|

In terms of IO usage, dstat shows less than 10 MB/s for Cassandra compared to 110 MB/s for HDFS. Regarding the degradation of RDD streaming with case classes, I suspect the way I implemented the RDD iterator might be inefficient; could you take a look [here|https://github.com/datastax/spark-cassandra-connector/compare/master...stef1927:9259#diff-fecec01aca0cc6ed91526423b292eceaR12]? We receive an iterator of futures from the driver, one per page. As soon as a future completes we return the rows, which then get converted to case classes. I think that somehow we should convert page rows in parallel to speed things up and take advantage of streaming.

[~slebresne]: noted, thank you! I should be able to start CASSANDRA-11521 next week, and continue this investigation in parallel.
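One possible shape for the "convert page rows in parallel" idea above, as a standalone sketch rather than actual connector code: start decoding every page eagerly on a thread pool, so that decoding of page N+1 overlaps with consumption of page N instead of happening serially on the consumer's thread.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Sketch: kick off decoding of every page on the global execution
// context, then consume the decoded pages in order. The decode work
// for later pages runs concurrently with consumption of earlier ones.
object ParallelPageDecode {
  def decodeAll[A, B](pages: Seq[Seq[A]])(decode: A => B): Iterator[B] = {
    val decoded: Seq[Future[Seq[B]]] = pages.map(p => Future(p.map(decode)))
    decoded.iterator.flatMap(f => Await.result(f, Duration.Inf))
  }
}
```

In the real connector the pages would arrive incrementally rather than as a `Seq`, but the overlap of decode and consumption is the point.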
[jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times
[ https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263751#comment-15263751 ]

Sylvain Lebresne commented on CASSANDRA-11542:
----------------------------------------------

bq. I still think we need CASSANDRA-11520 and CASSANDRA-11521, but I just want to make sure we tackle the bigger "bang for the buck" first.

Just to say that I generally agree with that statement, while adding that a 60% improvement (or even 30% for RDD) is not too shabby either, and both CASSANDRA-11520 and CASSANDRA-11521 seem doable without too much effort/disruption. That is, as far as I'm concerned, you've gathered enough evidence to show that they are worth doing, and I'd be happy to start there. That doesn't exclude continuing to dig in parallel, of course. Anyway, great work so far, thanks.
[jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times
[ https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263561#comment-15263561 ]

Russell Alexander Spitzer commented on CASSANDRA-11542:
-------------------------------------------------------

One other thing: when we were testing before, we noticed that our reads were not IO bound. With streaming, are we now IO bound on the C* side? I.e. is drive usage at full?
[jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times
[ https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263553#comment-15263553 ]

Russell Alexander Spitzer commented on CASSANDRA-11542:
-------------------------------------------------------

You may also want to do tests reading into case classes rather than CassandraRows:

{code}
case class RowName(col1: Type1, col2: Type2)

sc.cassandraTable[RowName](keyspace, table)
{code}

This may explain some of the difference between RDD and DataFrame read times, as DataFrames read into a different format (SqlRows vs CassandraRows) than RDDs by default, and case classes should be much more efficient than the map-based CassandraRows. In addition, I think the Parquet versions are able to skip full counts (because of the metadata), which may give them the advantage over CSV, but I'm not really sure about that... it could just be the compression of repeated values.
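The reason case classes should beat map-based rows can be shown with a tiny standalone sketch (illustrative types, not the connector's): a map-backed row pays a hash lookup per column per row, while a case class read is a direct field access.

```scala
// Sketch of why case-class (positional) access is cheaper than a
// map-backed row: the map pays a name hash lookup and an unboxing
// cast for every column of every row. Types are illustrative.
final case class SensorRow(id: String, timestamp: Long, val1: Int, val2: Int)

object RowAccessSketch {
  // Map-backed "row": one hash lookup plus a cast per column read.
  def maxFromMap(row: Map[String, Any]): Int =
    math.max(row("val1").asInstanceOf[Int], row("val2").asInstanceOf[Int])

  // Case class: direct, statically typed field access, no lookup.
  def maxFromCaseClass(row: SensorRow): Int = math.max(row.val1, row.val2)
}
```
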
[jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times
[ https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263494#comment-15263494 ]

Stefania commented on CASSANDRA-11542:
--------------------------------------

These are the results with the Spark connector modified to support the streaming proof of concept. Results are in seconds and represent the average of 5 different runs; see [^spark-load-perf-results-002.zip] for the raw data. The improvement is approximately 30% for the RDD tests and 60% for the DF tests. Further, the data for schema 3 does not match the data observed in the previous run, and the high variance continues to be observed.

|| ||SCHEMA 1|| ||SCHEMA 2|| ||SCHEMA 3|| ||SCHEMA 4|| ||
||Test||Time||Std. Dev||Time||Std. Dev||Time||Std. Dev||Time||Std. Dev||
|parquet_rdd|2.73|0.23|2.90|0.30|6.09|0.21|6.33|0.21|
|parquet_df|2.87|0.72|2.68|0.62|4.65|0.78|4.40|0.32|
|csv_rdd|5.31|0.21|5.18|0.24|6.58|0.11|6.50|0.12|
|csv_df|12.26|1.00|12.31|0.28|13.03|0.25|13.04|0.19|
|cassandra_rdd|49.72|2.80|46.57|2.77|19.75|0.58|39.83|18.72|
|cassandra_rdd_stream|35.20|3.61|32.45|1.13|15.47|1.32|27.78|8.20|
|cassandra_df|33.32|5.40|35.75|1.90|19.84|8.43|35.82|17.67|
|cassandra_df_stream|20.76|2.91|21.06|0.72|12.80|0.47|22.70|9.00|

I think there may be another dominating factor that explains these results, aside from the time it takes to receive data from Cassandra. The fact that the streaming improvement is more noticeable for DF rather than RDD tests, and significantly smaller than that observed in the [cassandra-stress benchmarks|https://issues.apache.org/jira/browse/CASSANDRA-9259?focusedCommentId=15228054&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15228054], may indicate that client-side data decoding plays a bigger role than streaming in the final performance results. I am going to attach flight recorder to a Spark worker to see if this assumption is correct.
I still think we need CASSANDRA-11520 and CASSANDRA-11521, but I just want to make sure we tackle the bigger "bang for the buck" first.
[jira] [Commented] (CASSANDRA-11542) Create a benchmark to compare HDFS and Cassandra bulk read times
[ https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249213#comment-15249213 ]

Stefania commented on CASSANDRA-11542:
--------------------------------------

I've run [the benchmark|https://github.com/stef1927/spark-load-perf/tree/master] described above on a 5-node GCE {{n1-standard-8}} cluster (30 GB RAM and 8 virtual cores per node, HDDs). The following schemas were tested:

* {{CREATE TABLE ks.schema1 (id TEXT, timestamp BIGINT, val1 INT, val2 INT, val3 INT, val4 INT, val5 INT, val6 INT, val7 INT, val8 INT, val9 INT, val10 INT, PRIMARY KEY (id, timestamp))}}
* {{CREATE TABLE ks.schema2 (id TEXT, timestamp BIGINT, val1 INT, val2 INT, val3 INT, val4 INT, val5 INT, val6 INT, val7 INT, val8 INT, val9 INT, val10 INT, PRIMARY KEY ((id, timestamp)))}}
* {{CREATE TABLE ks.schema3 (id TEXT, timestamp BIGINT, data TEXT, PRIMARY KEY (id, timestamp))}}
* {{CREATE TABLE ks.schema4 (id TEXT, timestamp BIGINT, data TEXT, PRIMARY KEY ((id, timestamp)))}}

The first two schemas are identical except that the second schema uses a composite partition key whilst the first one uses a clustering key. The same is true for the third and fourth schemas. The difference between the first two schemas and the last two is that the 10 integer values are encoded into a single string in the last two schemas. This was done to measure the impact of reading multiple values from Cassandra, whilst the impact of clustering rows can be determined by comparing schemas one and two, or three and four.

15 million rows of random data were generated and stored in the following sources:
* Cassandra
* A CSV file stored in HDFS
* A Parquet file stored in HDFS

After generating the data, the Cassandra tables were flushed and compacted. The OS page cache was also flushed after generating the data, and after every test run, via {{sync && echo 3 | sudo tee /proc/sys/vm/drop_caches}}. The HDFS files were divided into 1000 partitions due to how the data was generated.
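The page-cache flush above has to run on every node, not just the Spark master (a later comment in this thread fixes a bug where it only ran on the master). A simple way to run it cluster-wide is over ssh; the host names below are placeholders for the actual cluster.

```shell
#!/bin/sh
# Flush the OS page cache on every node between benchmark runs.
# The host list is a placeholder; substitute the real node names.
for host in node1 node2 node3 node4 node5; do
  ssh "$host" 'sync && echo 3 | sudo tee /proc/sys/vm/drop_caches'
done
```
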
The benchmark either retrieves a Spark RDD (Resilient Distributed Dataset) or a DF (Data Frame). The difference between the two is that the RDD contains the entire table or file data, whilst the data frame only contains the two columns that are used to produce the final result. The following tests were performed in random order:

* *Cassandra RDD:* the entire Cassandra table is loaded into an RDD via {{sc.cassandraTable}};
* *CSV RDD:* the CSV data is loaded into an RDD via {{sc.textFile}};
* *Parquet RDD:* the Parquet data is loaded into an RDD via {{sqlContext.read.parquet}};
* *Cassandra DF:* a SELECT predicate is pushed to the server via {{CassandraSQLContext}} to retrieve two columns that are saved into a data frame;
* *CSV DF:* the CSV data is loaded into a DF via the Spark SQL context using {{com.databricks.spark.csv}} as the format, and two columns are saved in a data frame;
* *Parquet DF:* a SELECT predicate is used via {{SQLContext}} to retrieve two columns that are saved into a data frame.

The RDD or DF is iterated and the result is calculated by selecting the global maximum of the maximum of two columns for each row. The time taken to create the RDD or DF and to iterate it is then measured.

h3. RDD Results

*Schema1*
||15M records||Parquet||CSV||Cassandra||Cassandra / CSV||Cassandra / Parquet||
|Run 1|3.494837|5.478472|43.423967|8|12|
|Run 2|2.845326|5.167405|47.170665|9|17|
|Run 3|2.613721|4.904634|48.451015|10|19|
|Average|2.98|5.18|46.35|9|16|
|Std. Dev.|0.46|0.29|2.61| | |

*Schema2*
||15M records||Parquet||CSV||Cassandra||Cassandra / CSV||Cassandra / Parquet||
|Run 1|3.486563|5.635907|46.00437|8|13|
|Run 2|2.68518|5.13979|46.108184|9|17|
|Run 3|2.673291|5.035654|46.076284|9|17|
|Average|2.95|5.27|46.06|9|16|
|Std. Dev.|0.47|0.32|0.05| | |

*Schema3*
||15M records||Parquet||CSV||Cassandra||Cassandra / CSV||Cassandra / Parquet||
|Run 1|6.122885|6.79348|29.643609|4|5|
|Run 2|5.826286|6.563861|32.900336|5|6|
|Run 3|5.751427|6.41375|33.176358|5|6|
|Average|5.90|6.59|31.91|5|5|
|Std. Dev.|0.20|0.19|1.96| | |

*Schema4*
||15M records||Parquet||CSV||Cassandra||Cassandra / CSV||Cassandra / Parquet||
|Run 1|6.137645|7.511649|29.518883|4|5|
|Run 2|5.984526|6.569239|30.723268|5|5|
|Run 3|5.763102|6.590789|30.789137|5|5|
|Average|5.96|6.89|30.34|4|5|
|Std. Dev.|0.19|0.54|0.72| | |

h3. DF Results

*Schema1*
||15M records||Parquet||CSV||Cassandra||Cassandra / CSV||Cassandra / Parquet||
|Run 1|2.843182|15.651141|37.997299|2|13|
|Run 2|2.357436|11.582413|30.836383|3|13|
|Run 3|2.386732|11.75583|30.061433|3|13|
|Average|2.53|13.00|32.97|3|13|
|Std. Dev.|0.27|2.30|4.38| | |

*Schema2*
||15M records||Parquet||CSV||Cassandra||Cassandra / CSV||Cassandra / Parquet||
|Run 1|3.016107|12.484605|95.12199|8|32|
|Run 2|2.455694|12.13422|37.583736|3|15|
|Run 3|2.329835|12.007215|34.966389|3|15|
|Average|2.60|12.21|55.89|5|21|
|Std. Dev.|0.37|0.25|34.00| | |

*Schema3*
||15M
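The aggregation used by all of the tests above, the global maximum of the per-row maximum of two columns, can be sketched independently of Spark on a plain collection (in the benchmark the same shape runs over an RDD or DataFrame):

```scala
// Sketch of the benchmark's aggregation, shown on a plain collection:
// per row, take the max of two columns; then take the global max.
object MaxOfTwoColumns {
  def globalMax(rows: Seq[(Int, Int)]): Int =
    rows.map { case (a, b) => math.max(a, b) }.max
}
```
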
[ https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15238919#comment-15238919 ] Stefania commented on CASSANDRA-11542: -- Thanks for all the suggestions, Vincent. I've attached a [small benchmark|https://github.com/stef1927/spark-load-perf] that is still a work in progress, but the basic idea is there:
- it generates a schema and populates it with random data;
- the same data is saved to HDFS as CSV and as Parquet files;
- it then measures how long it takes to do a computation on an RDD backed by Cassandra, HDFS CSV, or HDFS Parquet;
- the computation picks the maximum amongst two columns for each record, and then the maximum amongst records.

I still need to add a few minor things (disable filter pushdown, flush the OS cache after each test) and to scale up to multiple nodes in order to support bigger datasets. I will also add multiple schemas. Finally, I need to modify the Spark connector to support the streaming POC that was delivered by CASSANDRA-9259.
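The computation in the last step can be sketched as a one-liner over any of the RDDs. This is a minimal sketch under assumed names: the {{Record}} case class and its {{value1}}/{{value2}} fields are illustrative, not the benchmark's actual schema.

```scala
// Sketch: per-record max of two columns, then the global max across all records.
// `Record` and its fields are illustrative, not the benchmark's actual schema.
import org.apache.spark.rdd.RDD

case class Record(value1: Double, value2: Double)

def maxOfMax(rdd: RDD[Record]): Double =
  rdd.map(r => math.max(r.value1, r.value2)).reduce(math.max)
```

Because both the map and the reduce force a full scan of the RDD, timing this computation measures the bulk read path rather than any predicate evaluation.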
[ https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236941#comment-15236941 ] vincent.poncet commented on CASSANDRA-11542: Another point. In an HDFS landscape, datanodes usually have heavy storage: huge, but fast too. The common setup is 10 to 14 drives of 4 TB each per node. On the performance side, that is about 1 to 1.5 GB/s of sequential bandwidth. With Cassandra, we usually have one hard drive for data, but if you use PCIe flash, you are at 1 to 2 GB/s of sequential bandwidth. On the storage-space side, SQL-on-Hadoop benchmarks are usually made with 1 TB of usable data per node. Keep in mind that Parquet is columnar and compressed, so it achieves very high compression; I assume a lot more than row-oriented compression like Cassandra's.
[ https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236935#comment-15236935 ] vincent.poncet commented on CASSANDRA-11542: Parquet is a structured format, so you need a schema. There are two possibilities:

1) A CSV with a header. The easy way is to use the DataFrame API and the Spark-CSV package, which has schema inference: https://github.com/databricks/spark-csv Then:
{code}
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("people.txt")
df.write.parquet("people.parquet")
{code}
2) A CSV without a header: you need to apply a schema to create a DataFrame from it. That's easier with an RDD, with no need for the external Spark-CSV package:
{code}
case class Person(name: String, age: Int)
val people = sc.textFile("people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.write.parquet("people.parquet")
{code}
[ https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236774#comment-15236774 ] Stefania commented on CASSANDRA-11542: -- I haven't considered Parquet so far but, if we wanted to add Parquet data to this benchmark, is there an easy way to convert a CSV or text file to Parquet? I had a look at both TPC-DS and spark-sql-perf. I would prefer something simpler for this benchmark, since we are measuring data loading times, not SQL query performance. Specifically, I would prefer to limit queries to a single table at a time, so that changing the columns of the table under test allows measuring the impact of clustering and value columns when reading from Cassandra.
[ https://issues.apache.org/jira/browse/CASSANDRA-11542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15235701#comment-15235701 ] vincent.poncet commented on CASSANDRA-11542: If you plan to do Parquet tests, you have to make sure you are not only doing count or min/max tests. Parquet is columnar, so it only reads the fields relevant to the query, and it keeps statistics (min, max) about groups of rows; Spark uses these to skip a group of rows when it cannot match the WHERE predicates. https://mail-archives.apache.org/mod_mbox/spark-user/201508.mbox/%3c55cc562c.6050...@gmail.com%3E DataBricks delivered a TPC-DS performance test for Spark SQL: https://github.com/databricks/spark-sql-perf TPC-DS is supposed to bring real-life, relevant data-warehouse-style queries, and all SQL-on-Hadoop articles are built around TPC-DS. If you want to do pure bulk reading, make sure to disable predicate pushdown using the spark.sql.parquet.filterPushdown setting.
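For reference, the setting mentioned above can be applied as follows. This is a sketch assuming a Spark 1.x {{SQLContext}} named {{sqlContext}}; the HDFS path is illustrative.

```scala
// Disable Parquet predicate pushdown so a "bulk read" test really scans the data.
sqlContext.setConf("spark.sql.parquet.filterPushdown", "false")

// Even if a filter is later applied, Spark will now read the row groups
// instead of skipping them via Parquet's min/max statistics.
val df = sqlContext.read.parquet("hdfs:///data/test.parquet") // path is illustrative
```

Setting the flag through {{setConf}} affects subsequent reads in the same context, which makes it easy to toggle between pushdown-on and pushdown-off runs of the same benchmark.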