fqaiser94 commented on a change in pull request #27066:
URL: https://github.com/apache/spark/pull/27066#discussion_r439103206
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
##########
@@ -539,3 +541,90 @@ case class StringToMap(text: Expression, pairDelim: Expression, keyValueDelim: E
override def prettyName: String = "str_to_map"
}
+
+/**
+ * Adds/replaces field in struct by name.
+ */
+case class WithField(structExpr: Expression, fieldName: String, valueExpr: Expression)
+ extends Unevaluable {
+
+  private lazy val fieldPath = CatalystSqlParser.parseMultipartIdentifier(fieldName)
Review comment:
Thanks for explaining this to me. I've changed the code back to the way
you wrote it in your refactor commit.
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
##########
@@ -923,4 +923,450 @@ class ColumnExpressionSuite extends QueryTest with SharedSparkSession {
val inSet = InSet(Literal("a"), Set("a", "b").map(UTF8String.fromString))
assert(inSet.sql === "('a' IN ('a', 'b'))")
}
+
+ def checkAnswerAndSchema(
+ df: => DataFrame,
Review comment:
done
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
##########
@@ -923,4 +923,450 @@ class ColumnExpressionSuite extends QueryTest with SharedSparkSession {
val inSet = InSet(Literal("a"), Set("a", "b").map(UTF8String.fromString))
assert(inSet.sql === "('a' IN ('a', 'b'))")
}
+
+ def checkAnswerAndSchema(
+ df: => DataFrame,
+ expectedAnswer: Seq[Row],
+ expectedSchema: StructType): Unit = {
+
+ checkAnswer(df, expectedAnswer)
+ assert(df.schema == expectedSchema)
+ }
+
+ private lazy val structType = StructType(Seq(
+ StructField("a", IntegerType, nullable = false),
+ StructField("b", IntegerType, nullable = true),
+ StructField("c", IntegerType, nullable = false)))
+
+ private lazy val structLevel1: DataFrame = spark.createDataFrame(
+ sparkContext.parallelize(Row(Row(1, null, 3)) :: Nil),
+ StructType(Seq(StructField("a", structType, nullable = false))))
+
+ private lazy val nullStructLevel1: DataFrame = spark.createDataFrame(
+ sparkContext.parallelize(Row(null) :: Nil),
+ StructType(Seq(StructField("a", structType, nullable = true))))
+
+ private lazy val structLevel2: DataFrame = spark.createDataFrame(
+ sparkContext.parallelize(Row(Row(Row(1, null, 3))) :: Nil),
+ StructType(Seq(
+ StructField("a", StructType(Seq(
+ StructField("a", structType, nullable = false))),
+ nullable = false))))
+
+ private lazy val structLevel3: DataFrame = spark.createDataFrame(
+ sparkContext.parallelize(Row(Row(Row(Row(1, null, 3)))) :: Nil),
+ StructType(Seq(
+ StructField("a", StructType(Seq(
+ StructField("a", StructType(Seq(
+ StructField("a", structType, nullable = false))),
+ nullable = false))),
+ nullable = false))))
+
+  test("withField should throw an exception if called on a non-StructType column") {
+ intercept[AnalysisException] {
+ testData.withColumn("key", $"key".withField("a", lit(2)))
+    }.getMessage should include("struct argument should be struct type, got: int.")
+ }
+
+ test("withField should throw an exception if fieldName is null") {
+ intercept[AnalysisException] {
+ structLevel1.withColumn("a", $"a".withField(null, lit(2)))
+ }.getMessage should include("fieldName argument should not be null.")
+ }
+
+ test("withField should throw an exception if fieldName is empty") {
+ intercept[AnalysisException] {
+ structLevel1.withColumn("a", $"a".withField("", lit(2)))
+ }.getMessage should include("fieldName argument should not be empty.")
+ }
+
+  test("withField should throw an exception if any intermediate structs don't exist") {
+ intercept[AnalysisException] {
+ structLevel2.withColumn("a", 'a.withField("x.b", lit(2)))
+ }.getMessage should include("Intermediate field x does not exist.")
+
+ intercept[AnalysisException] {
+ structLevel3.withColumn("a", 'a.withField("a.x.b", lit(2)))
+ }.getMessage should include("Intermediate field a.x does not exist.")
+ }
+
+  test("withField should throw an exception if any intermediate field is not a struct") {
+ intercept[AnalysisException] {
+ structLevel1.withColumn("a", 'a.withField("b.a", lit(2)))
+    }.getMessage should include("Intermediate field b should be struct type, got: int.")
+
+ intercept[AnalysisException] {
+ val structLevel2: DataFrame = spark.createDataFrame(
+ sparkContext.parallelize(Row(Row(Row(1, null, 3), 4)) :: Nil),
+ StructType(Seq(
+ StructField("a", StructType(Seq(
+ StructField("a", structType, nullable = false),
+ StructField("a", IntegerType, nullable = false))),
+ nullable = false))))
+
+ structLevel2.withColumn("a", 'a.withField("a.b", lit(2)))
+    }.getMessage should include("Intermediate field a should be struct type, got: int.")
+ }
+
+ test("withField should add field to struct") {
+ checkAnswerAndSchema(
+ structLevel1.withColumn("a", 'a.withField("d", lit(4))),
+ Row(Row(1, null, 3, 4)) :: Nil,
+ StructType(Seq(
+ StructField("a", StructType(Seq(
+ StructField("a", IntegerType, nullable = false),
+ StructField("b", IntegerType, nullable = true),
+ StructField("c", IntegerType, nullable = false),
+ StructField("d", IntegerType, nullable = false))),
+ nullable = false))))
+ }
+
+ test("withField should add field to null struct") {
+ checkAnswerAndSchema(
+ nullStructLevel1.withColumn("a", $"a".withField("d", lit(4))),
+ Row(null) :: Nil,
+ StructType(Seq(
+ StructField("a", StructType(Seq(
+ StructField("a", IntegerType, nullable = true),
+ StructField("b", IntegerType, nullable = true),
+ StructField("c", IntegerType, nullable = true),
+ StructField("d", IntegerType, nullable = false))),
+ nullable = true))))
+ }
+
+ test("withField should add null field to struct") {
+ checkAnswerAndSchema(
+      structLevel1.withColumn("a", 'a.withField("d", lit(null).cast(IntegerType))),
+ Row(Row(1, null, 3, null)) :: Nil,
+ StructType(Seq(
+ StructField("a", StructType(Seq(
+ StructField("a", IntegerType, nullable = false),
+ StructField("b", IntegerType, nullable = true),
+ StructField("c", IntegerType, nullable = false),
+ StructField("d", IntegerType, nullable = true))),
+ nullable = false))))
+ }
+
+ test("withField should add multiple fields to struct") {
+ checkAnswerAndSchema(
+      structLevel1.withColumn("a", 'a.withField("d", lit(4)).withField("e", lit(5))),
+ Row(Row(1, null, 3, 4, 5)) :: Nil,
+ StructType(Seq(
+ StructField("a", StructType(Seq(
+ StructField("a", IntegerType, nullable = false),
+ StructField("b", IntegerType, nullable = true),
+ StructField("c", IntegerType, nullable = false),
+ StructField("d", IntegerType, nullable = false),
+ StructField("e", IntegerType, nullable = false))),
+ nullable = false))))
+ }
+
+ test("withField should add field to nested struct") {
+ Seq(
+ structLevel2.withColumn("a", 'a.withField("a.d", lit(4))),
+      structLevel2.withColumn("a", 'a.withField("a", $"a.a".withField("d", lit(4))))
+ ).foreach { df =>
+ checkAnswerAndSchema(
+ df,
+ Row(Row(Row(1, null, 3, 4))) :: Nil,
+ StructType(
+ Seq(StructField("a", StructType(Seq(
+ StructField("a", StructType(Seq(
+ StructField("a", IntegerType, nullable = false),
+ StructField("b", IntegerType, nullable = true),
+ StructField("c", IntegerType, nullable = false),
+ StructField("d", IntegerType, nullable = false))),
+ nullable = false))),
+ nullable = false))))
+ }
+ }
+
+ test("withField should add field to deeply nested struct") {
+ checkAnswerAndSchema(
+ structLevel3.withColumn("a", 'a.withField("a.a.d", lit(4))),
+ Row(Row(Row(Row(1, null, 3, 4)))) :: Nil,
+ StructType(Seq(
+ StructField("a", StructType(Seq(
+ StructField("a", StructType(Seq(
+ StructField("a", StructType(Seq(
+ StructField("a", IntegerType, nullable = false),
+ StructField("b", IntegerType, nullable = true),
+ StructField("c", IntegerType, nullable = false),
+ StructField("d", IntegerType, nullable = false))),
+ nullable = false))),
+ nullable = false))),
+ nullable = false))))
+ }
+
+ test( "withField should replace field in struct") {
Review comment:
done
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]