cloud-fan commented on code in PR #48908:
URL: https://github.com/apache/spark/pull/48908#discussion_r1890188655


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala:
##########
@@ -502,111 +493,53 @@ case class JsonTuple(children: Seq[Expression])
     }
   }
 
+  @transient
+  private lazy val evaluator: JsonTupleEvaluator = 
JsonTupleEvaluator(foldableFieldNames)
+
   override def eval(input: InternalRow): IterableOnce[InternalRow] = {
     val json = jsonExpr.eval(input).asInstanceOf[UTF8String]
-    if (json == null) {
-      return nullRow
-    }
-
-    try {
-      /* We know the bytes are UTF-8 encoded. Pass a Reader to avoid having 
Jackson
-      detect character encoding which could fail for some malformed strings */
-      Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, json)) 
{ parser =>
-        parseRow(parser, input)
-      }
-    } catch {
-      case _: JsonProcessingException =>
-        nullRow
-    }
-  }
-
-  private def parseRow(parser: JsonParser, input: InternalRow): 
Seq[InternalRow] = {
-    // only objects are supported
-    if (parser.nextToken() != JsonToken.START_OBJECT) {
-      return nullRow
-    }
-
-    // evaluate the field names as String rather than UTF8String to
-    // optimize lookups from the json token, which is also a String
-    val fieldNames = if (constantFields == fieldExpressions.length) {
-      // typically the user will provide the field names as foldable 
expressions
-      // so we can use the cached copy
-      foldableFieldNames.map(_.orNull)
-    } else if (constantFields == 0) {
-      // none are foldable so all field names need to be evaluated from the 
input row
-      fieldExpressions.map { expr =>
-        
Option(expr.eval(input)).map(_.asInstanceOf[UTF8String].toString).orNull
-      }
-    } else {
-      // if there is a mix of constant and non-constant expressions
-      // prefer the cached copy when available
-      foldableFieldNames.zip(fieldExpressions).map {
-        case (null, expr) =>
-          
Option(expr.eval(input)).map(_.asInstanceOf[UTF8String].toString).orNull
-        case (fieldName, _) => fieldName.orNull
-      }
-    }
-
-    val row = Array.ofDim[Any](fieldNames.length)
-
-    // start reading through the token stream, looking for any requested field 
names
-    while (parser.nextToken() != JsonToken.END_OBJECT) {
-      if (parser.getCurrentToken == JsonToken.FIELD_NAME) {
-        // check to see if this field is desired in the output
-        val jsonField = parser.currentName
-        var idx = fieldNames.indexOf(jsonField)
-        if (idx >= 0) {
-          // it is, copy the child tree to the correct location in the output 
row
-          val output = new ByteArrayOutputStream()
-
-          // write the output directly to UTF8 encoded byte array
-          if (parser.nextToken() != JsonToken.VALUE_NULL) {
-            Utils.tryWithResource(jsonFactory.createGenerator(output, 
JsonEncoding.UTF8)) {
-              generator => copyCurrentStructure(generator, parser)
-            }
-
-            val jsonValue = UTF8String.fromBytes(output.toByteArray)
-
-            // SPARK-21804: json_tuple returns null values within repeated 
columns
-            // except the first one; so that we need to check the remaining 
fields.
-            do {
-              row(idx) = jsonValue
-              idx = fieldNames.indexOf(jsonField, idx + 1)
-            } while (idx >= 0)
-          }
-        }
-      }
-
-      // always skip children, it's cheap enough to do even if 
copyCurrentStructure was called
-      parser.skipChildren()
-    }
-
-    new GenericInternalRow(row) :: Nil
+    val fields = fieldExpressions.map(_.eval(input).asInstanceOf[UTF8String])
+    evaluator.evaluate(json, fields)
   }
 
-  private def copyCurrentStructure(generator: JsonGenerator, parser: 
JsonParser): Unit = {
-    parser.getCurrentToken match {
-      // if the user requests a string field it needs to be returned without 
enclosing
-      // quotes which is accomplished via JsonGenerator.writeRaw instead of 
JsonGenerator.write
-      case JsonToken.VALUE_STRING if parser.hasTextCharacters =>
-        // slight optimization to avoid allocating a String instance, though 
the characters
-        // still have to be decoded... Jackson doesn't have a way to access 
the raw bytes
-        generator.writeRaw(parser.getTextCharacters, parser.getTextOffset, 
parser.getTextLength)
-
-      case JsonToken.VALUE_STRING =>
-        // the normal String case, pass it through to the output without 
enclosing quotes
-        generator.writeRaw(parser.getText)
-
-      case JsonToken.VALUE_NULL =>
-        // a special case that needs to be handled outside of this method.
-        // if a requested field is null, the result must be null. the easiest
-        // way to achieve this is just by ignoring null tokens entirely
-        throw SparkException.internalError("Do not attempt to copy a null 
field.")
-
-      case _ =>
-        // handle other types including objects, arrays, booleans and numbers
-        generator.copyCurrentStructure(parser)
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): 
ExprCode = {
+    val refEvaluator = ctx.addReferenceObj("evaluator", evaluator)
+    val jsonTerm = ctx.freshName("json")
+    val jsonEval = jsonExpr.genCode(ctx)
+    val fieldsTerm = ctx.freshName("fields")
+    val fieldsEval = fieldExpressions.map(_.genCode(ctx))
+    val arraySeqClass = classOf[ArraySeq[_]].getName
+    val wrapperClass = classOf[IterableOnce[_]].getName
+    val setJson =
+      s"""
+         |if (${jsonEval.isNull}) {
+         |  $jsonTerm = null;
+         |} else {
+         |  $jsonTerm = ${jsonEval.value};
+         |}
+         |""".stripMargin
+    val setFields = fieldsEval.zipWithIndex.map {
+      case (fieldEval, idx) =>
+        s"""
+           |if (${fieldEval.isNull}) {
+           |  $fieldsTerm[$idx] = null;
+           |} else {
+           |  $fieldsTerm[$idx] = ${fieldEval.value};
+           |}
+           |""".stripMargin
     }
+    ev.copy(code =
+      code"""
+         |UTF8String $jsonTerm = null;
+         |UTF8String[] $fieldsTerm = new UTF8String[${fieldExpressions.length 
- 1}];

Review Comment:
   Shall we use an array in the interpreted version as well? 
`JsonTupleEvaluator` should also take an array instead of a `Seq`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to