GitHub user rajeshbalamohan opened a pull request:

    https://github.com/apache/spark/pull/11978

    SPARK-14113. Consider marking JobConf closure-cleaning in HadoopRDD as optional

    ## What changes were proposed in this pull request?
    
    In HadoopRDD, the following code was introduced as part of SPARK-6943.
    
    ```scala
      if (initLocalJobConfFuncOpt.isDefined) {
        sparkContext.clean(initLocalJobConfFuncOpt.get)
      }
    ```
    
    Passing initLocalJobConfFuncOpt to HadoopRDD incurs a significant performance penalty (due to closure cleaning) when a large number of RDDs are created, since the cleaning runs for every HadoopRDD initialization and becomes a bottleneck.
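
    The cleaning exists mainly to verify that the function is serializable, but a function like HadoopTableReader.initializeLocalJobConfFunc captures only its arguments and serializes fine without any cleaning. Below is a minimal, self-contained sketch of that serializability argument (plain Scala with hadoop-mapred on the classpath; `InitFuncs` and `roundTrip` are illustrative names, not Spark code):

    ```scala
    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    import org.apache.hadoop.mapred.JobConf

    object InitFuncs {
      // Shaped like HadoopTableReader.initializeLocalJobConfFunc: a curried method
      // on an object, so the resulting function value captures only the `path`
      // string argument, nothing from an enclosing non-serializable scope.
      def initializeLocalJobConfFunc(path: String)(jobConf: JobConf): Unit =
        jobConf.set("mapreduce.input.fileinputformat.inputdir", path)
    }

    object SerializableCheck {
      // Round-trips a value through Java serialization, essentially the check
      // ClosureCleaner performs after cleaning. Throws NotSerializableException
      // if the value cannot be serialized.
      def roundTrip(value: AnyRef): AnyRef = {
        val bytes = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(bytes)
        out.writeObject(value)
        out.close()
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)).readObject()
      }

      def main(args: Array[String]): Unit = {
        val f: JobConf => Unit = InitFuncs.initializeLocalJobConfFunc("/tmp/data") _
        roundTrip(f) // succeeds without any closure cleaning
        println("init function serializes without cleaning")
      }
    }
    ```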
    
    An example thread stack is given below:
    
    ```
            at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
            at org.apache.xbean.asm5.ClassReader.readUTF8(Unknown Source)
            at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
            at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
            at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
            at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:402)
            at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:390)
            at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
            at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:102)
            at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:102)
            at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
            at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
            at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:102)
            at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
            at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:390)
            at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
            at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
            at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
            at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
            at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$15.apply(ClosureCleaner.scala:224)
            at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$15.apply(ClosureCleaner.scala:223)
            at scala.collection.immutable.List.foreach(List.scala:318)
            at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:223)
            at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
            at org.apache.spark.SparkContext.clean(SparkContext.scala:2079)
            at org.apache.spark.rdd.HadoopRDD.<init>(HadoopRDD.scala:112)
    ```
    
    This PR does the following:

    1. Removes the closure cleaning in HadoopRDD's constructor, which was mainly added to check whether HadoopRDD can be made serializable.
    2. Instantiates HadoopRDD directly in OrcRelation instead of going via SparkContext.hadoopRDD (which internally invokes a thread dump in `withScope`); a sketch follows this list. This minor change is clubbed into this PR rather than filed as a separate ticket.
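
    For change 2, a minimal sketch of what direct instantiation looks like at a call site, using HadoopRDD's primary constructor the same way the timing test below does (the `directHadoopRDD` helper and the `null` broadcast-conf placeholder are illustrative, not the actual OrcRelation code):

    ```scala
    import org.apache.hadoop.io.{NullWritable, Writable}
    import org.apache.hadoop.mapred.{InputFormat, JobConf}
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.{HadoopRDD, RDD}

    object DirectHadoopRDDExample {
      // SparkContext.hadoopRDD wraps RDD creation in withScope, which performs
      // call-site bookkeeping on every invocation; constructing HadoopRDD
      // directly skips that per-RDD overhead.
      def directHadoopRDD(
          sc: SparkContext,
          initJobConf: JobConf => Unit,
          inputFormatClass: Class[_ <: InputFormat[NullWritable, Writable]],
          minPartitions: Int): RDD[(NullWritable, Writable)] = {
        new HadoopRDD(
          sc,
          null, // broadcasted Hadoop conf: placeholder, as in the timing test below
          Some(initJobConf), // with this PR, no longer closure-cleaned in the constructor
          inputFormatClass,
          classOf[NullWritable],
          classOf[Writable],
          minPartitions)
      }
    }
    ```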
    
    
    ## How was this patch tested?
    
    No new tests have been added. The following code was used to measure the overhead of the HadoopRDD init codepath: with the patch it took 340 ms, as opposed to 4815 ms without the patch.

    Also tested a number of TPC-DS queries in a multi-node environment, and ran the following unit tests: org.apache.spark.sql.hive.execution.HiveCompatibilitySuite, org.apache.spark.sql.hive.execution.HiveQuerySuite, org.apache.spark.sql.hive.execution.PruningSuite, org.apache.spark.sql.hive.CachedTableSuite, org.apache.spark.rdd.RDDOperationScopeSuite, org.apache.spark.ui.jobs.JobProgressListenerSuite.
    
    ```scala
      test("Check timing for HadoopRDD init") {
        val start: Long = System.currentTimeMillis()

        val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc("", null) _
        Utils.withDummyCallSite(sqlContext.sparkContext) {
          // Large tables end up creating 5500 RDDs
          for (i <- 1 to 5500) {
            // Nulls are acceptable here, as the test only measures the timing of RDD creation
            val testRDD = new HadoopRDD(sqlContext.sparkContext, null, Some(initializeJobConfFunc),
              null, classOf[NullWritable], classOf[Writable], 10)
          }
        }
        val end: Long = System.currentTimeMillis()
        println("Time taken : " + (end - start))
      }
    ```
    
    Without patch (time taken to init 5500 HadoopRDDs):
    Time taken : 4815

    With patch (time taken to init 5500 HadoopRDDs):
    Time taken : 340
    
    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rajeshbalamohan/spark SPARK-14113

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/11978.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #11978
    
----
commit dfb6b03c5061dd8514fe09804c30c9281af50ab9
Author: Rajesh Balamohan <[email protected]>
Date:   2016-03-26T08:58:17Z

    SPARK-14113. Consider marking JobConf closure-cleaning in HadoopRDD as optional

----

