fqaiser94 commented on a change in pull request #27066:
URL: https://github.com/apache/spark/pull/27066#discussion_r426107029
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
##########
@@ -515,3 +515,100 @@ case class StringToMap(text: Expression, pairDelim:
Expression, keyValueDelim: E
override def prettyName: String = "str_to_map"
}
+
+/**
+ * Adds/replaces fields in struct by name in given order.
+ *
+ * @param children Seq(struct, name1, val1, name2, val2, ...)
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+ usage = "_FUNC_(struct, name1, val1, name2, val2, ...) - Adds/replaces
fields in struct by name in given order.",
+ examples = """
+ Examples:
+ > SELECT _FUNC_(NAMED_STRUCT("a", 1, "b", 2), "c", 3);
+ {"a":1,"b":2,"c":3}
+ > SELECT _FUNC_(NAMED_STRUCT("a", 1, "b", 2), "b", 3);
+ {"a":1,"b":3}
+ > SELECT _FUNC_(CAST(NULL AS struct<a:int,b:int>), "c", 3);
+ {"a":null,"b":null,"c":3}
+ > SELECT _FUNC_(a, 'b', 100) AS a FROM (VALUES (NAMED_STRUCT('a', 1,
'b', 2, 'b', 3)) AS T(a));
+ {"a":1,"b":100,"b":100}
+ > SELECT _FUNC_(a, 'a', _FUNC_(a.a, 'c', 3)) AS a FROM (VALUES
(NAMED_STRUCT('a', NAMED_STRUCT('a', 1, 'b', 2))) AS T(a));
+ {"a":{"a":1,"b":2,"c":3}}
+ """)
+// scalastyle:on line.size.limit
+case class WithFields(children: Seq[Expression]) extends Unevaluable {
+
+ private lazy val structExpr = children.head
+ private lazy val structType = structExpr.dataType.asInstanceOf[StructType]
+ private lazy val (nameExprs, valExprs) = children.drop(1).grouped(2).map {
+ case Seq(name, value) => (name, value)
+ }.toList.unzip
+ private lazy val names = nameExprs.map(e =>
Option(e.eval()).map(_.toString).orNull)
+ private lazy val addOrReplaceExprs = names.zip(valExprs)
+
+ override def checkInputDataTypes(): TypeCheckResult = {
+ val expectedStructType = StructType(Nil).typeName
+ if (children.size % 2 == 0) {
+ TypeCheckResult.TypeCheckFailure(s"$prettyName expects an odd number of
arguments.")
+ } else if (structExpr.dataType.typeName != expectedStructType) {
+ TypeCheckResult.TypeCheckFailure(
+ s"Only $expectedStructType is allowed to appear at first position,
got: " +
+ s"${structExpr.dataType.typeName}.")
+ } else if (names.contains(null)) {
+ TypeCheckResult.TypeCheckFailure("Field name should not be null.")
+ } else if (nameExprs.exists(e => !(e.foldable && e.dataType ==
StringType))) {
+ TypeCheckResult.TypeCheckFailure(
+ s"Only foldable ${StringType.catalogString} expressions are allowed to
appear at even " +
+ "position.")
+ } else {
+ TypeCheckResult.TypeCheckSuccess
+ }
+ }
+
+ override def dataType: StructType = {
+ val existingStructFields: Seq[(String, StructField)] =
structType.fields.map { f =>
+ (f.name, f.copy(nullable = structExpr.nullable || f.nullable))
+ }
+ val addOrReplaceStructFields: Seq[(String, StructField)] =
addOrReplaceExprs.map {
+ case (name, expr) => (name, StructField(name, expr.dataType,
expr.nullable))
+ }
+ StructType(addOrReplace(existingStructFields,
addOrReplaceStructFields).map(_._2))
+ }
+
+ override def foldable: Boolean = children.forall(_.foldable)
+
+ override def nullable: Boolean = false
Review comment:
For simplicity of implementation and easy optimization, we decided that
in case of a `null` StructType column, we would return a struct populated with
`null` values for the existing fields. For example:
```
val df = spark.createDataFrame(sparkContext.parallelize(
Row(Row(1, 2, 3)) :: Row(null) :: Nil),
StructType(Seq(StructField("a", StructType(Seq(
StructField("a", IntegerType),
StructField("b", IntegerType),
StructField("c", IntegerType)))))))
df.show(false)
+---------+
|a |
+---------+
|[1, 2, 3]|
|null |
+---------+
df.withColumn("newA", df("a").withField("d", lit(4))).show(false)
+---------+------------+
|a |newA |
+---------+------------+
|[1, 2, 3]|[1, 2, 3, 4]|
|null |[,,, 4] |
+---------+------------+
```
As a result, this expression will never return `null` and thus `nullable`
should always return `false`.
For reference, please see the discussion earlier in the PR:
[comment1](https://github.com/apache/spark/pull/27066#issuecomment-609671222)
[comment2](https://github.com/apache/spark/pull/27066#issuecomment-609979218).
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]