panbingkun commented on code in PR #48908:
URL: https://github.com/apache/spark/pull/48908#discussion_r1889780224
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala:
##########
@@ -502,111 +496,119 @@ case class JsonTuple(children: Seq[Expression])
}
}
+ @transient
+ private lazy val evaluator: JsonTupleEvaluator =
JsonTupleEvaluator(fieldExpressions.length)
+
override def eval(input: InternalRow): IterableOnce[InternalRow] = {
val json = jsonExpr.eval(input).asInstanceOf[UTF8String]
- if (json == null) {
- return nullRow
- }
- try {
- /* We know the bytes are UTF-8 encoded. Pass a Reader to avoid having
Jackson
- detect character encoding which could fail for some malformed strings */
- Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, json))
{ parser =>
- parseRow(parser, input)
- }
- } catch {
- case _: JsonProcessingException =>
- nullRow
- }
- }
-
- private def parseRow(parser: JsonParser, input: InternalRow):
Seq[InternalRow] = {
- // only objects are supported
- if (parser.nextToken() != JsonToken.START_OBJECT) {
- return nullRow
- }
-
- // evaluate the field names as String rather than UTF8String to
- // optimize lookups from the json token, which is also a String
+ // Evaluate the field names as String rather than UTF8String to
+ // optimize lookups from the json token, which is also a String.
val fieldNames = if (constantFields == fieldExpressions.length) {
Review Comment:
Okay, let me update it.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala:
##########
@@ -502,111 +496,119 @@ case class JsonTuple(children: Seq[Expression])
}
}
+ @transient
+ private lazy val evaluator: JsonTupleEvaluator =
JsonTupleEvaluator(fieldExpressions.length)
+
override def eval(input: InternalRow): IterableOnce[InternalRow] = {
val json = jsonExpr.eval(input).asInstanceOf[UTF8String]
- if (json == null) {
- return nullRow
- }
- try {
- /* We know the bytes are UTF-8 encoded. Pass a Reader to avoid having
Jackson
- detect character encoding which could fail for some malformed strings */
- Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, json))
{ parser =>
- parseRow(parser, input)
- }
- } catch {
- case _: JsonProcessingException =>
- nullRow
- }
- }
-
- private def parseRow(parser: JsonParser, input: InternalRow):
Seq[InternalRow] = {
- // only objects are supported
- if (parser.nextToken() != JsonToken.START_OBJECT) {
- return nullRow
- }
-
- // evaluate the field names as String rather than UTF8String to
- // optimize lookups from the json token, which is also a String
+ // Evaluate the field names as String rather than UTF8String to
+ // optimize lookups from the json token, which is also a String.
val fieldNames = if (constantFields == fieldExpressions.length) {
Review Comment:
Updated, the logic looks much simpler, thank you!
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala:
##########
@@ -162,3 +165,116 @@ case class SchemaOfJsonEvaluator(options: Map[String,
String]) {
UTF8String.fromString(dt.sql)
}
}
+
+/**
+ * The expression `JsonTuple` will utilize it to support codegen.
+ */
+case class JsonTupleEvaluator(foldableFields: IndexedSeq[Option[String]]) {
Review Comment:
Updated.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala:
##########
@@ -162,3 +165,116 @@ case class SchemaOfJsonEvaluator(options: Map[String,
String]) {
UTF8String.fromString(dt.sql)
}
}
+
+/**
+ * The expression `JsonTuple` will utilize it to support codegen.
+ */
+case class JsonTupleEvaluator(foldableFields: IndexedSeq[Option[String]]) {
+
+ import SharedFactory._
+
+ // If processing fails this shared value will be returned.
+ @transient private lazy val nullRow: Seq[InternalRow] =
+ new GenericInternalRow(Array.ofDim[Any](foldableFields.length)) :: Nil
+
+ // And count the number of foldable fields, we'll use this later to optimize
evaluation.
+ @transient private lazy val constantFields: Int = foldableFields.count(_ !=
null)
+
+ private def getCachedFields(fields: Seq[UTF8String]): Seq[String] = {
Review Comment:
Updated.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala:
##########
@@ -502,111 +493,53 @@ case class JsonTuple(children: Seq[Expression])
}
}
+ @transient
+ private lazy val evaluator: JsonTupleEvaluator =
JsonTupleEvaluator(foldableFieldNames)
+
override def eval(input: InternalRow): IterableOnce[InternalRow] = {
val json = jsonExpr.eval(input).asInstanceOf[UTF8String]
- if (json == null) {
- return nullRow
- }
-
- try {
- /* We know the bytes are UTF-8 encoded. Pass a Reader to avoid having
Jackson
- detect character encoding which could fail for some malformed strings */
- Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, json))
{ parser =>
- parseRow(parser, input)
- }
- } catch {
- case _: JsonProcessingException =>
- nullRow
- }
- }
-
- private def parseRow(parser: JsonParser, input: InternalRow):
Seq[InternalRow] = {
- // only objects are supported
- if (parser.nextToken() != JsonToken.START_OBJECT) {
- return nullRow
- }
-
- // evaluate the field names as String rather than UTF8String to
- // optimize lookups from the json token, which is also a String
- val fieldNames = if (constantFields == fieldExpressions.length) {
- // typically the user will provide the field names as foldable
expressions
- // so we can use the cached copy
- foldableFieldNames.map(_.orNull)
- } else if (constantFields == 0) {
- // none are foldable so all field names need to be evaluated from the
input row
- fieldExpressions.map { expr =>
-
Option(expr.eval(input)).map(_.asInstanceOf[UTF8String].toString).orNull
- }
- } else {
- // if there is a mix of constant and non-constant expressions
- // prefer the cached copy when available
- foldableFieldNames.zip(fieldExpressions).map {
- case (null, expr) =>
-
Option(expr.eval(input)).map(_.asInstanceOf[UTF8String].toString).orNull
- case (fieldName, _) => fieldName.orNull
- }
- }
-
- val row = Array.ofDim[Any](fieldNames.length)
-
- // start reading through the token stream, looking for any requested field
names
- while (parser.nextToken() != JsonToken.END_OBJECT) {
- if (parser.getCurrentToken == JsonToken.FIELD_NAME) {
- // check to see if this field is desired in the output
- val jsonField = parser.currentName
- var idx = fieldNames.indexOf(jsonField)
- if (idx >= 0) {
- // it is, copy the child tree to the correct location in the output
row
- val output = new ByteArrayOutputStream()
-
- // write the output directly to UTF8 encoded byte array
- if (parser.nextToken() != JsonToken.VALUE_NULL) {
- Utils.tryWithResource(jsonFactory.createGenerator(output,
JsonEncoding.UTF8)) {
- generator => copyCurrentStructure(generator, parser)
- }
-
- val jsonValue = UTF8String.fromBytes(output.toByteArray)
-
- // SPARK-21804: json_tuple returns null values within repeated
columns
- // except the first one; so that we need to check the remaining
fields.
- do {
- row(idx) = jsonValue
- idx = fieldNames.indexOf(jsonField, idx + 1)
- } while (idx >= 0)
- }
- }
- }
-
- // always skip children, it's cheap enough to do even if
copyCurrentStructure was called
- parser.skipChildren()
- }
-
- new GenericInternalRow(row) :: Nil
+ val fields = fieldExpressions.map(_.eval(input).asInstanceOf[UTF8String])
+ evaluator.evaluate(json, fields)
}
- private def copyCurrentStructure(generator: JsonGenerator, parser:
JsonParser): Unit = {
- parser.getCurrentToken match {
- // if the user requests a string field it needs to be returned without
enclosing
- // quotes which is accomplished via JsonGenerator.writeRaw instead of
JsonGenerator.write
- case JsonToken.VALUE_STRING if parser.hasTextCharacters =>
- // slight optimization to avoid allocating a String instance, though
the characters
- // still have to be decoded... Jackson doesn't have a way to access
the raw bytes
- generator.writeRaw(parser.getTextCharacters, parser.getTextOffset,
parser.getTextLength)
-
- case JsonToken.VALUE_STRING =>
- // the normal String case, pass it through to the output without
enclosing quotes
- generator.writeRaw(parser.getText)
-
- case JsonToken.VALUE_NULL =>
- // a special case that needs to be handled outside of this method.
- // if a requested field is null, the result must be null. the easiest
- // way to achieve this is just by ignoring null tokens entirely
- throw SparkException.internalError("Do not attempt to copy a null
field.")
-
- case _ =>
- // handle other types including objects, arrays, booleans and numbers
- generator.copyCurrentStructure(parser)
+ override protected def doGenCode(ctx: CodegenContext, ev: ExprCode):
ExprCode = {
+ val refEvaluator = ctx.addReferenceObj("evaluator", evaluator)
+ val jsonTerm = ctx.freshName("json")
+ val jsonEval = jsonExpr.genCode(ctx)
+ val fieldsTerm = ctx.freshName("fields")
+ val fieldsEval = fieldExpressions.map(_.genCode(ctx))
+ val arraySeqClass = classOf[ArraySeq[_]].getName
+ val wrapperClass = classOf[IterableOnce[_]].getName
+ val setJson =
+ s"""
+ |if (${jsonEval.isNull}) {
+ | $jsonTerm = null;
+ |} else {
+ | $jsonTerm = ${jsonEval.value};
+ |}
+ |""".stripMargin
+ val setFields = fieldsEval.zipWithIndex.map {
+ case (fieldEval, idx) =>
+ s"""
+ |if (${fieldEval.isNull}) {
+ | $fieldsTerm[$idx] = null;
+ |} else {
+ | $fieldsTerm[$idx] = ${fieldEval.value};
+ |}
+ |""".stripMargin
}
+ ev.copy(code =
+ code"""
+ |UTF8String $jsonTerm = null;
+ |UTF8String[] $fieldsTerm = new UTF8String[${fieldExpressions.length
- 1}];
Review Comment:
Yeah.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala:
##########
@@ -162,3 +165,116 @@ case class SchemaOfJsonEvaluator(options: Map[String,
String]) {
UTF8String.fromString(dt.sql)
}
}
+
+/**
+ * The expression `JsonTuple` will utilize it to support codegen.
+ */
+case class JsonTupleEvaluator(foldableFields: IndexedSeq[Option[String]]) {
+
+ import SharedFactory._
+
+ // If processing fails this shared value will be returned.
+ @transient private lazy val nullRow: Seq[InternalRow] =
+ new GenericInternalRow(Array.ofDim[Any](foldableFields.length)) :: Nil
+
+ // And count the number of foldable fields, we'll use this later to optimize
evaluation.
+ @transient private lazy val constantFields: Int = foldableFields.count(_ !=
null)
+
+ private def getCachedFields(fields: Seq[UTF8String]): Seq[String] = {
+ // Evaluate the field names as String rather than UTF8String to
+ // optimize lookups from the json token, which is also a String.
+ if (constantFields == fields.length) {
+ // Typically the user will provide the field names as foldable
expressions
+ // so we can use the cached copy.
+ foldableFields.map(_.orNull)
+ } else if (constantFields == 0) {
+ // None are foldable so all field names need to be evaluated from the
input row.
+ fields.map { f => Option(f.toString).orNull }
+ } else {
+ // If there is a mix of constant and non-constant expressions
+ // prefer the cached copy when available.
+ foldableFields.zip(fields).map {
+ case (null, f) => Option(f.toString).orNull
+ case (fieldName, _) => fieldName.orNull
+ }
+ }
+ }
+
+ private def parseRow(parser: JsonParser, fieldNames: Seq[String]):
Seq[InternalRow] = {
+ // Only objects are supported.
+ if (parser.nextToken() != JsonToken.START_OBJECT) return nullRow
+
+ val row = Array.ofDim[Any](fieldNames.length)
+
+ // Start reading through the token stream, looking for any requested field
names.
+ while (parser.nextToken() != JsonToken.END_OBJECT) {
+ if (parser.getCurrentToken == JsonToken.FIELD_NAME) {
+ // Check to see if this field is desired in the output.
+ val jsonField = parser.currentName
+ var idx = fieldNames.indexOf(jsonField)
+ if (idx >= 0) {
+ // It is, copy the child tree to the correct location in the output
row.
+ val output = new ByteArrayOutputStream()
+
+ // Write the output directly to UTF8 encoded byte array.
+ if (parser.nextToken() != JsonToken.VALUE_NULL) {
+ Utils.tryWithResource(jsonFactory.createGenerator(output,
JsonEncoding.UTF8)) {
+ generator => copyCurrentStructure(generator, parser)
+ }
+
+ val jsonValue = UTF8String.fromBytes(output.toByteArray)
+
+ // SPARK-21804: json_tuple returns null values within repeated
columns
+ // except the first one; so that we need to check the remaining
fields.
+ do {
+ row(idx) = jsonValue
+ idx = fieldNames.indexOf(jsonField, idx + 1)
+ } while (idx >= 0)
+ }
+ }
+ }
+
+ // Always skip children, it's cheap enough to do even if
copyCurrentStructure was called.
+ parser.skipChildren()
+ }
+ new GenericInternalRow(row) :: Nil
+ }
+
+ private def copyCurrentStructure(generator: JsonGenerator, parser:
JsonParser): Unit = {
+ parser.getCurrentToken match {
+ // If the user requests a string field it needs to be returned without
enclosing
+ // quotes which is accomplished via JsonGenerator.writeRaw instead of
JsonGenerator.write.
+ case JsonToken.VALUE_STRING if parser.hasTextCharacters =>
+ // Slight optimization to avoid allocating a String instance, though
the characters
+ // still have to be decoded... Jackson doesn't have a way to access
the raw bytes.
+ generator.writeRaw(parser.getTextCharacters, parser.getTextOffset,
parser.getTextLength)
+
+ case JsonToken.VALUE_STRING =>
+ // The normal String case, pass it through to the output without
enclosing quotes.
+ generator.writeRaw(parser.getText)
+
+ case JsonToken.VALUE_NULL =>
+ // A special case that needs to be handled outside of this method.
+ // If a requested field is null, the result must be null. The easiest
+ // way to achieve this is just by ignoring null tokens entirely.
+ throw SparkException.internalError("Do not attempt to copy a null
field.")
+
+ case _ =>
+ // Handle other types including objects, arrays, booleans and numbers.
+ generator.copyCurrentStructure(parser)
+ }
+ }
+
+ final def evaluate(json: UTF8String, fields: Seq[UTF8String]):
Seq[InternalRow] = {
Review Comment:
Updated.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala:
##########
@@ -502,111 +492,52 @@ case class JsonTuple(children: Seq[Expression])
}
}
+ @transient
+ private lazy val evaluator: JsonTupleEvaluator =
JsonTupleEvaluator(foldableFieldNames)
+
override def eval(input: InternalRow): IterableOnce[InternalRow] = {
val json = jsonExpr.eval(input).asInstanceOf[UTF8String]
- if (json == null) {
- return nullRow
- }
-
- try {
- /* We know the bytes are UTF-8 encoded. Pass a Reader to avoid having
Jackson
- detect character encoding which could fail for some malformed strings */
- Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, json))
{ parser =>
- parseRow(parser, input)
- }
- } catch {
- case _: JsonProcessingException =>
- nullRow
- }
- }
-
- private def parseRow(parser: JsonParser, input: InternalRow):
Seq[InternalRow] = {
- // only objects are supported
- if (parser.nextToken() != JsonToken.START_OBJECT) {
- return nullRow
- }
-
- // evaluate the field names as String rather than UTF8String to
- // optimize lookups from the json token, which is also a String
- val fieldNames = if (constantFields == fieldExpressions.length) {
- // typically the user will provide the field names as foldable
expressions
- // so we can use the cached copy
- foldableFieldNames.map(_.orNull)
- } else if (constantFields == 0) {
- // none are foldable so all field names need to be evaluated from the
input row
- fieldExpressions.map { expr =>
-
Option(expr.eval(input)).map(_.asInstanceOf[UTF8String].toString).orNull
- }
- } else {
- // if there is a mix of constant and non-constant expressions
- // prefer the cached copy when available
- foldableFieldNames.zip(fieldExpressions).map {
- case (null, expr) =>
-
Option(expr.eval(input)).map(_.asInstanceOf[UTF8String].toString).orNull
- case (fieldName, _) => fieldName.orNull
- }
- }
-
- val row = Array.ofDim[Any](fieldNames.length)
-
- // start reading through the token stream, looking for any requested field
names
- while (parser.nextToken() != JsonToken.END_OBJECT) {
- if (parser.getCurrentToken == JsonToken.FIELD_NAME) {
- // check to see if this field is desired in the output
- val jsonField = parser.currentName
- var idx = fieldNames.indexOf(jsonField)
- if (idx >= 0) {
- // it is, copy the child tree to the correct location in the output
row
- val output = new ByteArrayOutputStream()
-
- // write the output directly to UTF8 encoded byte array
- if (parser.nextToken() != JsonToken.VALUE_NULL) {
- Utils.tryWithResource(jsonFactory.createGenerator(output,
JsonEncoding.UTF8)) {
- generator => copyCurrentStructure(generator, parser)
- }
-
- val jsonValue = UTF8String.fromBytes(output.toByteArray)
-
- // SPARK-21804: json_tuple returns null values within repeated
columns
- // except the first one; so that we need to check the remaining
fields.
- do {
- row(idx) = jsonValue
- idx = fieldNames.indexOf(jsonField, idx + 1)
- } while (idx >= 0)
- }
- }
- }
-
- // always skip children, it's cheap enough to do even if
copyCurrentStructure was called
- parser.skipChildren()
- }
-
- new GenericInternalRow(row) :: Nil
+ val filedNames =
fieldExpressions.map(_.eval(input).asInstanceOf[UTF8String]).toArray
+ evaluator.evaluate(json, filedNames)
}
- private def copyCurrentStructure(generator: JsonGenerator, parser:
JsonParser): Unit = {
- parser.getCurrentToken match {
- // if the user requests a string field it needs to be returned without
enclosing
- // quotes which is accomplished via JsonGenerator.writeRaw instead of
JsonGenerator.write
- case JsonToken.VALUE_STRING if parser.hasTextCharacters =>
- // slight optimization to avoid allocating a String instance, though
the characters
- // still have to be decoded... Jackson doesn't have a way to access
the raw bytes
- generator.writeRaw(parser.getTextCharacters, parser.getTextOffset,
parser.getTextLength)
-
- case JsonToken.VALUE_STRING =>
- // the normal String case, pass it through to the output without
enclosing quotes
- generator.writeRaw(parser.getText)
-
- case JsonToken.VALUE_NULL =>
- // a special case that needs to be handled outside of this method.
- // if a requested field is null, the result must be null. the easiest
- // way to achieve this is just by ignoring null tokens entirely
- throw SparkException.internalError("Do not attempt to copy a null
field.")
-
- case _ =>
- // handle other types including objects, arrays, booleans and numbers
- generator.copyCurrentStructure(parser)
+ override protected def doGenCode(ctx: CodegenContext, ev: ExprCode):
ExprCode = {
+ val refEvaluator = ctx.addReferenceObj("evaluator", evaluator)
+ val jsonTerm = ctx.freshName("json")
+ val jsonEval = jsonExpr.genCode(ctx)
+ val filedNamesTerm = ctx.freshName("fieldNames")
+ val fieldNamesEval = fieldExpressions.map(_.genCode(ctx))
+ val wrapperClass = classOf[IterableOnce[_]].getName
+ val setJson =
Review Comment:
You're right, there's no need for special handling here.
Updated.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala:
##########
@@ -448,37 +448,27 @@ class GetJsonObjectEvaluator(cachedPath: UTF8String) {
// scalastyle:on line.size.limit line.contains.tab
case class JsonTuple(children: Seq[Expression])
Review Comment:
Hmm... because this is a `Generator` expression, I'm not sure whether
`RuntimeReplaceable` supports it; I haven't seen a similar implementation
anywhere in the codebase.
If it can be supported, I can try it out.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]