TeodorDjelic commented on code in PR #51705: URL: https://github.com/apache/spark/pull/51705#discussion_r2260804620
########## sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/CreateVariablesExec.scala: ########## @@ -23,45 +23,51 @@ import org.apache.spark.sql.catalyst.{InternalRow, SqlScriptingContextManager} import org.apache.spark.sql.catalyst.analysis.{FakeLocalCatalog, ResolvedIdentifier} import org.apache.spark.sql.catalyst.catalog.VariableDefinition import org.apache.spark.sql.catalyst.expressions.{Attribute, ExpressionsEvaluator, Literal} -import org.apache.spark.sql.catalyst.plans.logical.DefaultValueExpression +import org.apache.spark.sql.catalyst.plans.logical.{DefaultValueExpression, LogicalPlan} import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec /** * Physical plan node for creating a variable. */ -case class CreateVariableExec( - resolvedIdentifier: ResolvedIdentifier, +case class CreateVariablesExec( + logicalPlansOfResolvedIdentifiers: IndexedSeq[LogicalPlan], defaultExpr: DefaultValueExpression, replace: Boolean) extends LeafV2CommandExec with ExpressionsEvaluator { override protected def run(): Seq[InternalRow] = { + + val resolvedIdentifiers = + logicalPlansOfResolvedIdentifiers.map(_.asInstanceOf[ResolvedIdentifier]) + val scriptingVariableManager = SqlScriptingContextManager.get().map(_.getVariableManager) val tempVariableManager = session.sessionState.catalogManager.tempVariableManager val exprs = prepareExpressions(Seq(defaultExpr.child), subExprEliminationEnabled = false) initializeExprs(exprs, 0) val initValue = Literal(exprs.head.eval(), defaultExpr.dataType) - val normalizedIdentifier = if (session.sessionState.conf.caseSensitiveAnalysis) { - resolvedIdentifier.identifier - } else { - Identifier.of( - resolvedIdentifier.identifier.namespace().map(_.toLowerCase(Locale.ROOT)), - resolvedIdentifier.identifier.name().toLowerCase(Locale.ROOT)) - } - val varDef = VariableDefinition(normalizedIdentifier, defaultExpr.originalSQL, initValue) + resolvedIdentifiers.foreach(resolvedIdentifier => { + val normalizedIdentifier = if (session.sessionState.conf.caseSensitiveAnalysis) { + resolvedIdentifier.identifier + } else { + Identifier.of( + resolvedIdentifier.identifier.namespace().map(_.toLowerCase(Locale.ROOT)), + resolvedIdentifier.identifier.name().toLowerCase(Locale.ROOT)) + } + val varDef = VariableDefinition(normalizedIdentifier, defaultExpr.originalSQL, initValue) - // create local variable if we are in a script, otherwise create session variable - scriptingVariableManager - .filter(_ => resolvedIdentifier.catalog == FakeLocalCatalog) - // If resolvedIdentifier.catalog is FakeLocalCatalog, scriptingVariableManager - // will always be present. - .getOrElse(tempVariableManager) - .create( - normalizedIdentifier.namespace().toSeq :+ normalizedIdentifier.name(), - varDef, - replace) + // create local variable if we are in a script, otherwise create session variable + scriptingVariableManager + .filter(_ => resolvedIdentifier.catalog == FakeLocalCatalog) + // If resolvedIdentifier.catalog is FakeLocalCatalog, scriptingVariableManager + // will always be present. + .getOrElse(tempVariableManager) + .create( + normalizedIdentifier.namespace().toSeq :+ normalizedIdentifier.name(), + varDef, + replace) + }) Review Comment: @srielau Do we want to make this new multivariable `DECLARE` atomic, so if one variable declaration fails, should they all fail? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org