Hisoka-X commented on code in PR #40632:
URL: https://github.com/apache/spark/pull/40632#discussion_r1174717715
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala:
##########
@@ -65,6 +65,9 @@ class FailureSafeParser[IN](
case DropMalformedMode =>
Iterator.empty
case FailFastMode =>
+ if
(e.getCause.getMessage.startsWith("[MALFORMED_RECORD_IN_PARSING")) {
Review Comment:
This check determines whether the thrown exception belongs to a subClass of
`MALFORMED_RECORD_IN_PARSING`.
##########
core/src/main/scala/org/apache/spark/ErrorClassesJSONReader.scala:
##########
@@ -66,7 +66,6 @@ class ErrorClassesJsonReader(jsonFileURLs: Seq[URL]) {
val errorInfo = errorInfoMap.getOrElse(
mainErrorClass,
throw SparkException.internalError(s"Cannot find main error class
'$errorClass'"))
- assert(errorInfo.subClass.isDefined == subErrorClass.isDefined)
Review Comment:
Remove the assert on subClass so that we can create an exception without a
subClass, as with `MALFORMED_PROTOBUF_MESSAGE`.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala:
##########
@@ -103,24 +103,27 @@ class JacksonParser(
* to a value according to a desired schema. This is a wrapper for the method
* `makeConverter()` to handle a row wrapped with an array.
*/
- private def makeRootConverter(dt: DataType): JsonParser =>
Iterable[InternalRow] = {
+ private def makeRootConverter(dt: DataType): (JsonParser, () => UTF8String)
+ => Iterable[InternalRow] = {
dt match {
case st: StructType => makeStructRootConverter(st)
case mt: MapType => makeMapRootConverter(mt)
case at: ArrayType => makeArrayRootConverter(at)
}
}
- private def makeStructRootConverter(st: StructType): JsonParser =>
Iterable[InternalRow] = {
+ private def makeStructRootConverter(st: StructType): (JsonParser, () =>
UTF8String)
+ => Iterable[InternalRow] = {
val elementConverter = makeConverter(st)
val fieldConverters = st.map(_.dataType).map(makeConverter).toArray
val jsonFilters = if (SQLConf.get.jsonFilterPushDown) {
new JsonFilters(filters, st)
} else {
new NoopFilters
}
- (parser: JsonParser) => parseJsonToken[Iterable[InternalRow]](parser, st) {
- case START_OBJECT => convertObject(parser, st, fieldConverters,
jsonFilters, isRoot = true)
+ (parser: JsonParser, record: () => UTF8String) =>
Review Comment:
Add a `record: () => UTF8String` function parameter so that the parser can get
the record and then throw an exception that includes the record.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]