aokolnychyi commented on code in PR #47943:
URL: https://github.com/apache/spark/pull/47943#discussion_r1776116015


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2611,6 +2614,66 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     }
   }
 
+  /**
+   * A rule that resolves procedures.
+   */
+  object ResolveProcedures extends Rule[LogicalPlan] {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
+      _.containsPattern(UNRESOLVED_PROCEDURE), ruleId) {
+      case Call(UnresolvedProcedure(CatalogAndIdentifier(catalog, ident)), args, execute) =>
+        val procedureCatalog = catalog.asProcedureCatalog
+        val procedure = load(procedureCatalog, ident)
+        Call(ResolvedProcedure(procedureCatalog, ident, procedure), args, execute)
+    }
+
+    private def load(catalog: ProcedureCatalog, ident: Identifier): UnboundProcedure = {
+      try {
+        catalog.loadProcedure(ident)
+      } catch {
+        case e: Exception if !e.isInstanceOf[SparkThrowable] =>
+          val nameParts = catalog.name +: ident.asMultipartIdentifier
+          throw QueryCompilationErrors.failedToLoadRoutineError(nameParts, e)
+      }
+    }
+  }
+
+  /**
+   * A rule that binds procedures to the input types and rearranges arguments as needed.
+   */
+  object BindProcedures extends Rule[LogicalPlan] {
+    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+      case Call(ResolvedProcedure(catalog, ident, unbound: UnboundProcedure), args, execute)
+          if args.forall(_.resolved) =>
+        val inputType = extractInputType(args)
+        val bound = unbound.bind(inputType)
+        validateParameterModes(bound)
+        val rearrangedArgs = NamedParametersSupport.defaultRearrange(bound, args)
+        Call(ResolvedProcedure(catalog, ident, bound), rearrangedArgs, execute)
+    }
+
+    private def extractInputType(args: Seq[Expression]): StructType = {
+      val fields = args.zipWithIndex.map {
+        case (NamedArgumentExpression(name, value), _) =>
+          StructField(name, value.dataType, value.nullable, byNameMetadata)
+        case (arg, index) =>
+          StructField(s"param$index", arg.dataType, arg.nullable)
+      }
+      StructType(fields)
+    }
+
+    private def byNameMetadata: Metadata = {
+      new MetadataBuilder()
+        .putBoolean(ProcedureParameter.BY_NAME_METADATA_KEY, value = true)
+        .build()
+    }
+
+   private def validateParameterModes(procedure: BoundProcedure): Unit = {
+     procedure.parameters.find(_.mode != ProcedureParameter.Mode.IN).foreach { param =>

Review Comment:
   In the future, yes. There is no active work at the moment, as far as I know.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to