cloud-fan commented on code in PR #48908:
URL: https://github.com/apache/spark/pull/48908#discussion_r1890188655
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala:
##########
@@ -502,111 +493,53 @@ case class JsonTuple(children: Seq[Expression])
}
}
+ @transient
+ private lazy val evaluator: JsonTupleEvaluator =
JsonTupleEvaluator(foldableFieldNames)
+
override def eval(input: InternalRow): IterableOnce[InternalRow] = {
val json = jsonExpr.eval(input).asInstanceOf[UTF8String]
- if (json == null) {
- return nullRow
- }
-
- try {
- /* We know the bytes are UTF-8 encoded. Pass a Reader to avoid having
Jackson
- detect character encoding which could fail for some malformed strings */
- Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, json))
{ parser =>
- parseRow(parser, input)
- }
- } catch {
- case _: JsonProcessingException =>
- nullRow
- }
- }
-
- private def parseRow(parser: JsonParser, input: InternalRow):
Seq[InternalRow] = {
- // only objects are supported
- if (parser.nextToken() != JsonToken.START_OBJECT) {
- return nullRow
- }
-
- // evaluate the field names as String rather than UTF8String to
- // optimize lookups from the json token, which is also a String
- val fieldNames = if (constantFields == fieldExpressions.length) {
- // typically the user will provide the field names as foldable
expressions
- // so we can use the cached copy
- foldableFieldNames.map(_.orNull)
- } else if (constantFields == 0) {
- // none are foldable so all field names need to be evaluated from the
input row
- fieldExpressions.map { expr =>
-
Option(expr.eval(input)).map(_.asInstanceOf[UTF8String].toString).orNull
- }
- } else {
- // if there is a mix of constant and non-constant expressions
- // prefer the cached copy when available
- foldableFieldNames.zip(fieldExpressions).map {
- case (null, expr) =>
-
Option(expr.eval(input)).map(_.asInstanceOf[UTF8String].toString).orNull
- case (fieldName, _) => fieldName.orNull
- }
- }
-
- val row = Array.ofDim[Any](fieldNames.length)
-
- // start reading through the token stream, looking for any requested field
names
- while (parser.nextToken() != JsonToken.END_OBJECT) {
- if (parser.getCurrentToken == JsonToken.FIELD_NAME) {
- // check to see if this field is desired in the output
- val jsonField = parser.currentName
- var idx = fieldNames.indexOf(jsonField)
- if (idx >= 0) {
- // it is, copy the child tree to the correct location in the output
row
- val output = new ByteArrayOutputStream()
-
- // write the output directly to UTF8 encoded byte array
- if (parser.nextToken() != JsonToken.VALUE_NULL) {
- Utils.tryWithResource(jsonFactory.createGenerator(output,
JsonEncoding.UTF8)) {
- generator => copyCurrentStructure(generator, parser)
- }
-
- val jsonValue = UTF8String.fromBytes(output.toByteArray)
-
- // SPARK-21804: json_tuple returns null values within repeated
columns
- // except the first one; so that we need to check the remaining
fields.
- do {
- row(idx) = jsonValue
- idx = fieldNames.indexOf(jsonField, idx + 1)
- } while (idx >= 0)
- }
- }
- }
-
- // always skip children, it's cheap enough to do even if
copyCurrentStructure was called
- parser.skipChildren()
- }
-
- new GenericInternalRow(row) :: Nil
+ val fields = fieldExpressions.map(_.eval(input).asInstanceOf[UTF8String])
+ evaluator.evaluate(json, fields)
}
- private def copyCurrentStructure(generator: JsonGenerator, parser:
JsonParser): Unit = {
- parser.getCurrentToken match {
- // if the user requests a string field it needs to be returned without
enclosing
- // quotes which is accomplished via JsonGenerator.writeRaw instead of
JsonGenerator.write
- case JsonToken.VALUE_STRING if parser.hasTextCharacters =>
- // slight optimization to avoid allocating a String instance, though
the characters
- // still have to be decoded... Jackson doesn't have a way to access
the raw bytes
- generator.writeRaw(parser.getTextCharacters, parser.getTextOffset,
parser.getTextLength)
-
- case JsonToken.VALUE_STRING =>
- // the normal String case, pass it through to the output without
enclosing quotes
- generator.writeRaw(parser.getText)
-
- case JsonToken.VALUE_NULL =>
- // a special case that needs to be handled outside of this method.
- // if a requested field is null, the result must be null. the easiest
- // way to achieve this is just by ignoring null tokens entirely
- throw SparkException.internalError("Do not attempt to copy a null
field.")
-
- case _ =>
- // handle other types including objects, arrays, booleans and numbers
- generator.copyCurrentStructure(parser)
+ override protected def doGenCode(ctx: CodegenContext, ev: ExprCode):
ExprCode = {
+ val refEvaluator = ctx.addReferenceObj("evaluator", evaluator)
+ val jsonTerm = ctx.freshName("json")
+ val jsonEval = jsonExpr.genCode(ctx)
+ val fieldsTerm = ctx.freshName("fields")
+ val fieldsEval = fieldExpressions.map(_.genCode(ctx))
+ val arraySeqClass = classOf[ArraySeq[_]].getName
+ val wrapperClass = classOf[IterableOnce[_]].getName
+ val setJson =
+ s"""
+ |if (${jsonEval.isNull}) {
+ | $jsonTerm = null;
+ |} else {
+ | $jsonTerm = ${jsonEval.value};
+ |}
+ |""".stripMargin
+ val setFields = fieldsEval.zipWithIndex.map {
+ case (fieldEval, idx) =>
+ s"""
+ |if (${fieldEval.isNull}) {
+ | $fieldsTerm[$idx] = null;
+ |} else {
+ | $fieldsTerm[$idx] = ${fieldEval.value};
+ |}
+ |""".stripMargin
}
+ ev.copy(code =
+ code"""
+ |UTF8String $jsonTerm = null;
+ |UTF8String[] $fieldsTerm = new UTF8String[${fieldExpressions.length
- 1}];
Review Comment:
Shall we use an array in the interpreted version as well? Then
`JsonTupleEvaluator` should take an `Array` instead of a `Seq`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]