cloud-fan commented on code in PR #47943:
URL: https://github.com/apache/spark/pull/47943#discussion_r1764590153


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2611,6 +2614,71 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     }
   }
 
+  /**
+   * A rule that resolves procedures.
+   */
+  object ResolveProcedures extends Rule[LogicalPlan] {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
+      _.containsPattern(UNRESOLVED_PROCEDURE), ruleId) {
+      case Call(UnresolvedProcedure(CatalogAndIdentifier(catalog, ident)), args, execute) =>
+        val procedureCatalog = catalog.asProcedureCatalog
+        val procedure = load(procedureCatalog, ident)
+        Call(ResolvedProcedure(procedureCatalog, ident, procedure), args, execute)
+    }
+
+    private def load(catalog: ProcedureCatalog, ident: Identifier): UnboundProcedure = {
+      try {
+        catalog.loadProcedure(ident)
+      } catch {
+        case e: Exception if !e.isInstanceOf[SparkThrowable] =>
+          val nameParts = catalog.name +: ident.asMultipartIdentifier
+          throw QueryCompilationErrors.failedToLoadRoutineError(nameParts, e)
+      }
+    }
+  }
+
+  /**
+   * A rule that binds procedures to the input types and rearranges arguments as needed.
+   */
+  object BindProcedures extends Rule[LogicalPlan] {
+    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+      case Call(ResolvedProcedure(catalog, ident, unbound: UnboundProcedure), args, execute)
+          if args.forall(_.resolved) =>
+        val inputType = extractInputType(args)
+        val bound = unbound.bind(inputType)
+        validateParameterModes(bound)
+        val rearrangedArgs = NamedParametersSupport.defaultRearrange(bound, args)
+        Call(ResolvedProcedure(catalog, ident, bound), rearrangedArgs, execute)
+    }
+
+    private def extractInputType(args: Seq[Expression]): StructType = {
+      val fields = args.zipWithIndex.map { case (arg, index) =>
+        arg match {
+          case NamedArgumentExpression(name, value) =>
+            val metadata = argMetadata(byName = true)
+            StructField(name, value.dataType, value.nullable, metadata)
+          case _ =>
+            val name = s"param$index"
+            val metadata = argMetadata(byName = false)
+            StructField(name, arg.dataType, arg.nullable, metadata)
+        }
+      }
+      StructType(fields)
+    }
+
+    private def argMetadata(byName: Boolean): Metadata = {
+      new MetadataBuilder()
+        .putBoolean(ProcedureParameter.BY_NAME_METADATA_KEY, byName)

Review Comment:
   Shall we omit this metadata when the argument is not passed by name?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to