[SQL] Why does a small two-source JDBC query take ~150-200ms with all optimizations (AQE, CBO, pushdown, Kryo, unsafe) enabled? (v3.4.0-SNAPSHOT)

2022-05-18 Thread Gavin Ray
I did some basic testing of multi-source queries with the most recent Spark:
https://github.com/GavinRay97/spark-playground/blob/44a756acaee676a9b0c128466e4ab231a7df8d46/src/main/scala/Application.scala#L46-L115

The output of "spark.time()" surprised me:

SELECT p.id, p.name, t.id, t.title
FROM db1.public.person p
JOIN db2.public.todos t
ON p.id = t.person_id
WHERE p.id = 1

+---+----+---+------+
| id|name| id| title|
+---+----+---+------+
|  1| Bob|  1|Todo 1|
|  1| Bob|  2|Todo 2|
+---+----+---+------+
Time taken: 168 ms

SELECT p.id, p.name, t.id, t.title
FROM db1.public.person p
JOIN db2.public.todos t
ON p.id = t.person_id
WHERE p.id = 2
LIMIT 1

+---+-----+---+------+
| id| name| id| title|
+---+-----+---+------+
|  2|Alice|  3|Todo 3|
+---+-----+---+------+
Time taken: 228 ms


Calcite and Teiid manage to do this on the order of 5-50 ms for basic queries,
so I'm curious about the technical specifics of why Spark appears to be so
much slower here?
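
For context, a minimal sketch of the kind of setup being timed; the JDBC URLs,
driver, and catalog wiring below are assumptions standing in for whatever the
linked Application.scala actually configures:

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .master("local[*]")
    // Hypothetical JDBC catalogs; the real settings live in the linked Application.scala
    .config("spark.sql.catalog.db1",
            "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
    .config("spark.sql.catalog.db1.url", "jdbc:postgresql://localhost:5432/db1")
    .config("spark.sql.catalog.db1.driver", "org.postgresql.Driver")
    .config("spark.sql.catalog.db2",
            "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
    .config("spark.sql.catalog.db2.url", "jdbc:postgresql://localhost:5432/db2")
    .config("spark.sql.catalog.db2.driver", "org.postgresql.Driver")
    .getOrCreate()

  // spark.time() prints "Time taken: N ms" around the wrapped block
  spark.time {
    spark.sql(
      """SELECT p.id, p.name, t.id, t.title
        |FROM db1.public.person p
        |JOIN db2.public.todos t ON p.id = t.person_id
        |WHERE p.id = 1""".stripMargin
    ).show()
  }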


Spark sql join optimizations

2019-02-26 Thread Akhilanand
Hello,

I recently noticed that Spark doesn't optimize joins when we limit them.

Say when we have

payment.join(customer,Seq("customerId"), "left").limit(1).explain(true)


Spark doesn't optimize it.

> == Physical Plan ==
> CollectLimit 1
> +- *(5) Project [customerId#29, paymentId#28, amount#30, name#41]
>    +- SortMergeJoin [customerId#29], [customerId#40], LeftOuter
>       :- *(2) Sort [customerId#29 ASC NULLS FIRST], false, 0
>       :  +- Exchange hashpartitioning(customerId#29, 200)
>       :     +- *(1) Project [_1#24 AS paymentId#28, _2#25 AS customerId#29, _3#26 AS amount#30]
>       :        +- *(1) SerializeFromObject [assertnotnull(input[0, scala.Tuple3, true])._1 AS _1#24, assertnotnull(input[0, scala.Tuple3, true])._2 AS _2#25, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#26]
>       :           +- Scan[obj#23]
>       +- *(4) Sort [customerId#40 ASC NULLS FIRST], false, 0
>          +- Exchange hashpartitioning(customerId#40, 200)
>             +- *(3) Project [_1#37 AS customerId#40, _2#38 AS name#41]
>                +- *(3) SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#37, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#38]
>                   +- Scan[obj#36]


Am I missing something here? Is there a way to avoid unnecessary joining of
data?

Regards,
Akhil
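
A minimal, runnable sketch of one common workaround (toy data, not from the
thread): it does not make the limit push down, but hinting a broadcast join,
when customer is small enough, avoids the two Exchange + Sort steps shown in
the plan above.

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions.broadcast

  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  import spark.implicits._

  // Toy stand-ins for the payment/customer datasets in the question
  val payment  = Seq((1, 100, 25.0), (2, 101, 40.0)).toDF("paymentId", "customerId", "amount")
  val customer = Seq((100, "Bob"), (101, "Alice")).toDF("customerId", "name")

  // The broadcast hint replaces SortMergeJoin with BroadcastHashJoin, so payment is not shuffled
  payment.join(broadcast(customer), Seq("customerId"), "left").limit(1).explain(true)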


Re: If I pass raw SQL string to dataframe do I still get the Spark SQL optimizations?

2017-07-06 Thread ayan guha
I kind of think "That's the whole point" :) Sorry, it is Friday here :) :)

On Fri, Jul 7, 2017 at 1:09 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> It goes through the same optimization pipeline.  More in this video
> <https://youtu.be/1a4pgYzeFwE?t=608>.
>
> On Thu, Jul 6, 2017 at 5:28 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> HI All,
>>
>> I am wondering: if I pass a raw SQL string to a DataFrame, do I still get
>> the Spark SQL optimizations? Why or why not?
>>
>> Thanks!
>>
>
>


-- 
Best Regards,
Ayan Guha


Re: If I pass raw SQL string to dataframe do I still get the Spark SQL optimizations?

2017-07-06 Thread Michael Armbrust
It goes through the same optimization pipeline.  More in this video
<https://youtu.be/1a4pgYzeFwE?t=608>.

On Thu, Jul 6, 2017 at 5:28 PM, kant kodali <kanth...@gmail.com> wrote:

> HI All,
>
> I am wondering: if I pass a raw SQL string to a DataFrame, do I still get
> the Spark SQL optimizations? Why or why not?
>
> Thanks!
>


If I pass raw SQL string to dataframe do I still get the Spark SQL optimizations?

2017-07-06 Thread kant kodali
HI All,

I am wondering: if I pass a raw SQL string to a DataFrame, do I still get the
Spark SQL optimizations? Why or why not?

Thanks!
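
As an illustration of Michael's point above, a small sketch (not from the
thread): both forms are turned into the same logical plan and go through the
same Catalyst optimizer, so the "Optimized Logical Plan" sections printed by
explain(true) should be essentially identical.

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  import spark.implicits._

  val df = Seq((1, "a"), (2, "b")).toDF("id", "name")
  df.createOrReplaceTempView("t")

  // Raw SQL string
  spark.sql("SELECT id FROM t WHERE id > 1").explain(true)
  // Equivalent DataFrame API call
  df.filter($"id" > 1).select("id").explain(true)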


Re: Is there a list of missing optimizations for typed functions?

2017-02-27 Thread lihu
Hi, you can refer to https://issues.apache.org/jira/browse/SPARK-14083 for
more detail.

For performance issues, it is better to use the DataFrame API than the Dataset API.
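
A quick way to see the difference described in the quoted mail below (a
runnable sketch; the parquet path is hypothetical): the Column predicate
shows up under PushedFilters in the scan, while the typed lambda stays opaque
to Catalyst.

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  import spark.implicits._

  case class Rec(col: Long)
  val ds = spark.read.parquet("/tmp/recs").as[Rec]  // hypothetical path

  ds.where($"col" > 1).explain()   // scan shows PushedFilters: [GreaterThan(col,1)]
  ds.filter(_.col > 1).explain()   // no pushed filter; the lambda runs after deserialization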

On Sat, Feb 25, 2017 at 2:45 AM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi Justin,
>
> I have never seen such a list. I think the area is in heavy development
> esp. optimizations for typed operations.
>
> There's a JIRA to somehow find out more on the behavior of Scala code
> (non-Column-based one from your list) but I've seen no activity in this
> area. That's why for now Column-based untyped queries could be faster due
> to more optimizations applied. Same about UDFs.
>
> Jacek
>
> On 23 Feb 2017 7:52 a.m., "Justin Pihony" <justin.pih...@gmail.com> wrote:
>
>> I was curious if there was introspection of certain typed functions and
>> ran
>> the following two queries:
>>
>> ds.where($"col" > 1).explain
>> ds.filter(_.col > 1).explain
>>
>> And found that the typed function does NOT result in a PushedFilter. I
>> imagine this is due to a limited view of the function, so I have two
>> questions really:
>>
>> 1.) Is there a list of the methods that lose some of the optimizations
>> that
>> you get from non-functional methods? Is it any method that accepts a
>> generic
>> function?
>> 2.) Is there any work to attempt reflection and gain some of these
>> optimizations back? I couldn't find anything in JIRA.
>>
>> Thanks,
>> Justin Pihony
>>
>>
>>


Re: Disable Spark SQL Optimizations for unit tests

2017-02-26 Thread Stefan Ackermann
I found some ways to get faster unit tests. In the meantime they had gone up
to about an hour.

Apparently, defining columns in a for loop makes Catalyst very slow, as it
blows up the logical plan with many projections:

  final def castInts(dfIn: DataFrame, castToInts: String*): DataFrame = {
    var df = dfIn
    for (toBeCasted <- castToInts) {
      df = df.withColumn(toBeCasted, df(toBeCasted).cast(IntegerType))
    }
    df
  }

This is much faster:

  final def castInts(dfIn: DataFrame, castToInts: String*): DataFrame = {
    val columns = dfIn.columns.map { c =>
      if (castToInts.contains(c)) {
        dfIn(c).cast(IntegerType)
      } else {
        dfIn(c)
      }
    }
    dfIn.select(columns: _*)
  }

As I applied this consistently to other similar functions, the unit tests
went down from 60 to 18 minutes.

Another way to break the SQL optimizations was to just save an intermediate
dataframe to HDFS and read it back from there. This is quite counterintuitive,
but the unit tests then went down further, from 18 minutes to 5.

Is there any other way to add a barrier for Catalyst optimizations? As in A
-> B -> C: optimize A -> B and B -> C separately, but not the complete A -> C?
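
One lighter-weight barrier than the HDFS round-trip, as a sketch:
Dataset.checkpoint (and, on Spark 2.3+, localCheckpoint) returns a Dataset
whose logical plan is truncated, so Catalyst never sees the full A -> C plan.
buildB/buildC and the input a below are hypothetical stand-ins for the
pipeline stages.

  // Needed only for checkpoint(); localCheckpoint() keeps the data on the executors instead
  spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

  val b = buildB(a)              // A -> B, optimized on its own
  val bStable = b.checkpoint()   // plan barrier: lineage and logical plan are cut here
  val c = buildC(bStable)        // B -> C, optimized without ever seeing A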






Re: Is there a list of missing optimizations for typed functions?

2017-02-24 Thread Jacek Laskowski
Hi Justin,

I have never seen such a list. I think the area is in heavy development
esp. optimizations for typed operations.

There's a JIRA to somehow find out more on the behavior of Scala code
(non-Column-based one from your list) but I've seen no activity in this
area. That's why for now Column-based untyped queries could be faster due
to more optimizations applied. Same about UDFs.

Jacek

On 23 Feb 2017 7:52 a.m., "Justin Pihony" <justin.pih...@gmail.com> wrote:

> I was curious if there was introspection of certain typed functions and ran
> the following two queries:
>
> ds.where($"col" > 1).explain
> ds.filter(_.col > 1).explain
>
> And found that the typed function does NOT result in a PushedFilter. I
> imagine this is due to a limited view of the function, so I have two
> questions really:
>
> 1.) Is there a list of the methods that lose some of the optimizations that
> you get from non-functional methods? Is it any method that accepts a
> generic
> function?
> 2.) Is there any work to attempt reflection and gain some of these
> optimizations back? I couldn't find anything in JIRA.
>
> Thanks,
> Justin Pihony
>
>
>
>
>


Is there a list of missing optimizations for typed functions?

2017-02-22 Thread Justin Pihony
I was curious if there was introspection of certain typed functions and ran
the following two queries:

ds.where($"col" > 1).explain
ds.filter(_.col > 1).explain

And found that the typed function does NOT result in a PushedFilter. I
imagine this is due to a limited view of the function, so I have two
questions really:

1.) Is there a list of the methods that lose some of the optimizations that
you get from non-functional methods? Is it any method that accepts a generic
function?
2.) Is there any work to attempt reflection and gain some of these
optimizations back? I couldn't find anything in JIRA.

Thanks,
Justin Pihony






Disable Spark SQL Optimizations for unit tests

2017-02-11 Thread Stefan Ackermann
Hi,

Can the Spark SQL Optimizations be disabled somehow?

In our project we started writing Scala / Spark / DataFrame code 4 weeks ago.
We currently have only around 10% of the planned project scope, and we are
already waiting 10 (Spark 2.1.0, everything cached) to 30 (Spark 1.6, nothing
cached) minutes for a single unit test run to finish. We have, for example,
one Scala file with maybe 80 lines of code (several joins, several subtrees
reused in different places) that takes up to 6 minutes to be optimized (the
Catalyst output is also > 100 MB). The input for our unit tests is usually
2 - 3 rows. That is the motivation to disable the optimizer in unit tests.

I have found this unanswered SO post
<http://stackoverflow.com/questions/33984152/how-to-speed-up-spark-sql-unit-tests>,
but not much more on that topic. I have also found the SimpleTestOptimizer
<https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L151>,
which sounds perfect, but I have no idea how to instantiate a SparkSession so
that it uses it.

Does nobody else have this problem? Is there something fundamentally wrong
with our approach?

Regards,
Stefan Ackermann
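
One note on newer versions: Spark 2.4+ has a spark.sql.optimizer.excludedRules
setting that skips individual optimizer rules by name (a sketch; the rule names
below are purely illustrative, and rules Spark treats as non-excludable still
run):

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .master("local[*]")
    // Comma-separated fully-qualified rule names to skip (illustrative choices)
    .config("spark.sql.optimizer.excludedRules",
            "org.apache.spark.sql.catalyst.optimizer.CollapseProject," +
            "org.apache.spark.sql.catalyst.optimizer.ConstantFolding")
    .getOrCreate()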






Are ser/de optimizations relevant with Dataset API and Encoders ?

2016-06-19 Thread Amit Sela
With the RDD API, you could optimize shuffling data by making sure that bytes
are shuffled instead of objects, using the appropriate ser/de mechanism
before and after the shuffle. For example:

Before parallelize, transform to bytes using a dedicated serializer,
parallelize, and immediately afterwards deserialize (this happens on the nodes).
The same optimization could be applied in combinePerKey, and when
collecting the data to the driver.

My question: is this relevant with the Dataset API? Datasets have a
dedicated Encoder, and I guess that the binary encoder is less informative
than, say, an Integer/String or a general Kryo encoder for objects, and as a
result will "lose" some optimization abilities.

Is this correct ?

Thanks,
Amit
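
A small sketch (Spark 2.x APIs, not from the thread) that makes the trade-off
visible: a product (case class) encoder keeps the column structure, while
Encoders.kryo collapses the whole object into a single opaque binary column,
which is roughly where those optimizations are lost.

  import org.apache.spark.sql.{Encoders, SparkSession}

  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  import spark.implicits._

  case class Event(id: Int, name: String)

  // Product encoder: Spark sees the columns and can work on the Tungsten binary rows directly
  val typed = spark.createDataset(Seq(Event(1, "a"), Event(2, "b")))
  typed.printSchema()   // id: int, name: string

  // Kryo encoder: one opaque "value: binary" column, so column-level optimizations no longer apply
  val opaque = spark.createDataset(Seq(Event(1, "a")))(Encoders.kryo[Event])
  opaque.printSchema()  // value: binary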


Spark 1.5.2 - are new Project Tungsten optimizations available on RDD as well?

2016-02-02 Thread Nirav Patel
Hi,

I read the release notes and a few slideshares on the latest optimizations
done in the Spark 1.4 and 1.5 releases, part of which are optimizations from
project Tungsten. The docs say it uses sun.misc.Unsafe to convert the physical
RDD structure into a byte array before shuffle, for optimized GC and memory.
My question is: why is this only applicable to SQL/DataFrames and not RDDs?
RDDs have types too!

Thanks,
Nirav
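
Tungsten's binary row format needs a schema, i.e. an Encoder, which a plain
RDD of arbitrary JVM objects does not carry; that is essentially why it only
applies to DataFrames/Datasets. A small sketch of the usual way to opt in
(Spark 2.x APIs, illustrative data):

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  import spark.implicits._

  case class Point(x: Double, y: Double)
  val rdd = spark.sparkContext.parallelize(Seq(Point(1.0, 2.0), Point(3.0, 4.0)))

  // Attaching an encoder via toDS() lets rows be stored and shuffled in Tungsten's
  // compact binary format instead of as deserialized JVM objects
  val ds = rdd.toDS()
  ds.groupBy($"x").count().explain()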



Re: Optimizations

2015-07-03 Thread Raghavendra Pandey
It is the basic design of Spark that it runs all actions in different
stages...
Not sure you can achieve what you are looking for.
On Jul 3, 2015 12:43 PM, Marius Danciu marius.dan...@gmail.com wrote:

 Hi all,

 If I have something like:

 rdd.join(...).mapPartitionToPair(...)

 It looks like mapPartitionToPair runs in a different stage than join. Is
 there a way to piggyback this computation inside the join stage? ... such
 that each result partition after join is passed to
 the mapPartitionToPair function, all running in the same stage without any
 other costs.

 Best,
 Marius



Re: Optimizations

2015-07-03 Thread Marius Danciu
Thanks for your feedback. Yes, I am aware of the stage design, and Silvio,
what you are describing is essentially a map-side join, which is not
applicable when both RDDs are quite large.

It appears that

rdd.join(...).mapToPair(f)
f is piggybacked inside the join stage (right in the reducers, I believe)

whereas

rdd.join(...).mapPartitionToPair( f )

f is executed in a different stage. This is surprising because, at least
intuitively, the difference between mapToPair and mapPartitionToPair is that
the former is about the push model whereas the latter is about polling
records out of the iterator (*I suspect there are other technical reasons*).
If anyone knows the depths of the problem, it would be of great help.

Best,
Marius

On Fri, Jul 3, 2015 at 6:43 PM Silvio Fiorito silvio.fior...@granturing.com
wrote:

   One thing you could do is a broadcast join. You take your smaller RDD,
 save it as a broadcast variable. Then run a map operation to perform the
 join and whatever else you need to do. This will remove a shuffle stage but
 you will still have to collect the joined RDD and broadcast it. All depends
 on the size of your data if it’s worth it or not.

   From: Marius Danciu
 Date: Friday, July 3, 2015 at 3:13 AM
 To: user
 Subject: Optimizations

   Hi all,

  If I have something like:

  rdd.join(...).mapPartitionToPair(...)

  It looks like mapPartitionToPair runs in a different stage than join. Is
 there a way to piggyback this computation inside the join stage? ... such
 that each result partition after join is passed to
 the mapPartitionToPair function, all running in the same stage without any
 other costs.

  Best,
 Marius
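
A sketch in Scala of the map-side join Silvio describes above (assuming the
small side really does fit in driver and executor memory; names and data are
illustrative): the join and any follow-up work happen in a single
mapPartitions pass, i.e. one stage, at the cost of collecting and
broadcasting the small side.

  import org.apache.spark.{SparkConf, SparkContext}

  val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("map-side-join"))

  val large = sc.parallelize(Seq(1 -> "a", 2 -> "b", 3 -> "c"))
  val small = sc.parallelize(Seq(1 -> "X", 2 -> "Y"))

  // Collect the small side and broadcast it, then join inside mapPartitions: no shuffle stage
  val smallBc = sc.broadcast(small.collectAsMap())

  val joined = large.mapPartitions { iter =>
    iter.flatMap { case (k, v) => smallBc.value.get(k).map(s => (k, (v, s))) }
  }
  joined.collect().foreach(println)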



Optimizations

2015-07-03 Thread Marius Danciu
Hi all,

If I have something like:

rdd.join(...).mapPartitionToPair(...)

It looks like mapPartitionToPair runs in a different stage than join. Is
there a way to piggyback this computation inside the join stage? ... such
that each result partition after join is passed to
the mapPartitionToPair function, all running in the same stage without any
other costs.

Best,
Marius


Delayed hotspot optimizations in Spark

2014-10-10 Thread Alexey Romanchuk
Hello spark users and developers!

I am using hdfs + spark sql + hive schema + parquet as the storage format. I
have a lot of parquet files - one file fits one hdfs block for one day. The
strange thing is the very slow first query in spark sql.

To reproduce the situation I use only one core: the first query takes 97 sec
and all subsequent queries only 13 sec. Of course I query different data each
time, but it has the same structure and size. The situation can be reproduced
after restarting the thrift server.

Here is some information about parquet file reading from a worker node:

Slow one:
Oct 10, 2014 2:26:53 PM INFO: parquet.hadoop.InternalParquetRecordReader:
Assembled and processed 1560251 records from 30 columns in 11686 ms:
133.51454 rec/ms, 4005.4363 cell/ms

Fast one:
Oct 10, 2014 2:31:30 PM INFO: parquet.hadoop.InternalParquetRecordReader:
Assembled and processed 1568899 records from 1 columns in 1373 ms:
1142.6796 rec/ms, 1142.6796 cell/ms

As you can see, the second read is 10x faster than the first. Most of the
query time is spent working with the parquet file.

This problem is really annoying, because most of my spark jobs contain just
one sql query plus data processing, and to speed up my jobs I put a special
warmup query in front of every job.

My assumption is that it is hotspot optimizations being applied during the
first read. Do you have any idea how to confirm/solve this performance problem?

Thanks for advice!

p.s. I see billions of hotspot compilations with -XX:+PrintCompilation but
cannot figure out which are important and which are not.


Re: Delayed hotspot optimizations in Spark

2014-10-10 Thread Sean Owen
You could try setting -Xcomp for executors to force JIT compilation
upfront. I don't know if it's a good idea overall but might show
whether the upfront compilation really helps. I doubt it.

However, isn't this almost surely due to caching somewhere, in Spark SQL
or HDFS? I really doubt hotspot makes a difference compared to these
much larger factors.
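
For anyone who wants to run the -Xcomp experiment, a sketch using the
era-appropriate Spark 1.x API (the same option can go into spark-defaults.conf
or be passed via --conf on spark-submit, and it must be set before the
executors launch):

  import org.apache.spark.{SparkConf, SparkContext}

  // Force the executor JVMs to JIT-compile everything up front.
  // Expect much slower executor startup; this is only a diagnostic experiment.
  val conf = new SparkConf()
    .setAppName("xcomp-experiment")
    .set("spark.executor.extraJavaOptions", "-Xcomp")
  val sc = new SparkContext(conf)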

On Fri, Oct 10, 2014 at 8:49 AM, Alexey Romanchuk
alexey.romanc...@gmail.com wrote:
 Hello spark users and developers!

 I am using hdfs + spark sql + hive schema + parquet as storage format. I
 have lot of parquet files - one files fits one hdfs block for one day. The
 strange thing is very slow first query for spark sql.

 To reproduce situation I use only one core and I have 97sec for first time
 and only 13sec for all next queries. Sure I query for different data, but it
 has same structure and size. The situation can be reproduced after restart
 thrift server.

 Here it information about parquet files reading from worker node:

 Slow one:
 Oct 10, 2014 2:26:53 PM INFO: parquet.hadoop.InternalParquetRecordReader:
 Assembled and processed 1560251 records from 30 columns in 11686 ms:
 133.51454 rec/ms, 4005.4363 cell/ms

 Fast one:
 Oct 10, 2014 2:31:30 PM INFO: parquet.hadoop.InternalParquetRecordReader:
 Assembled and processed 1568899 records from 1 columns in 1373 ms: 1142.6796
 rec/ms, 1142.6796 cell/ms

 As you can see second reading is 10x times faster then first. Most of the
 query time spent to work with parquet file.

 This problem is really annoying, because most of my spark task contains just
 1 sql query and data processing and to speedup my jobs I put special warmup
 query in from of any job.

 My assumption is that it is hotspot optimizations that used due first
 reading. Do you have any idea how to confirm/solve this performance problem?

 Thanks for advice!

 p.s. I have billion hotspot optimization showed with -XX:+PrintCompilation
 but can not figure out what are important and what are not.




Re: Delayed hotspot optimizations in Spark

2014-10-10 Thread Alexey Romanchuk
Hey Sean and spark users!

Thanks for the reply. I tried -Xcomp just now; the start time was a few
minutes (as expected), but the first query was as slow as before:
Oct 10, 2014 3:03:41 PM INFO: parquet.hadoop.InternalParquetRecordReader:
Assembled and processed 1568899 records from 30 columns in 12897 ms:
121.64837 rec/ms, 3649.451 cell/ms

and next

Oct 10, 2014 3:05:03 PM INFO: parquet.hadoop.InternalParquetRecordReader:
Assembled and processed 1568899 records from 1 columns in 1757 ms:
892.94196 rec/ms, 892.94196 cell/ms

I don't think it is caching or other such things, because CPU load is 100% on
the worker and jstack shows that the worker is reading from the parquet file.

Any ideas?

Thanks!

On Fri, Oct 10, 2014 at 2:55 PM, Sean Owen so...@cloudera.com wrote:

 You could try setting -Xcomp for executors to force JIT compilation
 upfront. I don't know if it's a good idea overall but might show
 whether the upfront compilation really helps. I doubt it.

 However is this almost surely due to caching somewhere, in Spark SQL
 or HDFS? I really doubt hotspot makes a difference compared to these
 much larger factors.

 On Fri, Oct 10, 2014 at 8:49 AM, Alexey Romanchuk
 alexey.romanc...@gmail.com wrote:
  Hello spark users and developers!

  I am using hdfs + spark sql + hive schema + parquet as storage format. I
  have lot of parquet files - one files fits one hdfs block for one day. The
  strange thing is very slow first query for spark sql.

  To reproduce situation I use only one core and I have 97sec for first time
  and only 13sec for all next queries. Sure I query for different data, but it
  has same structure and size. The situation can be reproduced after restart
  thrift server.

  Here it information about parquet files reading from worker node:

  Slow one:
  Oct 10, 2014 2:26:53 PM INFO: parquet.hadoop.InternalParquetRecordReader:
  Assembled and processed 1560251 records from 30 columns in 11686 ms:
  133.51454 rec/ms, 4005.4363 cell/ms

  Fast one:
  Oct 10, 2014 2:31:30 PM INFO: parquet.hadoop.InternalParquetRecordReader:
  Assembled and processed 1568899 records from 1 columns in 1373 ms:
  1142.6796 rec/ms, 1142.6796 cell/ms

  As you can see second reading is 10x times faster then first. Most of the
  query time spent to work with parquet file.

  This problem is really annoying, because most of my spark task contains just
  1 sql query and data processing and to speedup my jobs I put special warmup
  query in from of any job.

  My assumption is that it is hotspot optimizations that used due first
  reading. Do you have any idea how to confirm/solve this performance problem?

  Thanks for advice!

  p.s. I have billion hotspot optimization showed with -XX:+PrintCompilation
  but can not figure out what are important and what are not.



Re: Delayed hotspot optimizations in Spark

2014-10-10 Thread Guillaume Pitel

Hi

Could it be due to GC? I read it may happen if your program starts with
a small heap. What are your -Xms and -Xmx values?


Print GC stats with -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps

Guillaume
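
A sketch of how those GC flags (and a pinned executor heap) can be applied to
the executors to test this hypothesis; the memory value is illustrative, and
the same settings can go into spark-defaults.conf:

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("gc-check")
    .set("spark.executor.memory", "4g")  // controls the executor heap size (-Xmx)
    .set("spark.executor.extraJavaOptions",
         "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")
  val sc = new SparkContext(conf)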

Hello spark users and developers!

I am using hdfs + spark sql + hive schema + parquet as storage format. 
I have lot of parquet files - one files fits one hdfs block for one 
day. The strange thing is very slow first query for spark sql.


To reproduce situation I use only one core and I have 97sec for first 
time and only 13sec for all next queries. Sure I query for different 
data, but it has same structure and size. The situation can be 
reproduced after restart thrift server.


Here it information about parquet files reading from worker node:

Slow one:
Oct 10, 2014 2:26:53 PM INFO: 
parquet.hadoop.InternalParquetRecordReader: Assembled and processed 
1560251 records from 30 columns in 11686 ms: 133.51454 rec/ms, 
4005.4363 cell/ms


Fast one:
Oct 10, 2014 2:31:30 PM INFO: 
parquet.hadoop.InternalParquetRecordReader: Assembled and processed 
1568899 records from 1 columns in 1373 ms: 1142.6796 rec/ms, 1142.6796 
cell/ms


As you can see second reading is 10x times faster then first. Most of 
the query time spent to work with parquet file.


This problem is really annoying, because most of my spark task 
contains just 1 sql query and data processing and to speedup my jobs I 
put special warmup query in from of any job.


My assumption is that it is hotspot optimizations that used due first 
reading. Do you have any idea how to confirm/solve this performance 
problem?


Thanks for advice!

p.s. I have billion hotspot optimization showed 
with -XX:+PrintCompilation but can not figure out what are important 
and what are not.


