Hi,

I am trying to understand the query plan, the number of tasks, and the
execution time for a join query.

Consider the following example: two tables, emp and sal, each with 100
records and a common key to join on.

EmpRDDRelation.scala

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

case class EmpRecord(key: Int, value: String)
case class SalRecord(key: Int, salary: Int)

object EmpRDDRelation {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("local[1]").setAppName("RDDRelation")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    // Importing the SQL context gives access to all the SQL functions and
    // implicit conversions.
    import sqlContext._

    val rdd = sc.parallelize((1 to 100).map(i => EmpRecord(i, s"name_$i")))

    rdd.registerAsTable("emp")

    // Once tables have been registered, you can run SQL queries over them.
    println("Result of SELECT *:")
    sql("SELECT * FROM emp").collect().foreach(println)

    val salrdd = sc.parallelize((1 to 100).map(i => SalRecord(i, i * 100)))

    salrdd.registerAsTable("sal")
    sql("SELECT * FROM sal").collect().foreach(println)

    val salRRDFromSQL = sql(
      "SELECT emp.key, value, salary FROM emp, sal WHERE emp.key = 30 AND emp.key = sal.key")
    salRRDFromSQL.collect().foreach(println)
  }
}
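
To see where the task count comes from, I also added a few debug lines inside
main() (these println lines are my own additions, not part of the example
above, and the expected values in the comments are just my assumption):

    // Input side: parallelize() without an explicit slice count uses
    // defaultParallelism, which is 1 here because of local[1].
    println(s"emp partitions: ${rdd.partitions.size}")        // expect 1
    println(s"sal partitions: ${salrdd.partitions.size}")     // expect 1

    // Output side: both inputs are shuffled by the Exchange operators into
    // 150 partitions, so the joined SchemaRDD should report 150 partitions,
    // matching the 150 tasks in the log below.
    println(s"join partitions: ${salRRDFromSQL.partitions.size}")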

Here are my observations:

Below is the query plan for the join query above, which creates 150 tasks. I
can see that a Filter is added to the plan, but I am not sure whether it is
applied in an optimized way. First of all, it is not clear why 150 tasks are
required: I see the same 150 tasks when I run the join without the
"emp.key=30" filter ("SELECT emp.key, value, salary FROM emp, sal WHERE
emp.key = sal.key"), and both queries take the same time. My understanding is
that the emp.key = 30 filter should be applied first, and only the filtered
records from the emp table should then be joined with sal (thinking of it from
an Oracle RDBMS perspective). But here the query plan appears to join the
tables first and apply the filter later. Is there any way to improve this from
the code side, or does it require an enhancement on the Spark SQL side?
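
One thing I could try as a workaround (a rough sketch only; "emp_small" is
just a table name I made up) is to register a pre-filtered view of emp and
join against that, so the filter is explicitly applied before the join. I am
not sure whether this actually changes the physical plan, which is part of
what I am asking:

    // Register the filtered emp rows as their own table first.
    sql("SELECT key, value FROM emp WHERE key = 30").registerAsTable("emp_small")

    // Then join only the filtered rows with sal.
    val filteredJoin = sql(
      "SELECT emp_small.key, value, salary FROM emp_small, sal WHERE emp_small.key = sal.key")
    filteredJoin.collect().foreach(println)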

Please review my observation and let me know your comments.


== Query Plan ==
Project [key#0:0,value#1:1,salary#3:3]
 HashJoin [key#0], [key#2], BuildRight
  Exchange (HashPartitioning [key#0:0], 150)
   Filter (key#0:0 = 30)
    ExistingRdd [key#0,value#1], MapPartitionsRDD[1] at mapPartitions at basicOperators.scala:174
  Exchange (HashPartitioning [key#2:0], 150)
   ExistingRdd [key#2,salary#3], MapPartitionsRDD[5] at mapPartitions at basicOperators.scala:174), which is now runnable
14/08/01 22:20:02 INFO DAGScheduler: Submitting 150 missing tasks from Stage 2 (SchemaRDD[8] at RDD at SchemaRDD.scala:98
== Query Plan ==
Project [key#0:0,value#1:1,salary#3:3]
 HashJoin [key#0], [key#2], BuildRight
  Exchange (HashPartitioning [key#0:0], 150)
   Filter (key#0:0 = 30)
    ExistingRdd [key#0,value#1], MapPartitionsRDD[1] at mapPartitions at basicOperators.scala:174
  Exchange (HashPartitioning [key#2:0], 150)
   ExistingRdd [key#2,salary#3], MapPartitionsRDD[5] at mapPartitions at basicOperators.scala:174)
14/08/01 22:20:02 INFO TaskSchedulerImpl: Adding task set 2.0 with 150 tasks
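
As far as I can tell, the 150 in "HashPartitioning [...], 150" is the number
of shuffle partitions Spark SQL uses for the Exchange operators, and that is
where the 150 tasks come from (both tables only have 100 rows, so most of
those tasks would be empty). If the Spark version in use exposes the
spark.sql.shuffle.partitions setting (newer releases do; I am not sure about
the build I am running), something like the following should shrink the task
count. This is an assumption on my part, not something I have verified here:

    // Hypothetical: lower the number of shuffle partitions used by Exchange.
    sqlContext.setConf("spark.sql.shuffle.partitions", "10")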
