zhenlineo commented on code in PR #41704:
URL: https://github.com/apache/spark/pull/41704#discussion_r1244127298


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -806,35 +807,56 @@ class SparkConnectPlanner(val sessionHolder: 
SessionHolder) extends Logging {
    */
   private case class TypedScalaUdf(
       function: AnyRef,
-      outEnc: ExpressionEncoder[_],
-      outputObjAttr: Attribute,
-      inEnc: ExpressionEncoder[_],
-      inputObjAttr: Attribute) {
-    val outputNamedExpression: Seq[NamedExpression] = outEnc.namedExpressions
-    def inputDeserializer(inputAttributes: Seq[Attribute] = Nil): Expression = 
{
+      funcOutEnc: AgnosticEncoder[_],
+      funcInEnc: AgnosticEncoder[_],
+      inputAttrs: Option[Seq[Attribute]]) {
+    import TypedScalaUdf.encoderFor
+    def outputNamedExpression: Seq[NamedExpression] = outEnc.namedExpressions
+    def inputDeserializer(inputAttributes: Seq[Attribute] = Nil): Expression =
       UnresolvedDeserializer(inEnc.deserializer, inputAttributes)
-    }
+
+    def outEnc: ExpressionEncoder[_] = encoderFor(funcOutEnc, "output")
+    def outputObjAttr: Attribute = generateObjAttr(outEnc)
+    def inEnc: ExpressionEncoder[_] = encoderFor(funcInEnc, "input", 
inputAttrs)
+    def inputObjAttr: Attribute = generateObjAttr(inEnc)
   }
+
   private object TypedScalaUdf {
-    def apply(expr: proto.Expression): TypedScalaUdf = {
+    def apply(expr: proto.Expression, inputAttrs: Option[Seq[Attribute]]): 
TypedScalaUdf = {
       if (expr.hasCommonInlineUserDefinedFunction
         && expr.getCommonInlineUserDefinedFunction.hasScalarScalaUdf) {
-        apply(expr.getCommonInlineUserDefinedFunction)
+        apply(expr.getCommonInlineUserDefinedFunction, inputAttrs)
       } else {
         throw InvalidPlanInput(s"Expecting a Scala UDF, but get 
${expr.getExprTypeCase}")
       }
     }
 
-    def apply(commonUdf: proto.CommonInlineUserDefinedFunction): TypedScalaUdf 
= {
+    def apply(
+        commonUdf: proto.CommonInlineUserDefinedFunction,
+        inputAttrs: Option[Seq[Attribute]] = None): TypedScalaUdf = {
       val udf = unpackUdf(commonUdf)
-      val outEnc = ExpressionEncoder(udf.outputEncoder)
       // There might be more than one inputs, but we only interested in the 
first one.
       // Most typed API takes one UDF input.
       // For the few that takes more than one inputs, e.g. grouping function 
mapping UDFs,
       // the first input which is the key of the grouping function.
       assert(udf.inputEncoders.nonEmpty)
-      val inEnc = ExpressionEncoder(udf.inputEncoders.head) // single input 
encoder or key encoder
-      TypedScalaUdf(udf.function, outEnc, generateObjAttr(outEnc), inEnc, 
generateObjAttr(inEnc))
+      val inEnc = udf.inputEncoders.head // single input encoder or key encoder
+      TypedScalaUdf(udf.function, udf.outputEncoder, inEnc, inputAttrs)
+    }
+
+    def encoderFor(
+        encoder: AgnosticEncoder[_],
+        errorType: String,
+        inputAttrs: Option[Seq[Attribute]] = None): ExpressionEncoder[_] = {
+      // First try to find the encoder with reflection, if failed
+      // fall back to a row encoder with input schema when the schema is 
provided.
+      Try(ExpressionEncoder(encoder)).getOrElse(

Review Comment:
   Changed this to handle UnboundRowEncoder directly. Besides these typed API 
UDFs, the other two places (UDF and Streaming ForEachWriter) can use try-error 
to handle nested unbound row encoders. (We should add tests for these once we 
have the joinWith impl.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to