GitHub user rajeshbalamohan opened a pull request:
https://github.com/apache/spark/pull/11978
SPARK-14113. Consider marking JobConf closure-cleaning in HadoopRDD a…
## What changes were proposed in this pull request?
In HadoopRDD, the following code was introduced as a part of SPARK-6943.
```scala
if (initLocalJobConfFuncOpt.isDefined) {
  sparkContext.clean(initLocalJobConfFuncOpt.get)
}
```
Passing `initLocalJobConfFuncOpt` to HadoopRDD incurs a significant performance
penalty (due to closure cleaning) when a large number of RDDs are created. The
cleaning runs on every HadoopRDD initialization, which makes it a bottleneck.
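For context, `SparkContext.clean` (visible at the bottom of the stack below) is roughly the following thin wrapper; this is a paraphrased sketch, not a quote of the actual source. It hands the closure to `ClosureCleaner`, which walks the closure's bytecode with ASM (the `FieldAccessFinder` frames below) and then verifies that the cleaned closure is serializable. Both passes are pure overhead for a function that is already small and serializable.
```scala
// Paraphrased sketch of SparkContext.clean (details may differ from the actual source).
// ClosureCleaner.clean uses ASM to null out unreferenced outer fields of the closure
// and, when checkSerializable is true, attempts to serialize it with the closure serializer.
private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
  ClosureCleaner.clean(f, checkSerializable)
  f
}
```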
An example thread stack is given below:
```
at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
at org.apache.xbean.asm5.ClassReader.readUTF8(Unknown Source)
at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:402)
at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:390)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:102)
at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:102)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:102)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:390)
at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$15.apply(ClosureCleaner.scala:224)
at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$15.apply(ClosureCleaner.scala:223)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:223)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2079)
at org.apache.spark.rdd.HadoopRDD.<init>(HadoopRDD.scala:112)
```
This PR does the following (a sketch of both changes is given after the list):
1. Remove the closure cleaning in the HadoopRDD constructor; it was mainly added to
check whether HadoopRDD could be made serializable.
2. Instantiate HadoopRDD directly in OrcRelation instead of going through
SparkContext.hadoopRDD (which internally captures a thread dump in `withScope`).
This minor change is bundled here rather than filed as a separate ticket.
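The sketch below is a minimal illustration of the two changes, not the PR's exact diff. It assumes the code sits inside Spark's own source tree (the `HadoopRDD` constructor uses the Spark-private `SerializableConfiguration`), and names such as `OrcScanSketch`, `broadcastedConf`, and `initJobConfFunc` are placeholders rather than identifiers taken from OrcRelation.
```scala
package org.apache.spark.sql.hive.orc

import org.apache.hadoop.io.{NullWritable, Writable}
import org.apache.hadoop.mapred.{InputFormat, JobConf}

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.{HadoopRDD, RDD}
import org.apache.spark.util.SerializableConfiguration

object OrcScanSketch {

  // Change (2), old path: SparkContext.hadoopRDD runs inside withScope, which
  // captures the call site (thread dump) for every RDD it creates, and the
  // HadoopRDD constructor then ran sparkContext.clean on initLocalJobConfFuncOpt.
  def viaSparkContext(
      sc: SparkContext,
      jobConf: JobConf,
      inputFormatClass: Class[_ <: InputFormat[NullWritable, Writable]],
      minPartitions: Int): RDD[(NullWritable, Writable)] = {
    sc.hadoopRDD(jobConf, inputFormatClass, classOf[NullWritable], classOf[Writable], minPartitions)
  }

  // Change (1): HadoopRDD's constructor previously did
  //   if (initLocalJobConfFuncOpt.isDefined) {
  //     sparkContext.clean(initLocalJobConfFuncOpt.get)
  //   }
  // With the patch that call is removed, so constructing a HadoopRDD no longer
  // touches ClosureCleaner at all.

  // Changes (1) + (2), new path: build the HadoopRDD directly (same constructor
  // arity as in the timing test below), skipping both withScope and the cleaning pass.
  def direct(
      sc: SparkContext,
      broadcastedConf: Broadcast[SerializableConfiguration],
      initJobConfFunc: JobConf => Unit,
      inputFormatClass: Class[_ <: InputFormat[NullWritable, Writable]],
      minPartitions: Int): RDD[(NullWritable, Writable)] = {
    new HadoopRDD(
      sc,
      broadcastedConf,
      Some(initJobConfFunc),
      inputFormatClass,
      classOf[NullWritable],
      classOf[Writable],
      minPartitions)
  }
}
```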
## How was this patch tested?
No new tests have been added. The following code was used to measure the overhead
of the HadoopRDD init code path: with the patch it took 340 ms, as opposed to
4815 ms without the patch.
Also tested with a number of TPC-DS queries in a multi-node environment.
In addition, ran the following unit tests:
org.apache.spark.sql.hive.execution.HiveCompatibilitySuite, org.apache.spark.sql.hive.execution.HiveQuerySuite, org.apache.spark.sql.hive.execution.PruningSuite, org.apache.spark.sql.hive.CachedTableSuite, org.apache.spark.rdd.RDDOperationScopeSuite, org.apache.spark.ui.jobs.JobProgressListenerSuite
```scala
test("Check timing for HadoopRDD init") {
  // Imports needed by this snippet (it runs inside a Hive test suite, so
  // sqlContext is already available):
  //   org.apache.hadoop.io.{NullWritable, Writable}
  //   org.apache.spark.rdd.HadoopRDD
  //   org.apache.spark.sql.hive.HadoopTableReader
  //   org.apache.spark.util.Utils
  val start: Long = System.currentTimeMillis()
  val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc("", null) _
  // withDummyCallSite keeps call-site capture out of the measurement.
  Utils.withDummyCallSite(sqlContext.sparkContext) {
    // Large tables end up creating 5500 RDDs
    for (i <- 1 to 5500) {
      // Nulls are fine here; we only care about the timing of RDD creation.
      val testRDD = new HadoopRDD(
        sqlContext.sparkContext,
        null,
        Some(initializeJobConfFunc),
        null,
        classOf[NullWritable],
        classOf[Writable],
        10)
    }
  }
  val end: Long = System.currentTimeMillis()
  println("Time taken : " + (end - start))
}
```
Without patch (time taken to init 5500 HadoopRDDs):
Time taken : 4815
With patch (time taken to init 5500 HadoopRDDs):
Time taken : 340
…s optional
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/rajeshbalamohan/spark SPARK-14113
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/11978.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #11978
----
commit dfb6b03c5061dd8514fe09804c30c9281af50ab9
Author: Rajesh Balamohan <[email protected]>
Date: 2016-03-26T08:58:17Z
SPARK-14113. Consider marking JobConf closure-cleaning in HadoopRDD as
optional
----