dtenedor commented on code in PR #41191: URL: https://github.com/apache/spark/pull/41191#discussion_r1212368577
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.SparkThrowable +import org.apache.spark.sql.catalyst.expressions.{Expression, Literal} +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND +import org.apache.spark.sql.errors.QueryCompilationErrors + +/** + * This rule is responsible for matching against unresolved table specifications in commands with + * OPTIONS lists. The parser produces such lists as maps from strings to unresolved expressions. + * After otherwise resolving such expressions in the analyzer, this rule converts them to resolved + * table specifications wherein these OPTIONS list values are represented as strings instead, for + * convenience. The 'resolveOption' is for resolving function calls within each table option. 
+ */ +case class ResolveTableSpec(resolveOption: Expression => Expression) extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = { + plan.resolveOperatorsWithPruning(_.containsAnyPattern(COMMAND), ruleId) { + case t: CreateTable if t.tableSpec.isInstanceOf[UnresolvedTableSpec] => + t.copy(tableSpec = resolveTableSpec(t.tableSpec)) + case t: CreateTableAsSelect if t.tableSpec.isInstanceOf[UnresolvedTableSpec] => + t.copy(tableSpec = resolveTableSpec(t.tableSpec)) + case t: ReplaceTable if t.tableSpec.isInstanceOf[UnresolvedTableSpec] => + t.copy(tableSpec = resolveTableSpec(t.tableSpec)) + case t: ReplaceTableAsSelect if t.tableSpec.isInstanceOf[UnresolvedTableSpec] => + t.copy(tableSpec = resolveTableSpec(t.tableSpec)) + } + } + + private def resolveTableSpec(t: TableSpec): TableSpec = { + val u = t.asInstanceOf[UnresolvedTableSpec] + val newOptions: Map[String, String] = u.optionsExpressions.map { + case (key: String, null) => + (key, null) + case (key: String, value: Expression) => + val newValue: String = try { + resolveOption(value) match { + case lit: Literal => Review Comment: Sure, the `catch` block checked for `MatchError`, but I added a `case _` to make this explicit. ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala: ########## @@ -2406,6 +2407,15 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor } } + /** Helper method to resolve function calls within expressions of TableSpec options. */ + def ResolveTableSpecOption(expression: Expression): Expression = { Review Comment: 👍 I was able to remove this entirely, and `ResolveTableSpecOption` is no longer a separate analyzer rule. Instead, `ResolveSessionCatalog` now simply resolves the `TableSpec` when needed directly. 
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala: ########## @@ -2406,6 +2407,15 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor } } + /** Helper method to resolve function calls within expressions of TableSpec options. */ + def ResolveTableSpecOption(expression: Expression): Expression = { + val analyzed = executeSameContext(Project(Seq(Alias(expression, "col")()), OneRowRelation())) + val folded = ConstantFolding(analyzed) + folded match { + case Project(Seq(Alias(expression, _)), _) => expression Review Comment: Good Q: now that we just apply constant-folding here, we do not need to check if the pattern does not match, because the constant folding we invoked will not change the operators and number of projection expressions in the query plan. I added a comment to this effect here. ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala: ########## @@ -1382,11 +1383,64 @@ case class DropIndex( copy(table = newChild) } -case class TableSpec( +trait TableSpec { + def properties: Map[String, String] + def provider: Option[String] + def options: Map[String, String] + def location: Option[String] + def comment: Option[String] + def serde: Option[SerdeInfo] + def external: Boolean + def copy( + properties: Map[String, String] = properties, + provider: Option[String] = provider, + options: Map[String, String] = options, + location: Option[String] = location, + comment: Option[String] = comment, + serde: Option[SerdeInfo] = serde, + external: Boolean = external): TableSpec +} + +case class UnresolvedTableSpec( + properties: Map[String, String], + provider: Option[String], + optionsExpressions: Map[String, Expression], + location: Option[String], + comment: Option[String], + serde: Option[SerdeInfo], + external: Boolean) extends TableSpec { + override def options: Map[String, String] = { + throw SparkException.internalError("Invalid call to 'options' 
method for unresolved TableSpec") + } + override def copy( + properties: Map[String, String], + provider: Option[String], + options: Map[String, String], + location: Option[String], + comment: Option[String], + serde: Option[SerdeInfo], + external: Boolean = external): TableSpec = { + UnresolvedTableSpec( + properties, provider, optionsExpressions, location, comment, serde, external) + } +} + +case class ResolvedTableSpec( properties: Map[String, String], provider: Option[String], options: Map[String, String], location: Option[String], comment: Option[String], serde: Option[SerdeInfo], - external: Boolean) + external: Boolean) extends TableSpec { + override def copy( Review Comment: I only added this to copy the `TableSpec` with a new `location`. I switched this to just `withNewLocation` instead to simplify. ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala: ########## @@ -927,9 +927,14 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with Tre redactMapString(map.asCaseSensitiveMap().asScala, maxFields) case map: Map[_, _] => redactMapString(map, maxFields) - case t: TableSpec => - t.copy(properties = Utils.redact(t.properties).toMap, - options = Utils.redact(t.options).toMap) :: Nil + case t: UnresolvedTableSpec => + val newOptionsList = Utils.redact(t.optionsExpressions.map { case (key, value) => + (key, value.sql) + }).toMap + ResolvedTableSpec( Review Comment: I updated this to keep the object as an `UnresolvedTableSpec` here instead. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
