chenhao-db commented on code in PR #45805:
URL: https://github.com/apache/spark/pull/45805#discussion_r1555298791
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala:
##########
@@ -822,6 +822,8 @@ object FunctionRegistry {
expression[ParseJson]("parse_json"),
expressionBuilder("variant_get", VariantGetExpressionBuilder),
expressionBuilder("try_variant_get", TryVariantGetExpressionBuilder),
+ expression[VariantExplode]("variant_explode"),
+ expressionGeneratorOuter[VariantExplode]("variant_explode_outer"),
Review Comment:
Done.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala:
##########
@@ -403,3 +404,85 @@ object VariantGetExpressionBuilder extends VariantGetExpressionBuilderBase(true)
)
// scalastyle:on line.size.limit
object TryVariantGetExpressionBuilder extends VariantGetExpressionBuilderBase(false)
+
+// scalastyle:off line.size.limit line.contains.tab
+@ExpressionDescription(
+ usage = "_FUNC_(expr) - It separates a variant object/array into multiple rows containing its fields/elements. Its result schema is `struct<pos int, key string, value variant>`. `pos` is the position of the field/element in its parent object/array, and `value` is the field/element value. `key` is the field name when exploding a variant object, or is NULL when exploding a variant array. It ignores any input that is not a variant array/object, including SQL NULL, variant null, and any other variant values.",
+ examples = """
+ Examples:
+ > SELECT _FUNC_(parse_json('["hello", "world"]'));
+ 0 NULL "hello"
+ 1 NULL "world"
+ > SELECT _FUNC_(parse_json('{"a": true, "b": 3.14}'));
+ 0 a true
+ 1 b 3.14
+ """,
+ since = "4.0.0",
+ group = "variant_funcs")
+// scalastyle:on line.size.limit line.contains.tab
+case class VariantExplode(child: Expression) extends UnaryExpression with Generator
+ with ExpectsInputTypes {
+ override def inputTypes: Seq[AbstractDataType] = Seq(VariantType)
+
+ override def prettyName: String = "variant_explode"
+
+ override protected def withNewChildInternal(newChild: Expression): VariantExplode =
+ copy(child = newChild)
+
+ override def eval(input: InternalRow): IterableOnce[InternalRow] = {
+ val inputVariant = child.eval(input).asInstanceOf[VariantVal]
+ VariantExplode.variantExplode(inputVariant, inputVariant == null)
+ }
+
+ override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+ val childCode = child.genCode(ctx)
+ val code = code"""
+ ${childCode.code}
+ scala.collection.Seq<InternalRow> ${ev.value} =
+ org.apache.spark.sql.catalyst.expressions.variant.VariantExplode.variantExplode(
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]