dbtsai edited a comment on issue #27066: [SPARK-31317][SQL] Add withField 
method to Column class
URL: https://github.com/apache/spark/pull/27066#issuecomment-609512144
 
 
   cc @dongjoon-hyun
   
   How about we implement `withField` by constructing a new struct using 
existing expressions? 
   
   Let's say we have a query such as,
   ```scala
        import org.apache.spark.sql.{DataFrame, Row}
        import org.apache.spark.sql.types._
        // Schema of the nested struct: int fields a, b, c (inferred from the plans below).
        val structLevel1Schema = StructType(Seq(StructField("a", IntegerType),
          StructField("b", IntegerType), StructField("c", IntegerType)))
        val df: DataFrame = spark.createDataFrame(sparkContext.parallelize(
          Row(Row(1, 2, 3)) :: Row(Row(2, 4, 5)) :: Nil),
          StructType(Seq(StructField("a", structLevel1Schema))))
        val df1 = df.withColumn("new1", df("a").withField("abc", df("a.b")))
        val df2 = df1.withColumn("new2", df1("new1").withField("efg", df1("new1.c")))
        val df3 = df2.filter(df2("new2.efg") > 0)
   ```
   
   The execution plan with this PR is
   ```scala
   == Physical Plan ==
    *(1) Project [a#1, add_fields(a#1, abc, a#1.b) AS new1#8, add_fields(add_fields(a#1, abc, a#1.b), efg, add_fields(a#1, abc, a#1.b).c) AS new2#17]
    +- *(1) Filter (add_fields(add_fields(a#1, abc, a#1.b), efg, add_fields(a#1, abc, a#1.b).c).efg > 0)
      +- *(1) Scan ExistingRDD[a#1]
   ```
   
    As the plan shows, we effectively have to construct the complex data structure twice: once for the final projection and once for the filter. Moreover, because the filter operates on the complex data structure, the predicate cannot be pushed down to the data source.
   
    Instead of adding a new expression, we can build the same functionality by creating a new struct whose fields are references to the existing fields (aliased back to their original names) plus the new field. This yields a simpler execution plan, and that plan supports predicate pushdown.
   
    With this idea, `withField` can be implemented as follows:
   ```scala
      // Intended to live inside the Column class, so it can use `expr` and `getField`.
      def withField(fieldName: String, fieldValue: Column): Column = {
        expr.dataType match {
          case StructType(fields) =>
            val newCols: Array[Column] = {
              if (fields.exists(_.name == fieldName)) {
                // The field already exists: replace it in place with the new value.
                fields.map { field =>
                  if (field.name == fieldName) {
                    fieldValue as field.name
                  } else {
                    getField(field.name) as field.name
                  }
                }
              } else {
                // The field is new: keep all existing fields and append it at the end.
                fields.map {
                  field => getField(field.name) as field.name
                } :+ (fieldValue as fieldName)
              }
            }
            org.apache.spark.sql.functions.struct(newCols: _*)
          case _ =>
            throw new IllegalArgumentException(
              s"withField can only be applied to a struct column, but got ${expr.dataType}")
        }
      }
   ```
    With this implementation, the same query produces
   ```scala
   == Physical Plan ==
    *(1) Project [a#1, named_struct(a, a#1.a, b, a#1.b, c, a#1.c, abc, a#1.b) AS new1#8, named_struct(a, a#1.a, b, a#1.b, c, a#1.c, abc, a#1.b, efg, a#1.c) AS new2#17]
   +- *(1) Filter (isnotnull(a#1) AND (a#1.c > 0))
      +- *(1) Scan ExistingRDD[a#1]
   ```
   
    Thus, we don't need to maintain a complex codegen path, which is often error-prone, and existing optimizations such as predicate pushdown and schema pruning can still be applied.
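
    For reference, the same rebuild can be sketched today against the public `Column` API, outside the `Column` class. This is only a rough illustration: the helper name `withFieldStandalone` and its explicit `fieldNames` parameter are made up here, since a plain `Column` does not expose its resolved `dataType` the way the method above can.
    ```scala
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.functions.struct

    // Hypothetical helper: rebuild the struct from its existing fields (aliased back
    // to their original names) plus the new or replaced field, as in the method above.
    def withFieldStandalone(
        col: Column, fieldNames: Seq[String], fieldName: String, fieldValue: Column): Column = {
      val newCols =
        if (fieldNames.contains(fieldName)) {
          fieldNames.map(n => if (n == fieldName) fieldValue.as(n) else col.getField(n).as(n))
        } else {
          fieldNames.map(n => col.getField(n).as(n)) :+ fieldValue.as(fieldName)
        }
      struct(newCols: _*)
    }

    // e.g. df.withColumn("new1", withFieldStandalone(df("a"), Seq("a", "b", "c"), "abc", df("a.b")))
    ```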
   
   Thanks,
   
