Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/22944#discussion_r230995240
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
@@ -262,25 +262,39 @@ object AppendColumns {
   def apply[T : Encoder, U : Encoder](
       func: T => U,
       child: LogicalPlan): AppendColumns = {
+    val outputEncoder = encoderFor[U]
+    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
+      assert(outputEncoder.namedExpressions.length == 1)
+      outputEncoder.namedExpressions.map(Alias(_, "key")())
+    } else {
+      outputEncoder.namedExpressions
+    }
     new AppendColumns(
       func.asInstanceOf[Any => Any],
       implicitly[Encoder[T]].clsTag.runtimeClass,
       implicitly[Encoder[T]].schema,
       UnresolvedDeserializer(encoderFor[T].deserializer),
-      encoderFor[U].namedExpressions,
+      namedExpressions,
       child)
   }

   def apply[T : Encoder, U : Encoder](
       func: T => U,
       inputAttributes: Seq[Attribute],
       child: LogicalPlan): AppendColumns = {
+    val outputEncoder = encoderFor[U]
+    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
+      assert(outputEncoder.namedExpressions.length == 1)
+      outputEncoder.namedExpressions.map(Alias(_, "key")())
+    } else {
+      outputEncoder.namedExpressions
--- End diff --
I just tried to manually add an alias. It doesn't seem to work as we expect:
```scala
val ds = Seq(Some(("a", 10)), Some(("a", 20)), Some(("b", 1)), Some(("b", 2)), Some(("c", 1)), None).toDS()
val newDS = ds.select(expr("value").as("opt").as[Option[(String, Int)]])
// schema
root
 |-- value: struct (nullable = true)
 |    |-- _1: string (nullable = true)
 |    |-- _2: integer (nullable = false)
// physical plan
*(1) SerializeFromObject [if (isnull(unwrapoption(ObjectType(class scala.Tuple2), input[0, scala.Option, true]))) null else named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(unwrapoption(ObjectType(class scala.Tuple2), input[0, scala.Option, true]))._1, true, false), _2, assertnotnull(unwrapoption(ObjectType(class scala.Tuple2), input[0, scala.Option, true]))._2) AS value#5482]
+- *(1) MapElements <function1>, obj#5481: scala.Option
   +- *(1) DeserializeToObject newInstance(class scala.Tuple1), obj#5480: scala.Tuple1
      +- *(1) Project [value#5473 AS opt#5476]
         +- LocalTableScan [value#5473]
```
So even if we add an alias before `groupByKey`, it can't change the dataset's output names.
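For reference, here's a minimal sketch of how one might inspect the grouping column that `AppendColumns` appends with this patch applied, continuing from the `ds` defined above. The grouping call and the expectation around the "key" alias are my assumptions based on the `Alias(_, "key")()` in the diff, not verified output:
```scala
// Sketch only: groupByKey routes the key through AppendColumns, so the appended
// column's name comes from the key encoder's namedExpressions, which this patch
// aliases to "key" when the key is not serialized as a struct.
val grouped = ds.groupByKey(opt => opt)  // key type: Option[(String, Int)]
grouped.count().printSchema()            // check how the key column is named
```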
---