dbtsai commented on issue #27066: [SPARK-31317][SQL] Add withField method to Column class URL: https://github.com/apache/spark/pull/27066#issuecomment-609512144

cc @dongjoon-hyun

How about we implement `withField` by constructing a new struct from existing expressions? Say we have a query such as:

```scala
val df: DataFrame = spark.createDataFrame(sparkContext.parallelize(
  Row(Row(1, 2, 3)) :: Row(Row(2, 4, 5)) :: Nil),
  StructType(Seq(StructField("a", structLevel1Schema))))

val df1 = df.withColumn("new1", df("a").withField("abc", df("a.b")))
val df2 = df1.withColumn("new2", df1("new1").withField("efg", df1("new1.c")))
val df3 = df2.filter(df2("new2.efg") > 0)
```

The physical plan with this PR is:

```scala
== Physical Plan ==
*(1) Project [a#1, add_fields(a#1, abc, a#1.b) AS new1#8, add_fields(add_fields(a#1, abc, a#1.b), efg, add_fields(a#1, abc, a#1.b).c) AS new2#17]
+- *(1) Filter (add_fields(add_fields(a#1, abc, a#1.b), efg, add_fields(a#1, abc, a#1.b).c).efg > 0)
   +- *(1) Scan ExistingRDD[a#1]
```

As the plan shows, we effectively have to construct the complex data structure twice: once for the final projection and once for the filter. Moreover, because the filter operates on a complex data structure, the predicate cannot be pushed down to the data sources.

Instead of adding a new expression, we can build the same functionality by creating a new struct that references the aliased fields. This yields a simpler execution plan, and that plan is able to support predicate pushdown.
With this idea, `withField` can be implemented as:

```scala
def withField(fieldName: String, fieldValue: Column): Column = {
  expr.dataType match {
    case StructType(fields) =>
      val newCols: Array[Column] = {
        if (fields.exists(_.name == fieldName)) {
          // A field with this name already exists: replace it in place.
          fields.map { field =>
            if (field.name == fieldName) {
              fieldValue as field.name
            } else {
              getField(field.name) as field.name
            }
          }
        } else {
          // No such field yet: append it after the existing ones.
          fields.map { field =>
            getField(field.name) as field.name
          } :+ (fieldValue as fieldName)
        }
      }
      org.apache.spark.sql.functions.struct(newCols: _*)
    case other =>
      throw new IllegalArgumentException(
        s"withField can only be applied to struct columns, but got $other")
  }
}
```

The same query then returns:

```scala
== Physical Plan ==
*(1) Project [a#1, named_struct(a, a#1.a, b, a#1.b, c, a#1.c, abc, a#1.b) AS new1#8, named_struct(a, a#1.a, b, a#1.b, c, a#1.c, abc, a#1.b, efg, a#1.c) AS new2#17]
+- *(1) Filter (isnotnull(a#1) AND (a#1.c > 0))
   +- *(1) Scan ExistingRDD[a#1]
```

Thus we don't need to maintain a complex codegen path, which is often error-prone, and existing optimizations such as predicate pushdown can be applied as well. Thanks,
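The replace-or-append rule above can be sketched in plain Scala, independent of Spark. `upsertField` is a hypothetical helper for illustration only, modeling struct fields as `(name, value)` pairs instead of `Column`s so it runs without a `SparkSession`:

```scala
// Hypothetical sketch (not the Spark API): the same replace-or-append
// logic that withField applies to struct fields.
def upsertField(
    fields: Seq[(String, Int)],
    name: String,
    value: Int): Seq[(String, Int)] = {
  if (fields.exists(_._1 == name)) {
    // An existing field with this name is replaced, keeping its position.
    fields.map { case (n, v) => if (n == name) (n, value) else (n, v) }
  } else {
    // Otherwise the new field is appended after the existing ones.
    fields :+ (name -> value)
  }
}
```

For example, `upsertField(Seq("a" -> 1, "b" -> 2), "b", 9)` replaces `b` in place, while `upsertField(Seq("a" -> 1), "c", 3)` appends `c` at the end, mirroring the two branches in `withField`.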
