Repository: spark
Updated Branches:
  refs/heads/master 07737c87d -> 4a9c9d8f9
[SPARK-25159][SQL] json schema inference should only trigger one job

## What changes were proposed in this pull request?

This fixes a perf regression caused by https://github.com/apache/spark/pull/21376.

We should not use `RDD#toLocalIterator`, which triggers one Spark job per RDD partition. This is especially costly for RDDs with many small partitions.

To fix it, this PR introduces a way to access SQLConf in the scheduler event loop thread. As a result, `JsonInferSchema` no longer needs `RDD#toLocalIterator`.

## How was this patch tested?

A new test.

Closes #22152 from cloud-fan/conf.

Authored-by: Wenchen Fan <wenc...@databricks.com>
Signed-off-by: Xiao Li <gatorsm...@gmail.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4a9c9d8f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4a9c9d8f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4a9c9d8f

Branch: refs/heads/master
Commit: 4a9c9d8f9a8f8f165369e121d3b553a3515333d4
Parents: 07737c8
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Tue Aug 21 22:21:08 2018 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Tue Aug 21 22:21:08 2018 -0700

----------------------------------------------------------------------
 .../sql/catalyst/json/JsonInferSchema.scala     | 16 +++++++---
 .../org/apache/spark/sql/internal/SQLConf.scala | 33 ++++++++++++++++----
 .../org/apache/spark/sql/DataFrameSuite.scala   | 24 ++++++++++++++
 3 files changed, 63 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4a9c9d8f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala index 5f70e06..9999a00 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala @@ -26,6 +26,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis.TypeCoercion import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil import org.apache.spark.sql.catalyst.util.{DropMalformedMode, FailFastMode, ParseMode, PermissiveMode} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -69,10 +70,17 @@ private[sql] object JsonInferSchema { }.reduceOption(typeMerger).toIterator } - // Here we get RDD local iterator then fold, instead of calling `RDD.fold` directly, because - // `RDD.fold` will run the fold function in DAGScheduler event loop thread, which may not have - // active SparkSession and `SQLConf.get` may point to the wrong configs. - val rootType = mergedTypesFromPartitions.toLocalIterator.fold(StructType(Nil))(typeMerger) + // Here we manually submit a fold-like Spark job, so that we can set the SQLConf when running + // the fold functions in the scheduler event loop thread. 
+ val existingConf = SQLConf.get + var rootType: DataType = StructType(Nil) + val foldPartition = (iter: Iterator[DataType]) => iter.fold(StructType(Nil))(typeMerger) + val mergeResult = (index: Int, taskResult: DataType) => { + rootType = SQLConf.withExistingConf(existingConf) { + typeMerger(rootType, taskResult) + } + } + json.sparkContext.runJob(mergedTypesFromPartitions, foldPartition, mergeResult) canonicalizeType(rootType, configOptions) match { case Some(st: StructType) => st http://git-wip-us.apache.org/repos/asf/spark/blob/4a9c9d8f/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 5913c94..df2caff 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -82,6 +82,19 @@ object SQLConf { /** See [[get]] for more information. */ def getFallbackConf: SQLConf = fallbackConf.get() + private lazy val existingConf = new ThreadLocal[SQLConf] { + override def initialValue: SQLConf = null + } + + def withExistingConf[T](conf: SQLConf)(f: => T): T = { + existingConf.set(conf) + try { + f + } finally { + existingConf.remove() + } + } + /** * Defines a getter that returns the SQLConf within scope. * See [[get]] for more information. 
@@ -116,16 +129,24 @@ object SQLConf { if (TaskContext.get != null) { new ReadOnlySQLConf(TaskContext.get()) } else { - if (Utils.isTesting && SparkContext.getActive.isDefined) { + val isSchedulerEventLoopThread = SparkContext.getActive + .map(_.dagScheduler.eventProcessLoop.eventThread) + .exists(_.getId == Thread.currentThread().getId) + if (isSchedulerEventLoopThread) { // DAGScheduler event loop thread does not have an active SparkSession, the `confGetter` - // will return `fallbackConf` which is unexpected. Here we prevent it from happening. - val schedulerEventLoopThread = - SparkContext.getActive.get.dagScheduler.eventProcessLoop.eventThread - if (schedulerEventLoopThread.getId == Thread.currentThread().getId) { + // will return `fallbackConf` which is unexpected. Here we require the caller to get the + // conf within `withExistingConf`, otherwise fail the query. + val conf = existingConf.get() + if (conf != null) { + conf + } else if (Utils.isTesting) { throw new RuntimeException("Cannot get SQLConf inside scheduler event loop thread.") + } else { + confGetter.get()() } + } else { + confGetter.get()() } - confGetter.get()() } } http://git-wip-us.apache.org/repos/asf/spark/blob/4a9c9d8f/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index b0e22a5..7310087 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -27,6 +27,7 @@ import scala.util.Random import org.scalatest.Matchers._ import org.apache.spark.SparkException +import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.Uuid import 
org.apache.spark.sql.catalyst.plans.logical.{Filter, OneRowRelation, Union} @@ -2528,4 +2529,27 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { checkAnswer(aggPlusFilter1, aggPlusFilter2.collect()) } } + + test("SPARK-25159: json schema inference should only trigger one job") { + withTempPath { path => + // This test is to prove that the `JsonInferSchema` does not use `RDD#toLocalIterator` which + // triggers one Spark job per RDD partition. + Seq(1 -> "a", 2 -> "b").toDF("i", "p") + // The data set has 2 partitions, so Spark will write at least 2 json files. + // Use a non-splittable compression (gzip), to make sure the json scan RDD has at least 2 + // partitions. + .write.partitionBy("p").option("compression", "gzip").json(path.getCanonicalPath) + + var numJobs = 0 + sparkContext.addSparkListener(new SparkListener { + override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { + numJobs += 1 + } + }) + + val df = spark.read.json(path.getCanonicalPath) + assert(df.columns === Array("i", "p")) + assert(numJobs == 1) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org