[SQL] Why does a small two-source JDBC query take ~150-200ms with all optimizations (AQE, CBO, pushdown, Kryo, unsafe) enabled? (v3.4.0-SNAPSHOT)
I did some basic testing of multi-source queries with the most recent Spark: https://github.com/GavinRay97/spark-playground/blob/44a756acaee676a9b0c128466e4ab231a7df8d46/src/main/scala/Application.scala#L46-L115

The output of "spark.time()" surprised me:

SELECT p.id, p.name, t.id, t.title
FROM db1.public.person p
JOIN db2.public.todos t ON p.id = t.person_id
WHERE p.id = 1

+---+----+---+------+
| id|name| id| title|
+---+----+---+------+
|  1| Bob|  1|Todo 1|
|  1| Bob|  2|Todo 2|
+---+----+---+------+

Time taken: 168 ms

SELECT p.id, p.name, t.id, t.title
FROM db1.public.person p
JOIN db2.public.todos t ON p.id = t.person_id
WHERE p.id = 2
LIMIT 1

+---+-----+---+------+
| id| name| id| title|
+---+-----+---+------+
|  2|Alice|  3|Todo 3|
+---+-----+---+------+

Time taken: 228 ms

Calcite and Teiid manage to do this on the order of 5-50 ms for basic queries, so I'm curious about the technical specifics of why Spark appears to be so much slower here?
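For what it's worth, most of that time is likely fixed per-query overhead (parsing, optimization, whole-stage code generation, JDBC connection setup, task scheduling) rather than data processing. A back-of-envelope budget, sketched in Python with purely illustrative numbers (none of these are measured, and the component names are my own labels, not Spark metrics):

```python
# Hypothetical per-query fixed costs for a small two-source JDBC join.
# All numbers are illustrative assumptions, not measurements.
per_query_overhead_ms = {
    "SQL parse + analysis": 10,
    "optimizer rules (AQE/CBO etc.)": 20,
    "whole-stage codegen + compile": 40,
    "JDBC connect + metadata (x2 sources)": 40,
    "task scheduling + result collection": 30,
}

total = sum(per_query_overhead_ms.values())
assert total == 140  # same order as the observed 150-200 ms, independent of row count
```

Engines built for low-latency federation (Calcite, Teiid) amortize much of this by caching plans and pooling connections, which would explain the 5-50 ms gap even though Spark processes the two rows themselves just as fast.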
Spark sql join optimizations
Hello, I recently noticed that Spark doesn't optimize a join when we limit its output. For example, payment.join(customer, Seq("customerId"), "left").limit(1).explain(true) produces a plan where the limit is applied only after the full join:

== Physical Plan ==
CollectLimit 1
+- *(5) Project [customerId#29, paymentId#28, amount#30, name#41]
   +- SortMergeJoin [customerId#29], [customerId#40], LeftOuter
      :- *(2) Sort [customerId#29 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(customerId#29, 200)
      :     +- *(1) Project [_1#24 AS paymentId#28, _2#25 AS customerId#29, _3#26 AS amount#30]
      :        +- *(1) SerializeFromObject [assertnotnull(input[0, scala.Tuple3, true])._1 AS _1#24, assertnotnull(input[0, scala.Tuple3, true])._2 AS _2#25, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#26]
      :           +- Scan[obj#23]
      +- *(4) Sort [customerId#40 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(customerId#40, 200)
            +- *(3) Project [_1#37 AS customerId#40, _2#38 AS name#41]
               +- *(3) SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#37, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#38]
                  +- Scan[obj#36]

Am I missing something here? Is there a way to avoid unnecessary joining of data? Regards, Akhil
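A toy Python model (not Spark code) of what the plan above implies: because CollectLimit sits on top of the SortMergeJoin, every matching row is produced before the limit applies, whereas a hypothetical limit-aware join could stop after the first output row:

```python
# Toy model: join-then-limit pays for the whole join; a limit-aware join
# can short-circuit. Counts of rows built stand in for real work done.

payments = [(i, i % 100) for i in range(10_000)]   # (paymentId, customerId)
customers = {c: f"name-{c}" for c in range(100)}   # customerId -> name

def join_then_limit(n):
    rows_built, out = 0, []
    for pid, cid in payments:                      # full scan: limit not pushed down
        if cid in customers:
            rows_built += 1
            out.append((cid, pid, customers[cid]))
    return out[:n], rows_built                     # limit applied only at the end

def limit_then_join(n):
    rows_built, out = 0, []
    for pid, cid in payments:
        if cid in customers:
            rows_built += 1
            out.append((cid, pid, customers[cid]))
            if len(out) == n:                      # stop as soon as n rows exist
                break
    return out, rows_built

a, full_cost = join_then_limit(1)
b, early_cost = limit_then_join(1)
assert a == b
assert full_cost == 10_000 and early_cost == 1
```

In real Spark the practical workaround is usually to filter or limit one side before joining, so the shuffle moves far less data; the sketch only illustrates why the planner's placement of the limit matters.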
Re: If I pass raw SQL string to dataframe do I still get the Spark SQL optimizations?
I kind of think "That's the whole point" :) Sorry, it is Friday here :) :) On Fri, Jul 7, 2017 at 1:09 PM, Michael Armbrust <mich...@databricks.com> wrote: > It goes through the same optimization pipeline. More in this video > <https://youtu.be/1a4pgYzeFwE?t=608>. > > On Thu, Jul 6, 2017 at 5:28 PM, kant kodali <kanth...@gmail.com> wrote: > >> Hi All, >> >> I am wondering, if I pass a raw SQL string to a dataframe do I still get the >> Spark SQL optimizations? Why or why not? >> >> Thanks! >> > > -- Best Regards, Ayan Guha
Re: If I pass raw SQL string to dataframe do I still get the Spark SQL optimizations?
It goes through the same optimization pipeline. More in this video <https://youtu.be/1a4pgYzeFwE?t=608>. On Thu, Jul 6, 2017 at 5:28 PM, kant kodali <kanth...@gmail.com> wrote: > HI All, > > I am wondering If I pass a raw SQL string to dataframe do I still get the > Spark SQL optimizations? why or why not? > > Thanks! >
If I pass raw SQL string to dataframe do I still get the Spark SQL optimizations?
Hi All, I am wondering: if I pass a raw SQL string to a dataframe, do I still get the Spark SQL optimizations? Why or why not? Thanks!
Re: Is there a list of missing optimizations for typed functions?
Hi, you can refer to https://issues.apache.org/jira/browse/SPARK-14083 for more detail. For performance, it is better to use the DataFrame API than the Dataset API. On Sat, Feb 25, 2017 at 2:45 AM, Jacek Laskowski <ja...@japila.pl> wrote: > Hi Justin, > > I have never seen such a list. I think the area is in heavy development > esp. optimizations for typed operations. > > There's a JIRA to somehow find out more on the behavior of Scala code > (non-Column-based one from your list) but I've seen no activity in this > area. That's why for now Column-based untyped queries could be faster due > to more optimizations applied. Same about UDFs. > > Jacek > > On 23 Feb 2017 7:52 a.m., "Justin Pihony" <justin.pih...@gmail.com> wrote: > >> I was curious if there was introspection of certain typed functions and ran >> the following two queries: >> >> ds.where($"col" > 1).explain >> ds.filter(_.col > 1).explain >> >> And found that the typed function does NOT result in a PushedFilter. I >> imagine this is due to a limited view of the function, so I have two >> questions really: >> >> 1.) Is there a list of the methods that lose some of the optimizations that >> you get from non-functional methods? Is it any method that accepts a generic >> function? >> 2.) Is there any work to attempt reflection and gain some of these >> optimizations back? I couldn't find anything in JIRA. >> >> Thanks, >> Justin Pihony >> >> -- >> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-list-of-missing-optimizations-for-typed-functions-tp28418.html >> Sent from the Apache Spark User List mailing list archive at Nabble.com. >> >> - >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Disable Spark SQL Optimizations for unit tests
I found some ways to get faster unit tests. In the meantime they had gone up to about an hour. Apparently defining columns in a for loop makes Catalyst very slow, as it blows up the logical plan with many projections:

final def castInts(dfIn: DataFrame, castToInts: String*): DataFrame = {
  var df = dfIn
  for (toBeCasted <- castToInts) {
    df = df.withColumn(toBeCasted, df(toBeCasted).cast(IntegerType))
  }
  df
}

This is much faster:

final def castInts(dfIn: DataFrame, castToInts: String*): DataFrame = {
  val columns = dfIn.columns.map { c =>
    if (castToInts.contains(c)) {
      dfIn(c).cast(IntegerType)
    } else {
      dfIn(c)
    }
  }
  dfIn.select(columns: _*)
}

As I consequently applied this to other similar functions, the unit tests went down from 60 to 18 minutes.

Another way to break SQL optimizations was to just save an intermediate dataframe to HDFS and read it from there again. This is quite counterintuitive, but the unit tests then went down further from 18 minutes to 5.

Is there any other way to add a barrier for Catalyst optimizations? As in A -> B -> C, only optimize A -> B, and B -> C, but not the complete A -> C?
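A toy Python model of why the loop version is slow: each withColumn wraps the logical plan in one more Project node, so the tree Catalyst has to analyze grows linearly with the number of columns, while a single select keeps it flat. (The Plan class below is an illustrative stand-in, not Spark's internal representation.)

```python
# Toy model: withColumn-in-a-loop nests Project nodes; one select does not.

class Plan:
    def __init__(self, op, child=None):
        self.op, self.child = op, child
    def depth(self):
        return 1 + (self.child.depth() if self.child else 0)

def with_column_loop(cols):
    plan = Plan("Scan")
    for _ in cols:
        plan = Plan("Project", plan)      # one extra Project per withColumn
    return plan

def single_select(cols):
    return Plan("Project", Plan("Scan"))  # one Project regardless of column count

cols = [f"c{i}" for i in range(50)]
assert with_column_loop(cols).depth() == 51
assert single_select(cols).depth() == 2
```

Since several optimizer rules walk (or repeatedly rewrite) the whole tree, a linearly deeper plan can cost far more than linear analysis time, which matches the slowdown described above.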
Re: Is there a list of missing optimizations for typed functions?
Hi Justin, I have never seen such a list. I think the area is in heavy development esp. optimizations for typed operations. There's a JIRA to somehow find out more on the behavior of Scala code (non-Column-based one from your list) but I've seen no activity in this area. That's why for now Column-based untyped queries could be faster due to more optimizations applied. Same about UDFs. Jacek On 23 Feb 2017 7:52 a.m., "Justin Pihony" <justin.pih...@gmail.com> wrote: > I was curious if there was introspection of certain typed functions and ran > the following two queries: > > ds.where($"col" > 1).explain > ds.filter(_.col > 1).explain > > And found that the typed function does NOT result in a PushedFilter. I > imagine this is due to a limited view of the function, so I have two > questions really: > > 1.) Is there a list of the methods that lose some of the optimizations that > you get from non-functional methods? Is it any method that accepts a > generic > function? > 2.) Is there any work to attempt reflection and gain some of these > optimizations back? I couldn't find anything in JIRA. > > Thanks, > Justin Pihony
Is there a list of missing optimizations for typed functions?
I was curious if there was introspection of certain typed functions and ran the following two queries: ds.where($"col" > 1).explain ds.filter(_.col > 1).explain And found that the typed function does NOT result in a PushedFilter. I imagine this is due to a limited view of the function, so I have two questions really: 1.) Is there a list of the methods that lose some of the optimizations that you get from non-functional methods? Is it any method that accepts a generic function? 2.) Is there any work to attempt reflection and gain some of these optimizations back? I couldn't find anything in JIRA. Thanks, Justin Pihony
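A toy Python sketch of the underlying reason: a Column expression like $"col" > 1 is a data structure the optimizer can pattern-match and translate into a source-side filter, while the closure in filter(_.col > 1) is compiled bytecode it cannot look inside. (GreaterThan and try_pushdown below are illustrative stand-ins, not Spark APIs.)

```python
# Toy model: expression trees are inspectable, lambdas are opaque.

class GreaterThan:
    """Stand-in for an optimizer-visible predicate node, like Catalyst's."""
    def __init__(self, col, value):
        self.col, self.value = col, value

def try_pushdown(predicate):
    # The "optimizer" can only push filters it can pattern-match on.
    if isinstance(predicate, GreaterThan):
        return f"{predicate.col} > {predicate.value}"  # e.g. handed to the source
    return None                                        # black box: no pushdown

assert try_pushdown(GreaterThan("col", 1)) == "col > 1"
assert try_pushdown(lambda row: row["col"] > 1) is None
```

This is why the where($"col" > 1) variant shows a PushedFilter in the scan and the typed filter(_.col > 1) variant does not: nothing about the lambda tells Catalyst it is a simple comparison on one column.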
Disable Spark SQL Optimizations for unit tests
Hi, Can the Spark SQL Optimizations be disabled somehow? In our project we started 4 weeks ago to write scala / spark / dataframe code. We currently have only around 10% of the planned project scope, and we are already waiting 10 (Spark 2.1.0, everything cached) to 30 (Spark 1.6, nothing cached) minutes for a single unit test run to finish. We have for example one scala file with maybe 80 lines of code (several joins, several subtrees reused in different places) that takes up to 6 minutes to be optimized (the catalyst output is also > 100 Mb). The input for our unit tests is usually 2 - 3 rows. That is the motivation to disable the optimizer in unit tests. I have found this unanswered SO post <http://stackoverflow.com/questions/33984152/how-to-speed-up-spark-sql-unit-tests> , but not much more on that topic. I have also found this SimpleTestOptimizer <https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L151> which sounds perfect, but I have no idea how to instantiate a Spark Session so it uses that one. Does nobody else have this problem? Is there something fundamentally wrong with our approach? Regards, Stefan Ackermann
Are ser/de optimizations relevant with Dataset API and Encoders ?
With the RDD API, you could optimize shuffling data by making sure that bytes are shuffled instead of objects, using the appropriate ser/de mechanism before and after the shuffle. For example: before parallelize, transform to bytes using a dedicated serializer, parallelize, and immediately after, deserialize (happens on the nodes). The same optimization could be applied in combineByKey, and when collecting the data to the driver. My question: is this relevant with the Dataset API? Datasets have a dedicated Encoder, and I guess that the binary encoder is less informative than, say, an Integer/String or a general Kryo encoder for objects, and as a result will "lose" some optimization abilities. Is this correct? Thanks, Amit
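A toy Python sketch of the trade-off being asked about: a type-informed encoder can pack a record into a compact fixed layout, while a generic object serializer has to carry extra structure with every record. (Here struct stands in for a specialized encoder and pickle for generic serialization; neither is Spark's actual machinery, where Encoders serialize to Tungsten's binary row format.)

```python
# Toy model: a schema-aware encoder vs a generic object serializer.
import pickle
import struct

# A specialized "encoder" for (int, int) records: fixed 8 bytes each.
def encode(pair):
    return struct.pack("<ii", *pair)

def decode(buf):
    return struct.unpack("<ii", buf)

record = (42, 7)
specialized = encode(record)
generic = pickle.dumps(record)          # generic object serialization

assert decode(specialized) == record    # round-trips correctly
assert len(specialized) == 8
assert len(specialized) < len(generic)  # the schema-aware form is more compact
```

The practical upshot in Spark is similar in spirit: Dataset rows already live in a compact binary format before a shuffle, so the manual serialize-before-shuffle trick from the RDD days is largely subsumed by the Encoder machinery.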
Spark 1.5.2 - are new Project Tungsten optimizations available on RDD as well?
Hi, I read the release notes and a few slideshares on the latest optimizations done in the Spark 1.4 and 1.5 releases, part of which are optimizations from Project Tungsten. The docs say it uses sun.misc.Unsafe to convert the physical RDD structure into a byte array before shuffle, for optimized GC and memory use. My question is: why is it only applicable to SQL/Dataframe and not RDD? RDDs have types too! Thanks, Nirav
Re: Optimizations
This is the basic design of Spark: it runs all actions in different stages... Not sure you can achieve what you're looking for. On Jul 3, 2015 12:43 PM, Marius Danciu marius.dan...@gmail.com wrote: Hi all, If I have something like: rdd.join(...).mapPartitionToPair(...) It looks like mapPartitionToPair runs in a different stage than join. Is there a way to piggyback this computation inside the join stage? ... such that each result partition after join is passed to the mapPartitionToPair function, all running in the same stage without any other costs. Best, Marius
Re: Optimizations
Thanks for your feedback. Yes, I am aware of the stages design, and Silvio, what you are describing is essentially a map-side join, which is not applicable when both RDDs are quite large. It appears that in rdd.join(...).mapToPair(f), f is piggybacked inside the join stage (right in the reducers, I believe), whereas in rdd.join(...).mapPartitionToPair(f), f is executed in a different stage. This is surprising because, at least intuitively, the difference between mapToPair and mapPartitionToPair is that the former is about the push model whereas the latter is about polling records out of the iterator (*I suspect there are other technical reasons*). If anyone knows the depths of the problem, it would be of great help. Best, Marius On Fri, Jul 3, 2015 at 6:43 PM Silvio Fiorito silvio.fior...@granturing.com wrote: One thing you could do is a broadcast join. You take your smaller RDD and save it as a broadcast variable. Then run a map operation to perform the join and whatever else you need to do. This will remove a shuffle stage, but you will still have to collect the joined RDD and broadcast it. All depends on the size of your data whether it's worth it or not. From: Marius Danciu Date: Friday, July 3, 2015 at 3:13 AM To: user Subject: Optimizations Hi all, If I have something like: rdd.join(...).mapPartitionToPair(...) It looks like mapPartitionToPair runs in a different stage than join. Is there a way to piggyback this computation inside the join stage? ... such that each result partition after join is passed to the mapPartitionToPair function, all running in the same stage without any other costs. Best, Marius
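The broadcast (map-side) join Silvio describes can be sketched in a toy Python model: the small side is shipped whole to every partition, so the join runs inside the existing map stage and the large side never needs to be shuffled. (This is a conceptual sketch, not Spark code; in real Spark the dict would be a broadcast variable.)

```python
# Toy model of a broadcast join: every partition holds a full copy of the
# small side and joins locally, with no shuffle of the large side.

small = {1: "Alice", 2: "Bob"}                # "broadcast" lookup table

def map_partition(rows, broadcast):
    # The join happens inside the existing map stage over each partition.
    return [(k, v, broadcast[k]) for k, v in rows if k in broadcast]

partitions = [[(1, 10.0), (3, 5.0)], [(2, 7.5)]]   # large side, partitioned
joined = [row for part in partitions for row in map_partition(part, small)]

assert joined == [(1, 10.0, "Alice"), (2, 7.5, "Bob")]
```

As Marius notes, this only works when one side is small enough to copy to every worker; with two large RDDs, a shuffle-based join is unavoidable.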
Optimizations
Hi all, If I have something like: rdd.join(...).mapPartitionToPair(...) It looks like mapPartitionToPair runs in a different stage than join. Is there a way to piggyback this computation inside the join stage? ... such that each result partition after join is passed to the mapPartitionToPair function, all running in the same stage without any other costs. Best, Marius
Delayed hotspot optimizations in Spark
Hello spark users and developers! I am using hdfs + spark sql + hive schema + parquet as the storage format. I have a lot of parquet files - one file fits one hdfs block for one day. The strange thing is a very slow first query for spark sql. To reproduce the situation I use only one core, and I get 97 sec for the first query and only 13 sec for all subsequent queries. Sure, I query different data, but it has the same structure and size. The situation can be reproduced after restarting the thrift server.

Here is information about parquet file reading from a worker node:

Slow one:
Oct 10, 2014 2:26:53 PM INFO: parquet.hadoop.InternalParquetRecordReader: Assembled and processed 1560251 records from 30 columns in 11686 ms: 133.51454 rec/ms, 4005.4363 cell/ms

Fast one:
Oct 10, 2014 2:31:30 PM INFO: parquet.hadoop.InternalParquetRecordReader: Assembled and processed 1568899 records from 1 columns in 1373 ms: 1142.6796 rec/ms, 1142.6796 cell/ms

As you can see, the second read is 10x faster than the first. Most of the query time is spent working with the parquet file. This problem is really annoying, because most of my spark tasks contain just 1 sql query plus data processing, and to speed up my jobs I put a special warmup query in front of any job. My assumption is that it is hotspot optimizations that are applied during the first read. Do you have any idea how to confirm/solve this performance problem? Thanks for advice!

p.s. I have billions of hotspot optimizations shown with -XX:+PrintCompilation but cannot figure out which are important and which are not.
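The warmup-query workaround described above can be captured in a toy Python model: treat the JIT as a counter that compiles a hot method only after some threshold of calls, after which calls get much cheaper. (The threshold and costs below are made up; real JVM tiered-compilation heuristics are far more involved.)

```python
# Toy model of JIT warmup: interpreted calls are expensive until the
# method has been invoked `threshold` times and gets "compiled".

class Jit:
    def __init__(self, threshold=2):
        self.calls = 0
        self.threshold = threshold

    def cost(self):
        self.calls += 1
        # 100 "ms" interpreted, 1 "ms" once compiled (illustrative numbers)
        return 1 if self.calls > self.threshold else 100

jit = Jit()
costs = [jit.cost() for _ in range(5)]
assert costs == [100, 100, 1, 1, 1]  # warmup calls absorb the slow path
```

This is exactly what the "special warmup query in front of any job" does: it spends the expensive interpreted calls on throwaway work so the real query hits already-compiled code.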
Re: Delayed hotspot optimizations in Spark
You could try setting -Xcomp for executors to force JIT compilation upfront. I don't know if it's a good idea overall, but it might show whether the upfront compilation really helps. I doubt it. Isn't this almost surely due to caching somewhere, in Spark SQL or HDFS? I really doubt hotspot makes a difference compared to these much larger factors. On Fri, Oct 10, 2014 at 8:49 AM, Alexey Romanchuk alexey.romanc...@gmail.com wrote: Hello spark users and developers! I am using hdfs + spark sql + hive schema + parquet as storage format. I have lot of parquet files - one files fits one hdfs block for one day. The strange thing is very slow first query for spark sql. To reproduce situation I use only one core and I have 97sec for first time and only 13sec for all next queries. Sure I query for different data, but it has same structure and size. The situation can be reproduced after restart thrift server. Here it information about parquet files reading from worker node: Slow one: Oct 10, 2014 2:26:53 PM INFO: parquet.hadoop.InternalParquetRecordReader: Assembled and processed 1560251 records from 30 columns in 11686 ms: 133.51454 rec/ms, 4005.4363 cell/ms Fast one: Oct 10, 2014 2:31:30 PM INFO: parquet.hadoop.InternalParquetRecordReader: Assembled and processed 1568899 records from 1 columns in 1373 ms: 1142.6796 rec/ms, 1142.6796 cell/ms As you can see second reading is 10x times faster then first. Most of the query time spent to work with parquet file. This problem is really annoying, because most of my spark task contains just 1 sql query and data processing and to speedup my jobs I put special warmup query in from of any job. My assumption is that it is hotspot optimizations that used due first reading. Do you have any idea how to confirm/solve this performance problem? Thanks for advice! p.s. I have billion hotspot optimization showed with -XX:+PrintCompilation but can not figure out what are important and what are not. 
Re: Delayed hotspot optimizations in Spark
Hey Sean and spark users! Thanks for the reply. I tried -Xcomp just now; the start time was a few minutes (as expected), but the first query was as slow as before:

Oct 10, 2014 3:03:41 PM INFO: parquet.hadoop.InternalParquetRecordReader: Assembled and processed 1568899 records from 30 columns in 12897 ms: 121.64837 rec/ms, 3649.451 cell/ms

and next:

Oct 10, 2014 3:05:03 PM INFO: parquet.hadoop.InternalParquetRecordReader: Assembled and processed 1568899 records from 1 columns in 1757 ms: 892.94196 rec/ms, 892.94196 cell/ms

I have no idea about caching or other stuff, because CPU load is 100% on the worker and jstack shows that the worker is reading from the parquet file. Any ideas? Thanks! On Fri, Oct 10, 2014 at 2:55 PM, Sean Owen so...@cloudera.com wrote: You could try setting -Xcomp for executors to force JIT compilation upfront. I don't know if it's a good idea overall but might show whether the upfront compilation really helps. I doubt it. However is this almost surely due to caching somewhere, in Spark SQL or HDFS? I really doubt hotspot makes a difference compared to these much larger factors. On Fri, Oct 10, 2014 at 8:49 AM, Alexey Romanchuk alexey.romanc...@gmail.com wrote: Hello spark users and developers! I am using hdfs + spark sql + hive schema + parquet as storage format. I have lot of parquet files - one files fits one hdfs block for one day. The strange thing is very slow first query for spark sql. To reproduce situation I use only one core and I have 97sec for first time and only 13sec for all next queries. Sure I query for different data, but it has same structure and size. The situation can be reproduced after restart thrift server. 
Here it information about parquet files reading from worker node: Slow one: Oct 10, 2014 2:26:53 PM INFO: parquet.hadoop.InternalParquetRecordReader: Assembled and processed 1560251 records from 30 columns in 11686 ms: 133.51454 rec/ms, 4005.4363 cell/ms Fast one: Oct 10, 2014 2:31:30 PM INFO: parquet.hadoop.InternalParquetRecordReader: Assembled and processed 1568899 records from 1 columns in 1373 ms: 1142.6796 rec/ms, 1142.6796 cell/ms As you can see second reading is 10x times faster then first. Most of the query time spent to work with parquet file. This problem is really annoying, because most of my spark task contains just 1 sql query and data processing and to speedup my jobs I put special warmup query in from of any job. My assumption is that it is hotspot optimizations that used due first reading. Do you have any idea how to confirm/solve this performance problem? Thanks for advice! p.s. I have billion hotspot optimization showed with -XX:+PrintCompilation but can not figure out what are important and what are not.
Re: Delayed hotspot optimizations in Spark
Hi, Could it be due to GC? I read it may happen if your program starts with a small heap. What are your -Xms and -Xmx values? Print GC stats with -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps

Guillaume

> Hello spark users and developers! I am using hdfs + spark sql + hive schema + parquet as storage format. I have lot of parquet files - one files fits one hdfs block for one day. The strange thing is very slow first query for spark sql. To reproduce situation I use only one core and I have 97sec for first time and only 13sec for all next queries. Sure I query for different data, but it has same structure and size. The situation can be reproduced after restart thrift server. Here it information about parquet files reading from worker node: Slow one: Oct 10, 2014 2:26:53 PM INFO: parquet.hadoop.InternalParquetRecordReader: Assembled and processed 1560251 records from 30 columns in 11686 ms: 133.51454 rec/ms, 4005.4363 cell/ms Fast one: Oct 10, 2014 2:31:30 PM INFO: parquet.hadoop.InternalParquetRecordReader: Assembled and processed 1568899 records from 1 columns in 1373 ms: 1142.6796 rec/ms, 1142.6796 cell/ms As you can see second reading is 10x times faster then first. Most of the query time spent to work with parquet file. This problem is really annoying, because most of my spark task contains just 1 sql query and data processing and to speedup my jobs I put special warmup query in from of any job. My assumption is that it is hotspot optimizations that used due first reading. Do you have any idea how to confirm/solve this performance problem? Thanks for advice! p.s. I have billion hotspot optimization showed with -XX:+PrintCompilation but can not figure out what are important and what are not.