sunchao commented on code in PR #56547:
URL: https://github.com/apache/spark/pull/56547#discussion_r3424023378
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala:
##########
@@ -136,6 +141,79 @@ case class GetJsonObject(json: Expression, path: Expression)
copy(json = newLeft, path = newRight)
}
+object GetJsonObject {
+ private[sql] def simpleTopLevelField(path: UTF8String): Option[String] = {
+ try {
+ Option(path).flatMap(value => JsonPathParser.parse(value.toString)).collect {
+ case List(PathInstruction.Key, PathInstruction.Named(fieldName)) => fieldName
+ }
+ } catch {
+ // Numeric subscripts are parsed as Long and can overflow before the parser returns None.
+ case _: NumberFormatException => None
+ }
+ }
+}
+
+/**
+ * Extracts multiple simple top-level fields from a JSON string in one parse. This is an internal
+ * expression used to share sibling [[GetJsonObject]] expressions; unsupported JSON paths remain
+ * as independent GetJsonObject expressions.
+ */
+case class MultiGetJsonObject(
Review Comment:
done
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala:
##########
@@ -573,3 +573,152 @@ case class GetJsonObjectEvaluator(cachedPath: UTF8String) {
}
}
}
+
+/**
+ * Evaluates multiple simple top-level JSON fields in one parse.
+ */
+case class MultiGetJsonObjectEvaluator(
+ fieldNames: Seq[String],
+ fallbackPaths: Seq[UTF8String]) {
+ import SharedFactory._
+
+ require(
+ fieldNames.nonEmpty &&
+ fieldNames.distinct.length == fieldNames.length &&
+ fallbackPaths.length == fieldNames.length)
+
+ @transient
+ private lazy val fieldToOrdinal: Map[String, Int] = fieldNames.zipWithIndex.toMap
+
+ @transient
+ private lazy val nullRow: InternalRow =
+ new GenericInternalRow(Array.ofDim[Any](fieldNames.length))
+
+ @transient
+ private lazy val fallbackEvaluators: Seq[GetJsonObjectEvaluator] =
+ fallbackPaths.map(new GetJsonObjectEvaluator(_))
+
+ private def fallback(json: UTF8String): InternalRow = {
+ new GenericInternalRow(fallbackEvaluators.map { evaluator =>
+ evaluator.setJson(json)
+ evaluator.evaluate()
+ }.toArray)
+ }
+
+ def evaluate(json: UTF8String): InternalRow = {
+ if (json == null) return null
+
+ val values = Array.ofDim[Any](fieldNames.length)
+ val matched = Array.ofDim[Boolean](fieldNames.length)
+
+ try {
+ val validObject = Utils.tryWithResource(
+ CreateJacksonParser.utf8String(jsonFactory, json)) { parser =>
+ if (parser.nextToken() != JsonToken.START_OBJECT) {
+ false
+ } else {
+ var token = parser.nextToken()
+ while (token != null && token != JsonToken.END_OBJECT) {
+ if (token == JsonToken.FIELD_NAME) {
+ val fieldName = parser.currentName
+ val ordinal = fieldToOrdinal.get(fieldName).filter(!matched(_))
+ val valueToken = parser.nextToken()
+ if (ordinal.nonEmpty && valueToken != JsonToken.VALUE_NULL) {
+ val index = ordinal.get
+ matched(index) = true
+ copyCurrentStructure(parser).foreach(value => values(index) = value)
+ } else {
+ parser.skipChildren()
+ }
+ } else {
+ parser.skipChildren()
+ }
+ token = parser.nextToken()
+ }
+ token == JsonToken.END_OBJECT
+ }
+ }
+ if (validObject) {
+ new GenericInternalRow(values)
+ } else {
+ nullRow
+ }
+ } catch {
+ // Every simple top-level legacy extraction scans through the root object's closing token,
+ // so a syntax failure makes every sibling null without needing per-path reparsing.
+ case _: JsonParseException => nullRow
+ // A parser-side rendering failure can leave the shared token stream unusable. Reparse each
+ // path with the legacy evaluator so one bad selected value cannot erase sibling results.
+ case _: JsonProcessingException => fallback(json)
+ }
+ }
+
+ private def copyCurrentStructure(parser: JsonParser): Option[UTF8String] = {
+ val output = new ByteArrayOutputStream()
Review Comment:
yes! updated
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]