Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22944#discussion_r230995240
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
 ---
    @@ -262,25 +262,39 @@ object AppendColumns {
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    +    }
         new AppendColumns(
           func.asInstanceOf[Any => Any],
           implicitly[Encoder[T]].clsTag.runtimeClass,
           implicitly[Encoder[T]].schema,
           UnresolvedDeserializer(encoderFor[T].deserializer),
    -      encoderFor[U].namedExpressions,
    +      namedExpressions,
           child)
       }
     
       def apply[T : Encoder, U : Encoder](
           func: T => U,
           inputAttributes: Seq[Attribute],
           child: LogicalPlan): AppendColumns = {
    +    val outputEncoder = encoderFor[U]
    +    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
    +      assert(outputEncoder.namedExpressions.length == 1)
    +      outputEncoder.namedExpressions.map(Alias(_, "key")())
    +    } else {
    +      outputEncoder.namedExpressions
    --- End diff --
    
    I just tried manually adding an alias. It does not seem to work as we expect:
    
    ```scala
    val ds = Seq(Some(("a", 10)), Some(("a", 20)), Some(("b", 1)), Some(("b", 2)), Some(("c", 1)), None).toDS()
    
    val newDS = ds.select(expr("value").as("opt").as[Option[(String, Int)]])
    
    // schema
    root
     |-- value: struct (nullable = true)
     |    |-- _1: string (nullable = true)
     |    |-- _2: integer (nullable = false)
    
    // physical plan
    *(1) SerializeFromObject [if (isnull(unwrapoption(ObjectType(class scala.Tuple2), input[0, scala.Option, true]))) null else named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(unwrapoption(ObjectType(class scala.Tuple2), input[0, scala.Option, true]))._1, true, false), _2, assertnotnull(unwrapoption(ObjectType(class scala.Tuple2), input[0, scala.Option, true]))._2) AS value#5482]
    +- *(1) MapElements <function1>, obj#5481: scala.Option
       +- *(1) DeserializeToObject newInstance(class scala.Tuple1), obj#5480: scala.Tuple1
          +- *(1) Project [value#5473 AS opt#5476]
             +- LocalTableScan [value#5473]
    ```
    
    So even if we add an alias before `groupByKey`, it can't change the dataset's 
output names.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to