hvanhovell commented on code in PR #47839:
URL: https://github.com/apache/spark/pull/47839#discussion_r1726149232
##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/columnNodeSupport.scala:
##########
@@ -89,58 +103,65 @@ object ColumnNodeToProtoConverter extends (ColumnNode =>
proto.Expression) {
builder.getExpressionStringBuilder.setExpression(expression)
case s: SortOrder =>
- builder.setSortOrder(convertSortOrder(s))
+ builder.setSortOrder(convertSortOrder(s, e))
case Window(windowFunction, windowSpec, _) =>
val b = builder.getWindowBuilder
- .setWindowFunction(apply(windowFunction))
- .addAllPartitionSpec(windowSpec.partitionColumns.map(apply).asJava)
- .addAllOrderSpec(windowSpec.sortColumns.map(convertSortOrder).asJava)
+ .setWindowFunction(apply(windowFunction, e))
+ .addAllPartitionSpec(windowSpec.partitionColumns.map(apply(_,
e)).asJava)
+ .addAllOrderSpec(windowSpec.sortColumns.map(convertSortOrder(_,
e)).asJava)
windowSpec.frame.foreach { frame =>
b.getFrameSpecBuilder
.setFrameType(frame.frameType match {
case WindowFrame.Row => FrameType.FRAME_TYPE_ROW
case WindowFrame.Range => FrameType.FRAME_TYPE_RANGE
})
- .setLower(convertFrameBoundary(frame.lower))
- .setUpper(convertFrameBoundary(frame.upper))
+ .setLower(convertFrameBoundary(frame.lower, e))
+ .setUpper(convertFrameBoundary(frame.upper, e))
}
case UnresolvedExtractValue(child, extraction, _) =>
builder.getUnresolvedExtractValueBuilder
- .setChild(apply(child))
- .setExtraction(apply(extraction))
+ .setChild(apply(child, e))
+ .setExtraction(apply(extraction, e))
case UpdateFields(structExpression, fieldName, valueExpression, _) =>
val b = builder.getUpdateFieldsBuilder
- .setStructExpression(apply(structExpression))
+ .setStructExpression(apply(structExpression, e))
.setFieldName(fieldName)
- valueExpression.foreach(v => b.setValueExpression(apply(v)))
+ valueExpression.foreach(v => b.setValueExpression(apply(v, e)))
case v: UnresolvedNamedLambdaVariable =>
builder.setUnresolvedNamedLambdaVariable(convertNamedLambdaVariable(v))
case LambdaFunction(function, arguments, _) =>
builder.getLambdaFunctionBuilder
- .setFunction(apply(function))
+ .setFunction(apply(function, e))
.addAllArguments(arguments.map(convertNamedLambdaVariable).asJava)
- case InvokeInlineUserDefinedFunction(udf: ScalaUserDefinedFunction,
arguments, false, _) =>
- val b = builder.getCommonInlineUserDefinedFunctionBuilder
- .setScalarScalaUdf(udf.udf)
- .setDeterministic(udf.deterministic)
- .addAllArguments(arguments.map(apply).asJava)
- udf.givenName.foreach(b.setFunctionName)
+ case InvokeInlineUserDefinedFunction(
+ a: Aggregator[Any @unchecked, Any @unchecked, Any @unchecked],
+ Nil,
+ false,
+ _) =>
+ // TODO we should probably 'just' detect this particular scenario
+ // in the planner instead of wrapping it in a separate method.
+ val protoUdf = UdfToProtoUtils.toProto(UserDefinedAggregator(a, e.get))
+
builder.getTypedAggregateExpressionBuilder.setScalarScalaUdf(protoUdf.getScalarScalaUdf)
+
+ case InvokeInlineUserDefinedFunction(udf: UserDefinedFunction, args,
false, _) =>
+ builder.setCommonInlineUserDefinedFunction(
+ UdfToProtoUtils.toProto(udf, args.map(apply(_, e))))
Review Comment:
Please note that we are serializing the UDF here. This happens much later than
before: the UDF is now serialized when it is used in the plan, instead of when
it is defined. The reason for serializing it earlier was mostly to avoid
capturing unwanted classes in the REPL. We had tests for this, and those are
passing. That means the closure cleaner work has paid off.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]