Hi,
I am trying to understand the query plan and the number of tasks / execution
time created for a join query.
Consider the following example, which creates two tables, emp and sal, with
100 records each and a common key for joining them.
EmpRDDRelation.scala

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

case class EmpRecord(key: Int, value: String)
case class SalRecord(key: Int, salary: Int)

object EmpRDDRelation {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("local[1]").setAppName("RDDRelation")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    // Importing the SQL context gives access to all the SQL functions and
    // implicit conversions.
    import sqlContext._

    val rdd = sc.parallelize((1 to 100).map(i => EmpRecord(i, s"name_$i")))
    rdd.registerAsTable("emp")

    // Once tables have been registered, you can run SQL queries over them.
    println("Result of SELECT *:")
    sql("SELECT * FROM emp").collect().foreach(println)

    val salRdd = sc.parallelize((1 to 100).map(i => SalRecord(i, i * 100)))
    salRdd.registerAsTable("sal")
    sql("SELECT * FROM sal").collect().foreach(println)

    val salRddFromSQL = sql(
      "SELECT emp.key, value, salary FROM emp, sal WHERE emp.key = 30 AND emp.key = sal.key")
    salRddFromSQL.collect().foreach(println)
  }
}
Here are my observations:

Below is the query plan for the above join query, which creates 150 tasks. I
can see that a Filter is added to the plan, but I am not sure whether it is
applied in an optimized way. First of all, it is not clear why 150 tasks are
required: I see the same 150 tasks when I execute the join query without the
"emp.key = 30" filter, i.e. "SELECT emp.key, value, salary FROM emp, sal
WHERE emp.key = sal.key", and both cases take the same time. My understanding
(from an Oracle RDBMS perspective) is that the "emp.key = 30" filter should be
applied first, and the join with the sal table should run on top of the
filtered records from the emp table. But here the query plan seems to join the
tables first and apply the filter later. Is there any way we can improve this
on the code side, or does it require an enhancement on the Spark SQL side?
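One workaround I tried to sketch (unverified, and the emp_filtered table name
is just mine): filter the emp RDD in Scala before registering it, so the join
only ever sees the already-reduced rows regardless of what the optimizer does.

```scala
// Sketch of a manual pre-filter workaround, assuming the same sc/sqlContext
// setup as in the program above. Filter emp down to the single key *before*
// registering it, so the join input is one row instead of 100.
val filteredEmp = sc.parallelize((1 to 100).map(i => EmpRecord(i, s"name_$i")))
  .filter(_.key == 30)
filteredEmp.registerAsTable("emp_filtered")

sql("SELECT emp_filtered.key, value, salary FROM emp_filtered, sal " +
    "WHERE emp_filtered.key = sal.key").collect().foreach(println)
```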
Please review my observations and let me know your comments.
== Query Plan ==
Project [key#0:0,value#1:1,salary#3:3]
HashJoin [key#0], [key#2], BuildRight
Exchange (HashPartitioning [key#0:0], 150)
Filter (key#0:0 = 30)
ExistingRdd [key#0,value#1], MapPartitionsRDD[1] at mapPartitions at
basicOperators.scala:174
Exchange (HashPartitioning [key#2:0], 150)
ExistingRdd [key#2,salary#3], MapPartitionsRDD[5] at mapPartitions at
basicOperators.scala:174), which is now runnable
14/08/01 22:20:02 INFO DAGScheduler: Submitting 150 missing tasks from Stage 2
(SchemaRDD[8] at RDD at SchemaRDD.scala:98
14/08/01 22:20:02 INFO TaskSchedulerImpl: Adding task set 2.0 with 150 tasks
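My assumption (please correct me if wrong) is that the 150 in
"Exchange (HashPartitioning [...], 150)" is the number of shuffle partitions,
and each partition becomes one task, which would explain why both queries
submit 150 tasks regardless of the filter. A rough sketch of what I would try,
assuming the spark.sql.shuffle.partitions setting is available in this version:

```scala
// Assumption: each HashPartitioning partition maps to one task, so lowering
// the shuffle partition count should shrink the task set for small tables.
// spark.sql.shuffle.partitions is a real SQLConf key in later Spark releases;
// I am not sure whether my version honors it.
sql("SET spark.sql.shuffle.partitions=10")
// Re-running the join afterwards should then submit far fewer tasks per stage.
```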