Repository: spark Updated Branches: refs/heads/master bc6c56e94 -> 3b19c74e7
[SPARK-19157][SQL] should be able to change spark.sql.runSQLOnFiles at runtime ## What changes were proposed in this pull request? The analyzer rule that supports querying files directly is added to `Analyzer.extendedResolutionRules` when SparkSession is created, according to the `spark.sql.runSQLOnFiles` flag. If the flag is off when we create `SparkSession`, this rule is not added and we cannot query files directly even if we turn on the flag later. This PR fixes this bug by always adding that rule to `Analyzer.extendedResolutionRules` and having the rule itself check the flag each time it is applied. ## How was this patch tested? new regression test Author: Wenchen Fan <wenc...@databricks.com> Closes #16531 from cloud-fan/sql-on-files. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3b19c74e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3b19c74e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3b19c74e Branch: refs/heads/master Commit: 3b19c74e71fd6af18047747843e962b5401db4d9 Parents: bc6c56e Author: Wenchen Fan <wenc...@databricks.com> Authored: Tue Jan 10 21:33:44 2017 -0800 Committer: gatorsmile <gatorsm...@gmail.com> Committed: Tue Jan 10 21:33:44 2017 -0800 ---------------------------------------------------------------------- .../spark/sql/execution/datasources/rules.scala | 21 ++++++++++-------- .../spark/sql/internal/SessionState.scala | 2 +- .../org/apache/spark/sql/SQLQuerySuite.scala | 23 ++++++++++++++++++++ .../spark/sql/hive/HiveSessionState.scala | 2 +- 4 files changed, 37 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/3b19c74e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 94ba814..5ca8226 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -35,27 +35,30 @@ import org.apache.spark.sql.types.{AtomicType, StructType} * Try to replaces [[UnresolvedRelation]]s with [[ResolveDataSource]]. */ class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] { + private def maybeSQLFile(u: UnresolvedRelation): Boolean = { + sparkSession.sessionState.conf.runSQLonFile && u.tableIdentifier.database.isDefined + } + def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case u: UnresolvedRelation if u.tableIdentifier.database.isDefined => + case u: UnresolvedRelation if maybeSQLFile(u) => try { val dataSource = DataSource( sparkSession, paths = u.tableIdentifier.table :: Nil, className = u.tableIdentifier.database.get) - val notSupportDirectQuery = try { - !classOf[FileFormat].isAssignableFrom(dataSource.providingClass) - } catch { - case NonFatal(e) => false - } - if (notSupportDirectQuery) { + // `dataSource.providingClass` may throw ClassNotFoundException, then the outer try-catch + // will catch it and return the original plan, so that the analyzer can report table not + // found later. 
+ val isFileFormat = classOf[FileFormat].isAssignableFrom(dataSource.providingClass) + if (!isFileFormat) { throw new AnalysisException("Unsupported data source type for direct query on files: " + s"${u.tableIdentifier.database.get}") } val plan = LogicalRelation(dataSource.resolveRelation()) - u.alias.map(a => SubqueryAlias(u.alias.get, plan, None)).getOrElse(plan) + u.alias.map(a => SubqueryAlias(a, plan, None)).getOrElse(plan) } catch { - case e: ClassNotFoundException => u + case _: ClassNotFoundException => u case e: Exception => // the provider is valid, but failed to create a logical plan u.failAnalysis(e.getMessage) http://git-wip-us.apache.org/repos/asf/spark/blob/3b19c74e/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index 8759dfe..c9075ce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -117,7 +117,7 @@ private[sql] class SessionState(sparkSession: SparkSession) { PreprocessTableInsertion(conf) :: new FindDataSourceTable(sparkSession) :: DataSourceAnalysis(conf) :: - (if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil) + new ResolveDataSource(sparkSession) :: Nil override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog), HiveOnlyCheck) http://git-wip-us.apache.org/repos/asf/spark/blob/3b19c74e/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index e89599b..563d068 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2476,4 +2476,27 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { assert(sql("SELECT * FROM array_tbl where arr = ARRAY(1L)").count == 1) } } + + test("SPARK-19157: should be able to change spark.sql.runSQLOnFiles at runtime") { + withTempPath { path => + Seq(1 -> "a").toDF("i", "j").write.parquet(path.getCanonicalPath) + + val newSession = spark.newSession() + val originalValue = newSession.sessionState.conf.runSQLonFile + + try { + newSession.sessionState.conf.setConf(SQLConf.RUN_SQL_ON_FILES, false) + intercept[AnalysisException] { + newSession.sql(s"SELECT i, j FROM parquet.`${path.getCanonicalPath}`") + } + + newSession.sessionState.conf.setConf(SQLConf.RUN_SQL_ON_FILES, true) + checkAnswer( + newSession.sql(s"SELECT i, j FROM parquet.`${path.getCanonicalPath}`"), + Row(1, "a")) + } finally { + newSession.sessionState.conf.setConf(SQLConf.RUN_SQL_ON_FILES, originalValue) + } + } + } } http://git-wip-us.apache.org/repos/asf/spark/blob/3b19c74e/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index 52892f1..aebee85 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -65,7 +65,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) PreprocessTableInsertion(conf) :: DataSourceAnalysis(conf) :: new DetermineHiveSerde(conf) :: - (if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil) + new ResolveDataSource(sparkSession) :: Nil override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog)) } 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org