Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/22944#discussion_r230977212
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
---
@@ -262,25 +262,39 @@ object AppendColumns {
def apply[T : Encoder, U : Encoder](
func: T => U,
child: LogicalPlan): AppendColumns = {
+ val outputEncoder = encoderFor[U]
+ val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
+ assert(outputEncoder.namedExpressions.length == 1)
+ outputEncoder.namedExpressions.map(Alias(_, "key")())
+ } else {
+ outputEncoder.namedExpressions
+ }
new AppendColumns(
func.asInstanceOf[Any => Any],
implicitly[Encoder[T]].clsTag.runtimeClass,
implicitly[Encoder[T]].schema,
UnresolvedDeserializer(encoderFor[T].deserializer),
- encoderFor[U].namedExpressions,
+ namedExpressions,
child)
}
def apply[T : Encoder, U : Encoder](
func: T => U,
inputAttributes: Seq[Attribute],
child: LogicalPlan): AppendColumns = {
+ val outputEncoder = encoderFor[U]
+ val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
+ assert(outputEncoder.namedExpressions.length == 1)
+ outputEncoder.namedExpressions.map(Alias(_, "key")())
+ } else {
+ outputEncoder.namedExpressions
--- End diff --
so we may still fail if `T` and `U` are case classes and have conflict
field names?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]