Github user NathanHowell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7946#discussion_r40809995
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonFunctions.scala
 ---
    @@ -307,3 +308,140 @@ case class GetJsonObject(json: Expression, path: 
Expression)
         }
       }
     }
    +
    +case class JsonTuple(children: Seq[Expression])
    +  extends Expression with CodegenFallback {
    +
    +  import SharedFactory._
    +
    +  override def nullable: Boolean = {
    +    // a row is always returned
    +    false
    +  }
    +
    +  // if processing fails this shared value will be returned
    +  @transient private lazy val nullRow: InternalRow =
    +    new GenericInternalRow(Array.ofDim[Any](fieldExpressions.length))
    +
    +  // the json body is the first child
    +  @transient private lazy val jsonExpr: Expression = children.head
    +
    +  // the fields to query are the remaining children
    +  @transient private lazy val fieldExpressions: Seq[Expression] = 
children.tail
    +
    +  // eagerly evaluate any foldable the field names
    +  @transient private lazy val foldableFieldNames: IndexedSeq[String] = {
    +    fieldExpressions.map {
    +      case expr if expr.foldable => 
expr.eval().asInstanceOf[UTF8String].toString
    +      case _ => null
    +    }.toIndexedSeq
    +  }
    +
    +  // and count the number of foldable fields, we'll use this later to 
optimize evaluation
    +  @transient private lazy val constantFields: Int = 
foldableFieldNames.count(_ != null)
    +
    +  override lazy val dataType: StructType = {
    +    val fields = fieldExpressions.zipWithIndex.map {
    +      case (_, idx) => StructField(
    +        name = s"c$idx", // mirroring GenericUDTFJSONTuple.initialize
    +        dataType = StringType,
    +        nullable = true)
    +    }
    +
    +    StructType(fields)
    +  }
    +
    +  override def prettyName: String = "json_tuple"
    +
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    if (children.length < 2) {
    +      TypeCheckResult.TypeCheckFailure(s"$prettyName requires at least two 
arguments")
    +    } else if (children.forall(child => 
StringType.acceptsType(child.dataType))) {
    +      TypeCheckResult.TypeCheckSuccess
    +    } else {
    +      TypeCheckResult.TypeCheckFailure(s"$prettyName requires that all 
arguments are strings")
    +    }
    +  }
    +
    +  override def eval(input: InternalRow): InternalRow = {
    +    try {
    +      val json = jsonExpr.eval(input).asInstanceOf[UTF8String]
    +      if (json == null) {
    +        return nullRow
    +      }
    +
    +      val parser = jsonFactory.createParser(json.getBytes)
    +
    +      // only objects are supported
    +      if (parser.nextToken() != JsonToken.START_OBJECT) {
    +        return nullRow
    +      }
    +
    +      // evaluate the field names as String rather than UTF8String to
    +      // optimize lookups from the json token, which is also a String
    +      val fieldNames = if (constantFields == fieldExpressions.length) {
    +        // typically the user will provide the field names as foldable 
expressions
    +        // so we can use the cached copy
    +        foldableFieldNames
    +      } else if (constantFields == 0) {
    +        // none are foldable so all field names need to be evaluated from 
the input row
    +        
fieldExpressions.map(_.eval(input).asInstanceOf[UTF8String].toString)
    +      } else {
    +        // if there is a mix of constant and non-constant expressions
    +        // prefer the cached copy when available
    +        foldableFieldNames.zip(fieldExpressions).map {
    +          case (null, expr) => 
expr.eval(input).asInstanceOf[UTF8String].toString
    +          case (fieldName, _) => fieldName
    +        }
    +      }
    +
    +      val row = Array.ofDim[Any](fieldNames.length)
    +
    +      // start reading through the token stream, looking for any requested 
field names
    +      while (parser.nextToken() != JsonToken.END_OBJECT) {
    +        if (parser.getCurrentToken == JsonToken.FIELD_NAME) {
    +          // check to see if this field is desired in the output
    +          val idx = fieldNames.indexOf(parser.getCurrentName)
    +          if (idx >= 0) {
    +            // it is, copy the child tree to the correct location in the 
output row
    +            val output = new ByteArrayOutputStream()
    +
    +            // write the output directly to UTF8 encoded byte array
    +            val generator = jsonFactory.createGenerator(output, 
JsonEncoding.UTF8)
    +            if (parser.nextToken() != JsonToken.VALUE_NULL) {
    +              copyCurrentStructure(generator, parser)
    +              generator.close()
    +
    +              row(idx) = UTF8String.fromBytes(output.toByteArray)
    +            }
    +          }
    +        }
    +
    +        // always skip children, it's cheap enough to do even if 
copyCurrentStructure was called
    +        parser.skipChildren()
    +      }
    +
    +      new GenericInternalRow(row)
    +    } catch {
    +      case _: JsonProcessingException =>
    +        nullRow
    +    }
    +  }
    +
    +  private def copyCurrentStructure(generator: JsonGenerator, parser: 
JsonParser): Unit = {
    +    parser.getCurrentToken match {
    +      case JsonToken.VALUE_STRING if parser.hasTextCharacters =>
    +        generator.writeRaw(parser.getTextCharacters, parser.getTextOffset, 
parser.getTextLength)
    --- End diff --
    
    It's an optimization to avoid creating a `String` instance if 
`hasTextCharacters` is true. This means we can access the field value by offset 
and index into a byte array.
    
    This explicit handling of `VALUE_STRING` is just to avoid putting quotes 
around the output string, which is what `JsonGenerator.copyCurrentStructure` 
would do. I'll put comments in here to make that obvious.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to