dbtsai commented on a change in pull request #27066:
URL: https://github.com/apache/spark/pull/27066#discussion_r411874270



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
##########
@@ -514,3 +514,98 @@ case class StringToMap(text: Expression, pairDelim: 
Expression, keyValueDelim: E
 
   override def prettyName: String = "str_to_map"
 }
+
+/**
+ * Adds/replaces fields in struct by name.
+ *
+ * @param children Seq(struct, name1, val1, name2, val2, ...)
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(struct, name1, val1, name2, val2, ...) - Adds/replaces 
fields in struct by name.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(NAMED_STRUCT("a", 1, "b", 2), "c", 3);
+       {"a":1,"b":2,"c":3}
+      > SELECT _FUNC_(NAMED_STRUCT("a", 1, "b", 2), "b", 3);
+       {"a":1,"b":3}
+      > SELECT _FUNC_(CAST(NULL AS struct<a:int,b:int>), "c", 3);
+       {"a":null,"b":null,"c":3}
+      > SELECT _FUNC_(a, 'b', 100) AS a FROM (VALUES (NAMED_STRUCT('a', 1, 
'b', 2, 'b', 3)) AS T(a));
+       {"a":1,"b":100,"b":100}
+      > SELECT _FUNC_(a, 'a', _FUNC_(a.a, 'c', 3)) AS a FROM (VALUES 
(NAMED_STRUCT('a', NAMED_STRUCT('a', 1, 'b', 2))) AS T(a));
+       {"a":{"a":1,"b":2,"c":3}}
+  """)
+// scalastyle:on line.size.limit
+case class AddFields(children: Seq[Expression]) extends Unevaluable {
+
+  private lazy val ogStructExpr = children.head
+  private lazy val ogStructType = 
ogStructExpr.dataType.asInstanceOf[StructType]
+  private lazy val (nameExprs, valExprs) = children.drop(1).grouped(2).map {
+    case Seq(name, value) => (name, value)
+  }.toList.unzip
+  private lazy val names = nameExprs.map(e => 
Option(e.eval()).map(_.toString).orNull)

Review comment:
       Why not just `lazy val names = nameExprs.map(_.eval(EmptyRow))`?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
##########
@@ -514,3 +514,98 @@ case class StringToMap(text: Expression, pairDelim: 
Expression, keyValueDelim: E
 
   override def prettyName: String = "str_to_map"
 }
+
+/**
+ * Adds/replaces fields in struct by name.
+ *
+ * @param children Seq(struct, name1, val1, name2, val2, ...)
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(struct, name1, val1, name2, val2, ...) - Adds/replaces 
fields in struct by name.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(NAMED_STRUCT("a", 1, "b", 2), "c", 3);
+       {"a":1,"b":2,"c":3}
+      > SELECT _FUNC_(NAMED_STRUCT("a", 1, "b", 2), "b", 3);
+       {"a":1,"b":3}
+      > SELECT _FUNC_(CAST(NULL AS struct<a:int,b:int>), "c", 3);
+       {"a":null,"b":null,"c":3}
+      > SELECT _FUNC_(a, 'b', 100) AS a FROM (VALUES (NAMED_STRUCT('a', 1, 
'b', 2, 'b', 3)) AS T(a));
+       {"a":1,"b":100,"b":100}
+      > SELECT _FUNC_(a, 'a', _FUNC_(a.a, 'c', 3)) AS a FROM (VALUES 
(NAMED_STRUCT('a', NAMED_STRUCT('a', 1, 'b', 2))) AS T(a));
+       {"a":{"a":1,"b":2,"c":3}}
+  """)
+// scalastyle:on line.size.limit
+case class AddFields(children: Seq[Expression]) extends Unevaluable {
+
+  private lazy val ogStructExpr = children.head

Review comment:
       What does `og` stand for?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
##########
@@ -514,3 +514,98 @@ case class StringToMap(text: Expression, pairDelim: 
Expression, keyValueDelim: E
 
   override def prettyName: String = "str_to_map"
 }
+
+/**
+ * Adds/replaces fields in struct by name.
+ *
+ * @param children Seq(struct, name1, val1, name2, val2, ...)
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(struct, name1, val1, name2, val2, ...) - Adds/replaces 
fields in struct by name.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(NAMED_STRUCT("a", 1, "b", 2), "c", 3);
+       {"a":1,"b":2,"c":3}
+      > SELECT _FUNC_(NAMED_STRUCT("a", 1, "b", 2), "b", 3);
+       {"a":1,"b":3}
+      > SELECT _FUNC_(CAST(NULL AS struct<a:int,b:int>), "c", 3);
+       {"a":null,"b":null,"c":3}
+      > SELECT _FUNC_(a, 'b', 100) AS a FROM (VALUES (NAMED_STRUCT('a', 1, 
'b', 2, 'b', 3)) AS T(a));
+       {"a":1,"b":100,"b":100}
+      > SELECT _FUNC_(a, 'a', _FUNC_(a.a, 'c', 3)) AS a FROM (VALUES 
(NAMED_STRUCT('a', NAMED_STRUCT('a', 1, 'b', 2))) AS T(a));
+       {"a":{"a":1,"b":2,"c":3}}
+  """)
+// scalastyle:on line.size.limit
+case class AddFields(children: Seq[Expression]) extends Unevaluable {
+
+  private lazy val ogStructExpr = children.head
+  private lazy val ogStructType = 
ogStructExpr.dataType.asInstanceOf[StructType]
+  private lazy val (nameExprs, valExprs) = children.drop(1).grouped(2).map {
+    case Seq(name, value) => (name, value)
+  }.toList.unzip
+  private lazy val names = nameExprs.map(e => 
Option(e.eval()).map(_.toString).orNull)
+  private lazy val addOrReplaceExprs = names.zip(valExprs)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val expectedStructType = StructType(Nil).typeName
+    if (children.size % 2 == 0) {
+      TypeCheckResult.TypeCheckFailure(s"$prettyName expects an odd number of 
arguments.")
+    } else if (ogStructExpr.dataType.typeName != expectedStructType) {
+      TypeCheckResult.TypeCheckFailure(
+        s"Only $expectedStructType is allowed to appear at first position, 
got: " +
+          s"${ogStructExpr.dataType.typeName}.")
+    } else if (names.contains(null)) {
+      TypeCheckResult.TypeCheckFailure("Field name should not be null.")
+    } else if (nameExprs.exists(e => !(e.foldable && e.dataType == 
StringType))) {
+      TypeCheckResult.TypeCheckFailure(
+        s"Only foldable ${StringType.catalogString} expressions are allowed to 
appear at even " +
+          "position.")
+    } else {
+      TypeCheckResult.TypeCheckSuccess
+    }
+  }
+
+  override def dataType: StructType = {
+    val ogStructFields = ogStructType.fields.map { f =>
+      (f.name, f.copy(nullable = ogStructExpr.nullable || f.nullable))
+    }
+    val addOrReplaceStructFields = addOrReplaceExprs.map { case (name, expr) =>
+      (name, StructField(name, expr.dataType, expr.nullable))
+    }
+    val newStructFields = addOrReplace(ogStructFields, 
addOrReplaceStructFields).map(_._2)
+    StructType(newStructFields)

Review comment:
       Can you annotate `ogStructFields` and `addOrReplaceStructFields` with the 
explicit type `Seq[(String, StructField)]`? That makes the code easier to read.
   
   Since `newStructFields` is used only once, let's inline it:
   `StructType(addOrReplace(ogStructFields, 
addOrReplaceStructFields).map(_._2))`

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
##########
@@ -514,3 +514,98 @@ case class StringToMap(text: Expression, pairDelim: 
Expression, keyValueDelim: E
 
   override def prettyName: String = "str_to_map"
 }
+
+/**
+ * Adds/replaces fields in struct by name.
+ *
+ * @param children Seq(struct, name1, val1, name2, val2, ...)
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(struct, name1, val1, name2, val2, ...) - Adds/replaces 
fields in struct by name.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(NAMED_STRUCT("a", 1, "b", 2), "c", 3);
+       {"a":1,"b":2,"c":3}
+      > SELECT _FUNC_(NAMED_STRUCT("a", 1, "b", 2), "b", 3);
+       {"a":1,"b":3}
+      > SELECT _FUNC_(CAST(NULL AS struct<a:int,b:int>), "c", 3);
+       {"a":null,"b":null,"c":3}
+      > SELECT _FUNC_(a, 'b', 100) AS a FROM (VALUES (NAMED_STRUCT('a', 1, 
'b', 2, 'b', 3)) AS T(a));
+       {"a":1,"b":100,"b":100}
+      > SELECT _FUNC_(a, 'a', _FUNC_(a.a, 'c', 3)) AS a FROM (VALUES 
(NAMED_STRUCT('a', NAMED_STRUCT('a', 1, 'b', 2))) AS T(a));
+       {"a":{"a":1,"b":2,"c":3}}
+  """)
+// scalastyle:on line.size.limit
+case class AddFields(children: Seq[Expression]) extends Unevaluable {
+
+  private lazy val ogStructExpr = children.head
+  private lazy val ogStructType = 
ogStructExpr.dataType.asInstanceOf[StructType]
+  private lazy val (nameExprs, valExprs) = children.drop(1).grouped(2).map {
+    case Seq(name, value) => (name, value)
+  }.toList.unzip
+  private lazy val names = nameExprs.map(e => 
Option(e.eval()).map(_.toString).orNull)
+  private lazy val addOrReplaceExprs = names.zip(valExprs)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val expectedStructType = StructType(Nil).typeName
+    if (children.size % 2 == 0) {
+      TypeCheckResult.TypeCheckFailure(s"$prettyName expects an odd number of 
arguments.")
+    } else if (ogStructExpr.dataType.typeName != expectedStructType) {
+      TypeCheckResult.TypeCheckFailure(
+        s"Only $expectedStructType is allowed to appear at first position, 
got: " +
+          s"${ogStructExpr.dataType.typeName}.")
+    } else if (names.contains(null)) {
+      TypeCheckResult.TypeCheckFailure("Field name should not be null.")
+    } else if (nameExprs.exists(e => !(e.foldable && e.dataType == 
StringType))) {
+      TypeCheckResult.TypeCheckFailure(
+        s"Only foldable ${StringType.catalogString} expressions are allowed to 
appear at even " +
+          "position.")
+    } else {
+      TypeCheckResult.TypeCheckSuccess
+    }
+  }
+
+  override def dataType: StructType = {
+    val ogStructFields = ogStructType.fields.map { f =>
+      (f.name, f.copy(nullable = ogStructExpr.nullable || f.nullable))
+    }

Review comment:
       @cloud-fan can you confirm that this behavior is okay? If a struct is 
nullable but its fields are not, then after `withFields` all of the fields in 
the resulting struct will be nullable.

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
##########
@@ -514,3 +514,98 @@ case class StringToMap(text: Expression, pairDelim: 
Expression, keyValueDelim: E
 
   override def prettyName: String = "str_to_map"
 }
+
+/**
+ * Adds/replaces fields in struct by name.
+ *
+ * @param children Seq(struct, name1, val1, name2, val2, ...)
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(struct, name1, val1, name2, val2, ...) - Adds/replaces 
fields in struct by name.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(NAMED_STRUCT("a", 1, "b", 2), "c", 3);
+       {"a":1,"b":2,"c":3}
+      > SELECT _FUNC_(NAMED_STRUCT("a", 1, "b", 2), "b", 3);
+       {"a":1,"b":3}
+      > SELECT _FUNC_(CAST(NULL AS struct<a:int,b:int>), "c", 3);
+       {"a":null,"b":null,"c":3}
+      > SELECT _FUNC_(a, 'b', 100) AS a FROM (VALUES (NAMED_STRUCT('a', 1, 
'b', 2, 'b', 3)) AS T(a));
+       {"a":1,"b":100,"b":100}
+      > SELECT _FUNC_(a, 'a', _FUNC_(a.a, 'c', 3)) AS a FROM (VALUES 
(NAMED_STRUCT('a', NAMED_STRUCT('a', 1, 'b', 2))) AS T(a));
+       {"a":{"a":1,"b":2,"c":3}}
+  """)
+// scalastyle:on line.size.limit
+case class AddFields(children: Seq[Expression]) extends Unevaluable {

Review comment:
       Should we call it `WithFields` to make it consistent with the Scala API? 
This function can replace a field as well, so calling it `AddFields` might 
be confusing.

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
##########
@@ -514,3 +514,98 @@ case class StringToMap(text: Expression, pairDelim: 
Expression, keyValueDelim: E
 
   override def prettyName: String = "str_to_map"
 }
+
+/**
+ * Adds/replaces fields in struct by name.
+ *
+ * @param children Seq(struct, name1, val1, name2, val2, ...)
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(struct, name1, val1, name2, val2, ...) - Adds/replaces 
fields in struct by name.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(NAMED_STRUCT("a", 1, "b", 2), "c", 3);
+       {"a":1,"b":2,"c":3}
+      > SELECT _FUNC_(NAMED_STRUCT("a", 1, "b", 2), "b", 3);
+       {"a":1,"b":3}
+      > SELECT _FUNC_(CAST(NULL AS struct<a:int,b:int>), "c", 3);
+       {"a":null,"b":null,"c":3}
+      > SELECT _FUNC_(a, 'b', 100) AS a FROM (VALUES (NAMED_STRUCT('a', 1, 
'b', 2, 'b', 3)) AS T(a));
+       {"a":1,"b":100,"b":100}
+      > SELECT _FUNC_(a, 'a', _FUNC_(a.a, 'c', 3)) AS a FROM (VALUES 
(NAMED_STRUCT('a', NAMED_STRUCT('a', 1, 'b', 2))) AS T(a));
+       {"a":{"a":1,"b":2,"c":3}}
+  """)
+// scalastyle:on line.size.limit
+case class AddFields(children: Seq[Expression]) extends Unevaluable {
+
+  private lazy val ogStructExpr = children.head
+  private lazy val ogStructType = 
ogStructExpr.dataType.asInstanceOf[StructType]
+  private lazy val (nameExprs, valExprs) = children.drop(1).grouped(2).map {
+    case Seq(name, value) => (name, value)
+  }.toList.unzip
+  private lazy val names = nameExprs.map(e => 
Option(e.eval()).map(_.toString).orNull)
+  private lazy val addOrReplaceExprs = names.zip(valExprs)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val expectedStructType = StructType(Nil).typeName
+    if (children.size % 2 == 0) {
+      TypeCheckResult.TypeCheckFailure(s"$prettyName expects an odd number of 
arguments.")
+    } else if (ogStructExpr.dataType.typeName != expectedStructType) {
+      TypeCheckResult.TypeCheckFailure(
+        s"Only $expectedStructType is allowed to appear at first position, 
got: " +
+          s"${ogStructExpr.dataType.typeName}.")
+    } else if (names.contains(null)) {
+      TypeCheckResult.TypeCheckFailure("Field name should not be null.")
+    } else if (nameExprs.exists(e => !(e.foldable && e.dataType == 
StringType))) {
+      TypeCheckResult.TypeCheckFailure(
+        s"Only foldable ${StringType.catalogString} expressions are allowed to 
appear at even " +
+          "position.")
+    } else {
+      TypeCheckResult.TypeCheckSuccess
+    }
+  }
+
+  override def dataType: StructType = {
+    val ogStructFields = ogStructType.fields.map { f =>
+      (f.name, f.copy(nullable = ogStructExpr.nullable || f.nullable))
+    }
+    val addOrReplaceStructFields = addOrReplaceExprs.map { case (name, expr) =>
+      (name, StructField(name, expr.dataType, expr.nullable))
+    }
+    val newStructFields = addOrReplace(ogStructFields, 
addOrReplaceStructFields).map(_._2)
+    StructType(newStructFields)
+  }
+
+  override def foldable: Boolean = children.forall(_.foldable)

Review comment:
       Since the `nameExprs` must be foldable, and we already check that in 
`checkInputDataTypes`, we can just do 
`override def foldable: Boolean = valExprs.forall(_.foldable)`.

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
##########
@@ -514,3 +514,98 @@ case class StringToMap(text: Expression, pairDelim: 
Expression, keyValueDelim: E
 
   override def prettyName: String = "str_to_map"
 }
+
+/**
+ * Adds/replaces fields in struct by name.
+ *
+ * @param children Seq(struct, name1, val1, name2, val2, ...)
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(struct, name1, val1, name2, val2, ...) - Adds/replaces 
fields in struct by name.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(NAMED_STRUCT("a", 1, "b", 2), "c", 3);
+       {"a":1,"b":2,"c":3}
+      > SELECT _FUNC_(NAMED_STRUCT("a", 1, "b", 2), "b", 3);
+       {"a":1,"b":3}
+      > SELECT _FUNC_(CAST(NULL AS struct<a:int,b:int>), "c", 3);
+       {"a":null,"b":null,"c":3}
+      > SELECT _FUNC_(a, 'b', 100) AS a FROM (VALUES (NAMED_STRUCT('a', 1, 
'b', 2, 'b', 3)) AS T(a));
+       {"a":1,"b":100,"b":100}
+      > SELECT _FUNC_(a, 'a', _FUNC_(a.a, 'c', 3)) AS a FROM (VALUES 
(NAMED_STRUCT('a', NAMED_STRUCT('a', 1, 'b', 2))) AS T(a));
+       {"a":{"a":1,"b":2,"c":3}}
+  """)
+// scalastyle:on line.size.limit
+case class AddFields(children: Seq[Expression]) extends Unevaluable {
+
+  private lazy val ogStructExpr = children.head
+  private lazy val ogStructType = 
ogStructExpr.dataType.asInstanceOf[StructType]
+  private lazy val (nameExprs, valExprs) = children.drop(1).grouped(2).map {
+    case Seq(name, value) => (name, value)
+  }.toList.unzip
+  private lazy val names = nameExprs.map(e => 
Option(e.eval()).map(_.toString).orNull)
+  private lazy val addOrReplaceExprs = names.zip(valExprs)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val expectedStructType = StructType(Nil).typeName
+    if (children.size % 2 == 0) {
+      TypeCheckResult.TypeCheckFailure(s"$prettyName expects an odd number of 
arguments.")
+    } else if (ogStructExpr.dataType.typeName != expectedStructType) {
+      TypeCheckResult.TypeCheckFailure(
+        s"Only $expectedStructType is allowed to appear at first position, 
got: " +
+          s"${ogStructExpr.dataType.typeName}.")
+    } else if (names.contains(null)) {
+      TypeCheckResult.TypeCheckFailure("Field name should not be null.")
+    } else if (nameExprs.exists(e => !(e.foldable && e.dataType == 
StringType))) {

Review comment:
       `CreateNamedStruct` has the same check. Can we create a utility 
function and share the code?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to