Xianjin YE created SPARK-37176:
----------------------------------

             Summary: JsonSource's infer should have the same exception handle logic as JacksonParser's parse logic
                 Key: SPARK-37176
                 URL: https://issues.apache.org/jira/browse/SPARK-37176
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.2.0, 3.1.2, 3.0.3
            Reporter: Xianjin YE


JacksonParser's exception-handling logic differs from the logic in 
org.apache.spark.sql.catalyst.json.JsonInferSchema#infer. The difference can be 
seen below:
{code:java}
// JacksonParser's parse logic
try {
  Utils.tryWithResource(createParser(factory, record)) { parser =>
    // a null first token is equivalent to testing for input.trim.isEmpty
    // but it works on any token stream and not just strings
    parser.nextToken() match {
      case null => None
      case _ => rootConverter.apply(parser) match {
        case null => throw QueryExecutionErrors.rootConverterReturnNullError()
        case rows => rows.toSeq
      }
    }
  }
} catch {
  case e: SparkUpgradeException => throw e
  case e @ (_: RuntimeException | _: JsonProcessingException | _: MalformedInputException) =>
    // JSON parser currently doesn't support partial results for corrupted records.
    // For such records, all fields other than the field configured by
    // `columnNameOfCorruptRecord` are set to `null`.
    throw BadRecordException(() => recordLiteral(record), () => None, e)
  case e: CharConversionException if options.encoding.isEmpty =>
    val msg =
      """JSON parser cannot handle a character in its input.
        |Specifying encoding as an input option explicitly might help to resolve the issue.
        |""".stripMargin + e.getMessage
    val wrappedCharException = new CharConversionException(msg)
    wrappedCharException.initCause(e)
    throw BadRecordException(() => recordLiteral(record), () => None, wrappedCharException)
  case PartialResultException(row, cause) =>
    throw BadRecordException(
      record = () => recordLiteral(record),
      partialResult = () => Some(row),
      cause)
}
{code}
vs.
{code:java}
// JsonInferSchema's infer logic
val mergedTypesFromPartitions = json.mapPartitions { iter =>
  val factory = options.buildJsonFactory()
  iter.flatMap { row =>
    try {
      Utils.tryWithResource(createParser(factory, row)) { parser =>
        parser.nextToken()
        Some(inferField(parser))
      }
    } catch {
      case e @ (_: RuntimeException | _: JsonProcessingException) => parseMode match {
        case PermissiveMode =>
          Some(StructType(Seq(StructField(columnNameOfCorruptRecord, StringType))))
        case DropMalformedMode =>
          None
        case FailFastMode =>
          throw QueryExecutionErrors.malformedRecordsDetectedInSchemaInferenceError(e)
      }
    }
  }.reduceOption(typeMerger).toIterator
}
{code}
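The gap between the two catch clauses can be checked without Spark. The sketch below is a hypothetical, dependency-free illustration: CatchComparison, parseHandles and inferHandles are made-up names, and only the JDK exception types from the two clauses above are kept (Jackson's JsonProcessingException and Spark's SparkUpgradeException/PartialResultException are omitted so it runs with the JDK alone). The encodingSpecified flag models the options.encoding.isEmpty guard.

{code:scala}
import java.io.CharConversionException
import java.nio.charset.MalformedInputException

// Hypothetical comparison of the JDK-typed cases in the two catch clauses above.
object CatchComparison {
  // Mirrors JacksonParser's parse: these exceptions become a BadRecordException.
  def parseHandles(e: Throwable, encodingSpecified: Boolean): Boolean = e match {
    case _: RuntimeException | _: MalformedInputException => true
    case _: CharConversionException if !encodingSpecified => true
    case _ => false
  }

  // Mirrors JsonInferSchema's infer: only RuntimeException is handled here
  // (JsonProcessingException omitted to stay dependency-free).
  def inferHandles(e: Throwable): Boolean = e match {
    case _: RuntimeException => true
    case _ => false
  }

  def main(args: Array[String]): Unit = {
    val samples: Seq[Throwable] = Seq(
      new IllegalArgumentException("bad token"),
      new MalformedInputException(1),
      new CharConversionException("bad char"))
    for (e <- samples) {
      val p = parseHandles(e, encodingSpecified = false)
      println(s"${e.getClass.getSimpleName}: parse=$p, infer=${inferHandles(e)}")
    }
  }
}
{code}

Running main prints parse=true for all three samples, but infer=true only for the IllegalArgumentException: MalformedInputException and CharConversionException are IOExceptions, not RuntimeExceptions, so infer's catch clause does not match them.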
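The two code paths should share the same exception-handling logic; otherwise the 
inconsistency may confuse users. In particular, parse also handles 
MalformedInputException and (when no encoding option is set) 
CharConversionException, whereas infer catches only RuntimeException and 
JsonProcessingException, so those exceptions escape schema inference regardless 
of the parse mode.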
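One possible direction, sketched under assumptions: move the "is this a bad record?" decision into a single helper that both code paths call, so infer applies the parse mode to exactly the exceptions that parse turns into a BadRecordException. Everything here is hypothetical: Mode, isBadRecord and inferRecord are illustrative names, not Spark APIs; the inferred schema is modeled as a plain String; and Jackson/Spark exception types are left out to keep the sketch dependency-free.

{code:scala}
import java.io.CharConversionException
import java.nio.charset.MalformedInputException

// Hypothetical stand-ins for Spark's PermissiveMode / DropMalformedMode / FailFastMode.
sealed trait Mode
case object Permissive extends Mode
case object DropMalformed extends Mode
case object FailFast extends Mode

object UnifiedHandling {
  // Single shared classification, mirroring the JDK-typed cases of parse's catch clause.
  def isBadRecord(e: Throwable, encodingSpecified: Boolean): Boolean = e match {
    case _: RuntimeException | _: MalformedInputException => true
    case _: CharConversionException => !encodingSpecified
    case _ => false
  }

  // Infers one record's schema (a String here) and applies the parse mode to every
  // exception that isBadRecord accepts; any other exception propagates unchanged.
  def inferRecord(mode: Mode, corruptColumn: String, encodingSpecified: Boolean)(
      infer: => String): Option[String] = {
    try {
      Some(infer)
    } catch {
      case e: Throwable if isBadRecord(e, encodingSpecified) =>
        mode match {
          case Permissive    => Some(s"struct<$corruptColumn:string>")
          case DropMalformed => None
          case FailFast =>
            throw new IllegalStateException("Malformed record detected during schema inference", e)
        }
    }
  }
}
{code}

With this shape, a CharConversionException during inference yields the corrupt-record column in permissive mode and is dropped in drop-malformed mode, matching how parse treats the same record.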



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
