Github user yhuai commented on a diff in the pull request:
https://github.com/apache/spark/pull/7946#discussion_r40757511
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonFunctions.scala
---
@@ -307,3 +308,140 @@ case class GetJsonObject(json: Expression, path:
Expression)
}
}
}
+
+case class JsonTuple(children: Seq[Expression])
+ extends Expression with CodegenFallback {
+
+ import SharedFactory._
+
+ override def nullable: Boolean = {
+ // a row is always returned
+ false
+ }
+
+ // if processing fails this shared value will be returned
+ @transient private lazy val nullRow: InternalRow =
+ new GenericInternalRow(Array.ofDim[Any](fieldExpressions.length))
+
+ // the json body is the first child
+ @transient private lazy val jsonExpr: Expression = children.head
+
+ // the fields to query are the remaining children
+ @transient private lazy val fieldExpressions: Seq[Expression] =
children.tail
+
+ // eagerly evaluate any foldable the field names
+ @transient private lazy val foldableFieldNames: IndexedSeq[String] = {
+ fieldExpressions.map {
+ case expr if expr.foldable =>
expr.eval().asInstanceOf[UTF8String].toString
+ case _ => null
+ }.toIndexedSeq
+ }
+
+ // and count the number of foldable fields, we'll use this later to
optimize evaluation
+ @transient private lazy val constantFields: Int =
foldableFieldNames.count(_ != null)
+
+ override lazy val dataType: StructType = {
+ val fields = fieldExpressions.zipWithIndex.map {
+ case (_, idx) => StructField(
+ name = s"c$idx", // mirroring GenericUDTFJSONTuple.initialize
+ dataType = StringType,
+ nullable = true)
+ }
+
+ StructType(fields)
+ }
+
+ override def prettyName: String = "json_tuple"
+
+ override def checkInputDataTypes(): TypeCheckResult = {
+ if (children.length < 2) {
+ TypeCheckResult.TypeCheckFailure(s"$prettyName requires at least two
arguments")
+ } else if (children.forall(child =>
StringType.acceptsType(child.dataType))) {
+ TypeCheckResult.TypeCheckSuccess
+ } else {
+ TypeCheckResult.TypeCheckFailure(s"$prettyName requires that all
arguments are strings")
+ }
+ }
+
+ override def eval(input: InternalRow): InternalRow = {
+ try {
+ val json = jsonExpr.eval(input).asInstanceOf[UTF8String]
+ if (json == null) {
+ return nullRow
+ }
+
+ val parser = jsonFactory.createParser(json.getBytes)
+
+ // only objects are supported
+ if (parser.nextToken() != JsonToken.START_OBJECT) {
+ return nullRow
+ }
+
+ // evaluate the field names as String rather than UTF8String to
+ // optimize lookups from the json token, which is also a String
+ val fieldNames = if (constantFields == fieldExpressions.length) {
+ // typically the user will provide the field names as foldable
expressions
+ // so we can use the cached copy
+ foldableFieldNames
+ } else if (constantFields == 0) {
+ // none are foldable so all field names need to be evaluated from
the input row
+
fieldExpressions.map(_.eval(input).asInstanceOf[UTF8String].toString)
+ } else {
+ // if there is a mix of constant and non-constant expressions
+ // prefer the cached copy when available
+ foldableFieldNames.zip(fieldExpressions).map {
+ case (null, expr) =>
expr.eval(input).asInstanceOf[UTF8String].toString
+ case (fieldName, _) => fieldName
+ }
+ }
+
+ val row = Array.ofDim[Any](fieldNames.length)
+
+ // start reading through the token stream, looking for any requested
field names
+ while (parser.nextToken() != JsonToken.END_OBJECT) {
+ if (parser.getCurrentToken == JsonToken.FIELD_NAME) {
+ // check to see if this field is desired in the output
+ val idx = fieldNames.indexOf(parser.getCurrentName)
+ if (idx >= 0) {
+ // it is, copy the child tree to the correct location in the
output row
+ val output = new ByteArrayOutputStream()
--- End diff --
Ah, I see. We need to create a new stream for every distinct UTF8String.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org