panbingkun commented on code in PR #48908:
URL: https://github.com/apache/spark/pull/48908#discussion_r1889780224


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala:
##########
@@ -502,111 +496,119 @@ case class JsonTuple(children: Seq[Expression])
     }
   }
 
+  @transient
+  private lazy val evaluator: JsonTupleEvaluator = 
JsonTupleEvaluator(fieldExpressions.length)
+
   override def eval(input: InternalRow): IterableOnce[InternalRow] = {
     val json = jsonExpr.eval(input).asInstanceOf[UTF8String]
-    if (json == null) {
-      return nullRow
-    }
 
-    try {
-      /* We know the bytes are UTF-8 encoded. Pass a Reader to avoid having 
Jackson
-      detect character encoding which could fail for some malformed strings */
-      Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, json)) 
{ parser =>
-        parseRow(parser, input)
-      }
-    } catch {
-      case _: JsonProcessingException =>
-        nullRow
-    }
-  }
-
-  private def parseRow(parser: JsonParser, input: InternalRow): 
Seq[InternalRow] = {
-    // only objects are supported
-    if (parser.nextToken() != JsonToken.START_OBJECT) {
-      return nullRow
-    }
-
-    // evaluate the field names as String rather than UTF8String to
-    // optimize lookups from the json token, which is also a String
+    // Evaluate the field names as String rather than UTF8String to
+    // optimize lookups from the json token, which is also a String.
     val fieldNames = if (constantFields == fieldExpressions.length) {

Review Comment:
   Okay, let me update it.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala:
##########
@@ -502,111 +496,119 @@ case class JsonTuple(children: Seq[Expression])
     }
   }
 
+  @transient
+  private lazy val evaluator: JsonTupleEvaluator = 
JsonTupleEvaluator(fieldExpressions.length)
+
   override def eval(input: InternalRow): IterableOnce[InternalRow] = {
     val json = jsonExpr.eval(input).asInstanceOf[UTF8String]
-    if (json == null) {
-      return nullRow
-    }
 
-    try {
-      /* We know the bytes are UTF-8 encoded. Pass a Reader to avoid having 
Jackson
-      detect character encoding which could fail for some malformed strings */
-      Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, json)) 
{ parser =>
-        parseRow(parser, input)
-      }
-    } catch {
-      case _: JsonProcessingException =>
-        nullRow
-    }
-  }
-
-  private def parseRow(parser: JsonParser, input: InternalRow): 
Seq[InternalRow] = {
-    // only objects are supported
-    if (parser.nextToken() != JsonToken.START_OBJECT) {
-      return nullRow
-    }
-
-    // evaluate the field names as String rather than UTF8String to
-    // optimize lookups from the json token, which is also a String
+    // Evaluate the field names as String rather than UTF8String to
+    // optimize lookups from the json token, which is also a String.
     val fieldNames = if (constantFields == fieldExpressions.length) {

Review Comment:
   Updated. The logic looks much simpler now, thank you!



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala:
##########
@@ -162,3 +165,116 @@ case class SchemaOfJsonEvaluator(options: Map[String, 
String]) {
     UTF8String.fromString(dt.sql)
   }
 }
+
+/**
+ * The expression `JsonTuple` will utilize it to support codegen.
+ */
+case class JsonTupleEvaluator(foldableFields: IndexedSeq[Option[String]]) {

Review Comment:
   Updated.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala:
##########
@@ -162,3 +165,116 @@ case class SchemaOfJsonEvaluator(options: Map[String, 
String]) {
     UTF8String.fromString(dt.sql)
   }
 }
+
+/**
+ * The expression `JsonTuple` will utilize it to support codegen.
+ */
+case class JsonTupleEvaluator(foldableFields: IndexedSeq[Option[String]]) {
+
+  import SharedFactory._
+
+  // If processing fails this shared value will be returned.
+  @transient private lazy val nullRow: Seq[InternalRow] =
+    new GenericInternalRow(Array.ofDim[Any](foldableFields.length)) :: Nil
+
+  // And count the number of foldable fields, we'll use this later to optimize 
evaluation.
+  @transient private lazy val constantFields: Int = foldableFields.count(_ != 
null)
+
+  private def getCachedFields(fields: Seq[UTF8String]): Seq[String] = {

Review Comment:
   Updated.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala:
##########
@@ -502,111 +493,53 @@ case class JsonTuple(children: Seq[Expression])
     }
   }
 
+  @transient
+  private lazy val evaluator: JsonTupleEvaluator = 
JsonTupleEvaluator(foldableFieldNames)
+
   override def eval(input: InternalRow): IterableOnce[InternalRow] = {
     val json = jsonExpr.eval(input).asInstanceOf[UTF8String]
-    if (json == null) {
-      return nullRow
-    }
-
-    try {
-      /* We know the bytes are UTF-8 encoded. Pass a Reader to avoid having 
Jackson
-      detect character encoding which could fail for some malformed strings */
-      Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, json)) 
{ parser =>
-        parseRow(parser, input)
-      }
-    } catch {
-      case _: JsonProcessingException =>
-        nullRow
-    }
-  }
-
-  private def parseRow(parser: JsonParser, input: InternalRow): 
Seq[InternalRow] = {
-    // only objects are supported
-    if (parser.nextToken() != JsonToken.START_OBJECT) {
-      return nullRow
-    }
-
-    // evaluate the field names as String rather than UTF8String to
-    // optimize lookups from the json token, which is also a String
-    val fieldNames = if (constantFields == fieldExpressions.length) {
-      // typically the user will provide the field names as foldable 
expressions
-      // so we can use the cached copy
-      foldableFieldNames.map(_.orNull)
-    } else if (constantFields == 0) {
-      // none are foldable so all field names need to be evaluated from the 
input row
-      fieldExpressions.map { expr =>
-        
Option(expr.eval(input)).map(_.asInstanceOf[UTF8String].toString).orNull
-      }
-    } else {
-      // if there is a mix of constant and non-constant expressions
-      // prefer the cached copy when available
-      foldableFieldNames.zip(fieldExpressions).map {
-        case (null, expr) =>
-          
Option(expr.eval(input)).map(_.asInstanceOf[UTF8String].toString).orNull
-        case (fieldName, _) => fieldName.orNull
-      }
-    }
-
-    val row = Array.ofDim[Any](fieldNames.length)
-
-    // start reading through the token stream, looking for any requested field 
names
-    while (parser.nextToken() != JsonToken.END_OBJECT) {
-      if (parser.getCurrentToken == JsonToken.FIELD_NAME) {
-        // check to see if this field is desired in the output
-        val jsonField = parser.currentName
-        var idx = fieldNames.indexOf(jsonField)
-        if (idx >= 0) {
-          // it is, copy the child tree to the correct location in the output 
row
-          val output = new ByteArrayOutputStream()
-
-          // write the output directly to UTF8 encoded byte array
-          if (parser.nextToken() != JsonToken.VALUE_NULL) {
-            Utils.tryWithResource(jsonFactory.createGenerator(output, 
JsonEncoding.UTF8)) {
-              generator => copyCurrentStructure(generator, parser)
-            }
-
-            val jsonValue = UTF8String.fromBytes(output.toByteArray)
-
-            // SPARK-21804: json_tuple returns null values within repeated 
columns
-            // except the first one; so that we need to check the remaining 
fields.
-            do {
-              row(idx) = jsonValue
-              idx = fieldNames.indexOf(jsonField, idx + 1)
-            } while (idx >= 0)
-          }
-        }
-      }
-
-      // always skip children, it's cheap enough to do even if 
copyCurrentStructure was called
-      parser.skipChildren()
-    }
-
-    new GenericInternalRow(row) :: Nil
+    val fields = fieldExpressions.map(_.eval(input).asInstanceOf[UTF8String])
+    evaluator.evaluate(json, fields)
   }
 
-  private def copyCurrentStructure(generator: JsonGenerator, parser: 
JsonParser): Unit = {
-    parser.getCurrentToken match {
-      // if the user requests a string field it needs to be returned without 
enclosing
-      // quotes which is accomplished via JsonGenerator.writeRaw instead of 
JsonGenerator.write
-      case JsonToken.VALUE_STRING if parser.hasTextCharacters =>
-        // slight optimization to avoid allocating a String instance, though 
the characters
-        // still have to be decoded... Jackson doesn't have a way to access 
the raw bytes
-        generator.writeRaw(parser.getTextCharacters, parser.getTextOffset, 
parser.getTextLength)
-
-      case JsonToken.VALUE_STRING =>
-        // the normal String case, pass it through to the output without 
enclosing quotes
-        generator.writeRaw(parser.getText)
-
-      case JsonToken.VALUE_NULL =>
-        // a special case that needs to be handled outside of this method.
-        // if a requested field is null, the result must be null. the easiest
-        // way to achieve this is just by ignoring null tokens entirely
-        throw SparkException.internalError("Do not attempt to copy a null 
field.")
-
-      case _ =>
-        // handle other types including objects, arrays, booleans and numbers
-        generator.copyCurrentStructure(parser)
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): 
ExprCode = {
+    val refEvaluator = ctx.addReferenceObj("evaluator", evaluator)
+    val jsonTerm = ctx.freshName("json")
+    val jsonEval = jsonExpr.genCode(ctx)
+    val fieldsTerm = ctx.freshName("fields")
+    val fieldsEval = fieldExpressions.map(_.genCode(ctx))
+    val arraySeqClass = classOf[ArraySeq[_]].getName
+    val wrapperClass = classOf[IterableOnce[_]].getName
+    val setJson =
+      s"""
+         |if (${jsonEval.isNull}) {
+         |  $jsonTerm = null;
+         |} else {
+         |  $jsonTerm = ${jsonEval.value};
+         |}
+         |""".stripMargin
+    val setFields = fieldsEval.zipWithIndex.map {
+      case (fieldEval, idx) =>
+        s"""
+           |if (${fieldEval.isNull}) {
+           |  $fieldsTerm[$idx] = null;
+           |} else {
+           |  $fieldsTerm[$idx] = ${fieldEval.value};
+           |}
+           |""".stripMargin
     }
+    ev.copy(code =
+      code"""
+         |UTF8String $jsonTerm = null;
+         |UTF8String[] $fieldsTerm = new UTF8String[${fieldExpressions.length 
- 1}];

Review Comment:
   Yeah.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala:
##########
@@ -162,3 +165,116 @@ case class SchemaOfJsonEvaluator(options: Map[String, 
String]) {
     UTF8String.fromString(dt.sql)
   }
 }
+
+/**
+ * The expression `JsonTuple` will utilize it to support codegen.
+ */
+case class JsonTupleEvaluator(foldableFields: IndexedSeq[Option[String]]) {
+
+  import SharedFactory._
+
+  // If processing fails this shared value will be returned.
+  @transient private lazy val nullRow: Seq[InternalRow] =
+    new GenericInternalRow(Array.ofDim[Any](foldableFields.length)) :: Nil
+
+  // And count the number of foldable fields, we'll use this later to optimize 
evaluation.
+  @transient private lazy val constantFields: Int = foldableFields.count(_ != 
null)
+
+  private def getCachedFields(fields: Seq[UTF8String]): Seq[String] = {
+    // Evaluate the field names as String rather than UTF8String to
+    // optimize lookups from the json token, which is also a String.
+    if (constantFields == fields.length) {
+      // Typically the user will provide the field names as foldable 
expressions
+      // so we can use the cached copy.
+      foldableFields.map(_.orNull)
+    } else if (constantFields == 0) {
+      // None are foldable so all field names need to be evaluated from the 
input row.
+      fields.map { f => Option(f.toString).orNull }
+    } else {
+      // If there is a mix of constant and non-constant expressions
+      // prefer the cached copy when available.
+      foldableFields.zip(fields).map {
+        case (null, f) => Option(f.toString).orNull
+        case (fieldName, _) => fieldName.orNull
+      }
+    }
+  }
+
+  private def parseRow(parser: JsonParser, fieldNames: Seq[String]): 
Seq[InternalRow] = {
+    // Only objects are supported.
+    if (parser.nextToken() != JsonToken.START_OBJECT) return nullRow
+
+    val row = Array.ofDim[Any](fieldNames.length)
+
+    // Start reading through the token stream, looking for any requested field 
names.
+    while (parser.nextToken() != JsonToken.END_OBJECT) {
+      if (parser.getCurrentToken == JsonToken.FIELD_NAME) {
+        // Check to see if this field is desired in the output.
+        val jsonField = parser.currentName
+        var idx = fieldNames.indexOf(jsonField)
+        if (idx >= 0) {
+          // It is, copy the child tree to the correct location in the output 
row.
+          val output = new ByteArrayOutputStream()
+
+          // Write the output directly to UTF8 encoded byte array.
+          if (parser.nextToken() != JsonToken.VALUE_NULL) {
+            Utils.tryWithResource(jsonFactory.createGenerator(output, 
JsonEncoding.UTF8)) {
+              generator => copyCurrentStructure(generator, parser)
+            }
+
+            val jsonValue = UTF8String.fromBytes(output.toByteArray)
+
+            // SPARK-21804: json_tuple returns null values within repeated 
columns
+            // except the first one; so that we need to check the remaining 
fields.
+            do {
+              row(idx) = jsonValue
+              idx = fieldNames.indexOf(jsonField, idx + 1)
+            } while (idx >= 0)
+          }
+        }
+      }
+
+      // Always skip children, it's cheap enough to do even if 
copyCurrentStructure was called.
+      parser.skipChildren()
+    }
+    new GenericInternalRow(row) :: Nil
+  }
+
+  private def copyCurrentStructure(generator: JsonGenerator, parser: 
JsonParser): Unit = {
+    parser.getCurrentToken match {
+      // If the user requests a string field it needs to be returned without 
enclosing
+      // quotes which is accomplished via JsonGenerator.writeRaw instead of 
JsonGenerator.write.
+      case JsonToken.VALUE_STRING if parser.hasTextCharacters =>
+        // Slight optimization to avoid allocating a String instance, though 
the characters
+        // still have to be decoded... Jackson doesn't have a way to access 
the raw bytes.
+        generator.writeRaw(parser.getTextCharacters, parser.getTextOffset, 
parser.getTextLength)
+
+      case JsonToken.VALUE_STRING =>
+        // The normal String case, pass it through to the output without 
enclosing quotes.
+        generator.writeRaw(parser.getText)
+
+      case JsonToken.VALUE_NULL =>
+        // A special case that needs to be handled outside of this method.
+        // If a requested field is null, the result must be null. The easiest
+        // way to achieve this is just by ignoring null tokens entirely.
+        throw SparkException.internalError("Do not attempt to copy a null 
field.")
+
+      case _ =>
+        // Handle other types including objects, arrays, booleans and numbers.
+        generator.copyCurrentStructure(parser)
+    }
+  }
+
+  final def evaluate(json: UTF8String, fields: Seq[UTF8String]): 
Seq[InternalRow] = {

Review Comment:
   Updated.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala:
##########
@@ -502,111 +492,52 @@ case class JsonTuple(children: Seq[Expression])
     }
   }
 
+  @transient
+  private lazy val evaluator: JsonTupleEvaluator = 
JsonTupleEvaluator(foldableFieldNames)
+
   override def eval(input: InternalRow): IterableOnce[InternalRow] = {
     val json = jsonExpr.eval(input).asInstanceOf[UTF8String]
-    if (json == null) {
-      return nullRow
-    }
-
-    try {
-      /* We know the bytes are UTF-8 encoded. Pass a Reader to avoid having 
Jackson
-      detect character encoding which could fail for some malformed strings */
-      Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, json)) 
{ parser =>
-        parseRow(parser, input)
-      }
-    } catch {
-      case _: JsonProcessingException =>
-        nullRow
-    }
-  }
-
-  private def parseRow(parser: JsonParser, input: InternalRow): 
Seq[InternalRow] = {
-    // only objects are supported
-    if (parser.nextToken() != JsonToken.START_OBJECT) {
-      return nullRow
-    }
-
-    // evaluate the field names as String rather than UTF8String to
-    // optimize lookups from the json token, which is also a String
-    val fieldNames = if (constantFields == fieldExpressions.length) {
-      // typically the user will provide the field names as foldable 
expressions
-      // so we can use the cached copy
-      foldableFieldNames.map(_.orNull)
-    } else if (constantFields == 0) {
-      // none are foldable so all field names need to be evaluated from the 
input row
-      fieldExpressions.map { expr =>
-        
Option(expr.eval(input)).map(_.asInstanceOf[UTF8String].toString).orNull
-      }
-    } else {
-      // if there is a mix of constant and non-constant expressions
-      // prefer the cached copy when available
-      foldableFieldNames.zip(fieldExpressions).map {
-        case (null, expr) =>
-          
Option(expr.eval(input)).map(_.asInstanceOf[UTF8String].toString).orNull
-        case (fieldName, _) => fieldName.orNull
-      }
-    }
-
-    val row = Array.ofDim[Any](fieldNames.length)
-
-    // start reading through the token stream, looking for any requested field 
names
-    while (parser.nextToken() != JsonToken.END_OBJECT) {
-      if (parser.getCurrentToken == JsonToken.FIELD_NAME) {
-        // check to see if this field is desired in the output
-        val jsonField = parser.currentName
-        var idx = fieldNames.indexOf(jsonField)
-        if (idx >= 0) {
-          // it is, copy the child tree to the correct location in the output 
row
-          val output = new ByteArrayOutputStream()
-
-          // write the output directly to UTF8 encoded byte array
-          if (parser.nextToken() != JsonToken.VALUE_NULL) {
-            Utils.tryWithResource(jsonFactory.createGenerator(output, 
JsonEncoding.UTF8)) {
-              generator => copyCurrentStructure(generator, parser)
-            }
-
-            val jsonValue = UTF8String.fromBytes(output.toByteArray)
-
-            // SPARK-21804: json_tuple returns null values within repeated 
columns
-            // except the first one; so that we need to check the remaining 
fields.
-            do {
-              row(idx) = jsonValue
-              idx = fieldNames.indexOf(jsonField, idx + 1)
-            } while (idx >= 0)
-          }
-        }
-      }
-
-      // always skip children, it's cheap enough to do even if 
copyCurrentStructure was called
-      parser.skipChildren()
-    }
-
-    new GenericInternalRow(row) :: Nil
+    val filedNames = 
fieldExpressions.map(_.eval(input).asInstanceOf[UTF8String]).toArray
+    evaluator.evaluate(json, filedNames)
   }
 
-  private def copyCurrentStructure(generator: JsonGenerator, parser: 
JsonParser): Unit = {
-    parser.getCurrentToken match {
-      // if the user requests a string field it needs to be returned without 
enclosing
-      // quotes which is accomplished via JsonGenerator.writeRaw instead of 
JsonGenerator.write
-      case JsonToken.VALUE_STRING if parser.hasTextCharacters =>
-        // slight optimization to avoid allocating a String instance, though 
the characters
-        // still have to be decoded... Jackson doesn't have a way to access 
the raw bytes
-        generator.writeRaw(parser.getTextCharacters, parser.getTextOffset, 
parser.getTextLength)
-
-      case JsonToken.VALUE_STRING =>
-        // the normal String case, pass it through to the output without 
enclosing quotes
-        generator.writeRaw(parser.getText)
-
-      case JsonToken.VALUE_NULL =>
-        // a special case that needs to be handled outside of this method.
-        // if a requested field is null, the result must be null. the easiest
-        // way to achieve this is just by ignoring null tokens entirely
-        throw SparkException.internalError("Do not attempt to copy a null 
field.")
-
-      case _ =>
-        // handle other types including objects, arrays, booleans and numbers
-        generator.copyCurrentStructure(parser)
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): 
ExprCode = {
+    val refEvaluator = ctx.addReferenceObj("evaluator", evaluator)
+    val jsonTerm = ctx.freshName("json")
+    val jsonEval = jsonExpr.genCode(ctx)
+    val filedNamesTerm = ctx.freshName("fieldNames")
+    val fieldNamesEval = fieldExpressions.map(_.genCode(ctx))
+    val wrapperClass = classOf[IterableOnce[_]].getName
+    val setJson =

Review Comment:
   You're right, there's no need for special handling here.
   Updated.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala:
##########
@@ -448,37 +448,27 @@ class GetJsonObjectEvaluator(cachedPath: UTF8String) {
 // scalastyle:on line.size.limit line.contains.tab
 case class JsonTuple(children: Seq[Expression])

Review Comment:
   Hmm... because this is a `Generator` expression, I am not sure whether 
`RuntimeReplaceable` supports it; I have not seen a similar implementation 
anywhere in the code repository.
   If it can be supported, I can try it out.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to