Hari,
Thanks for the details, and sorry for the late reply. Currently Spark SQL
doesn't enable the broadcast join optimization for left outer joins, so
shuffles are required to perform this query. I ran a somewhat artificial
test to show the physical plan of your query:
== Physical Plan ==
HashOuterJoin [state#15], [state#19], LeftOuter, None
 Exchange (HashPartitioning [state#15], 200)
  PhysicalRDD [state#15,city#16,amount#17,amount2#18], MapPartitionsRDD[1] at mapPartitions at ExistingRDD.scala:36
 Aggregate false, [state#19], [state#19,MAX(PartialMax#24) AS amount1#4]
  Exchange (HashPartitioning [state#19], 200)
   Aggregate true, [state#19], [state#19,MAX(amount2#22) AS PartialMax#24]
    Project [state#19,amount2#22]
     PhysicalRDD [state#19,city#20,amount#21,amount2#22], MapPartitionsRDD[1] at mapPartitions at ExistingRDD.scala:36
For each Exchange operator, a shuffle is inserted; this partly explains the
low performance. On the other hand, the default shuffle partition number is
200, which is apparently far too large for only 30K rows, and introduces
unnecessary task scheduling costs. You may try lowering the shuffle
partition number to, for example, 8.
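For example, assuming a Spark 1.x SQLContext bound to a variable named sqlContext:

```scala
// Lower the number of post-shuffle partitions from the default 200 to 8.
sqlContext.setConf("spark.sql.shuffle.partitions", "8")

// Equivalently, from SQL:
sqlContext.sql("SET spark.sql.shuffle.partitions=8")
```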
Also, PR #3270 <https://github.com/apache/spark/pull/3270> is part of
the attempt to accelerate similar queries.
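In the meantime, since the aggregated side has only ~50 rows, you could work around the missing optimization with a manual map-side (hash) join: collect the small side to the driver, broadcast it with sc.broadcast, and look each big-side row up in the resulting map. A minimal sketch of the idea with plain Scala collections and made-up data (in Spark, `big` would be your cached RDD and `maxByState` the collected, broadcast result of the groupBy):

```scala
// Illustrative record type mirroring the case class from your snippet.
case class Rec(state: String, city: String, amount: Double, amount2: Double)

val big = Seq(
  Rec("CA", "SF", 10.0, 1.0),
  Rec("NY", "NYC", 20.0, 2.0),
  Rec("TX", "AUS", 5.0, 0.5))

// The small (~50-row) side: max(amount2) per state, kept as an in-memory map.
val maxByState: Map[String, Double] =
  big.groupBy(_.state).mapValues(_.map(_.amount2).max).toMap

// Left outer join without any shuffle: one hash lookup per row.
// Missing states naturally yield None, matching left outer semantics.
val joined = big.map(r => (r.state, r.city, r.amount, maxByState.get(r.state)))
```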
Cheng
On 12/18/14 10:41 PM, Hari Rajaram wrote:
Cheng,
Thanks for looking at the issue. As I said earlier, it is a SchemaRDD
created from a case class by reading a tab-delimited file.
I'm using the DSL to join the RDDs.
Here is a small snippet:
RDD1:

case class RDD1(state: String, city: String, amount: Double, amount2: Double)

RDD2 (the variable `results` below is just the SchemaRDD built from RDD1):

val groupByRDD = results.groupBy('state)('state, Alias(Max('amount2), "amount1")())
val x = results.as('x)
val originalTableColumns = x.schema.fieldNames
val y = groupByRDD.as('y)
val joinOnClause = 'x.state === 'y.state
val joinRDD = x.join(y, LeftOuter, Some(joinOnClause))

Then I get the records from joinRDD.
Note: results (RDD1) is already created and cached, so the time from
groupByRDD to joinRDD is around 8 to 10 seconds.
Hari
On Wed, Dec 17, 2014 at 10:09 PM, Cheng Lian <lian.cs....@gmail.com> wrote:
What kinds are the tables underlying the SchemaRDDs? Could you
please provide the DDL of the tables and the query you executed?
On 12/18/14 6:15 AM, harirajaram wrote:
Guys,
I'm trying to join 2-3 SchemaRDDs of approx 30,000 rows, and it is
terribly slow. I do get the results, but it takes 8 s to do the join
and fetch them.
I'm running standalone Spark on my machine, which has 8 cores and
12 GB of RAM, with 4 workers.
Not sure why it is consuming so much time; any input is appreciated.
Here is an example of what I'm trying to do:
RDD1 (30,000 rows): state, city, amount
RDD2 (50 rows): state, amount1
Join by state.
New RDD3 (30,000 rows): state, city, amount, amount1
Do a select(amount - amount1) from the new RDD3.
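The steps above can be sketched with plain Scala collections and toy data (illustrative only; in a left outer join, rows with no match get a null amount1, which the sketch treats as 0.0):

```scala
// RDD1-like record: state, city, amount.
case class R1(state: String, city: String, amount: Double)

val rdd1 = Seq(R1("CA", "SF", 10.0), R1("NY", "NYC", 20.0)) // 30,000 rows in practice
val rdd2 = Map("CA" -> 4.0)                                 // 50 rows: state -> amount1

// RDD3 after the left outer join on state: unmatched states give None.
val rdd3 = rdd1.map(r => (r.state, r.city, r.amount, rdd2.get(r.state)))

// select(amount - amount1), treating a missing amount1 as 0.0.
val diffs = rdd3.map { case (_, _, amount, amount1) => amount - amount1.getOrElse(0.0) }
```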
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-with-join-terribly-slow-tp20751.html
Sent from the Apache Spark User List mailing list archive at
Nabble.com.
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org