MaxGekk commented on a change in pull request #27239: [SPARK-30530][SQL] Fix
filter pushdown for bad CSV records
URL: https://github.com/apache/spark/pull/27239#discussion_r368038138
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
##########
@@ -230,64 +230,55 @@ class UnivocityParser(
() => getCurrentInput,
() => None,
new RuntimeException("Malformed CSV record"))
- } else if (tokens.length != parsedSchema.length) {
+ }
+
+ var checkedTokens = tokens
+ var badRecordException: Option[Throwable] = None
+
+ if (tokens.length != parsedSchema.length) {
// If the number of tokens doesn't match the schema, we should treat it
as a malformed record.
// However, we still have chance to parse some of the tokens, by adding
extra null tokens in
// the tail if the number is smaller, or by dropping extra tokens if the
number is larger.
- val checkedTokens = if (parsedSchema.length > tokens.length) {
+ checkedTokens = if (parsedSchema.length > tokens.length) {
tokens ++ new Array[String](parsedSchema.length - tokens.length)
} else {
tokens.take(parsedSchema.length)
}
- def getPartialResult(): Option[InternalRow] = {
- try {
- convert(checkedTokens).headOption
- } catch {
- case _: BadRecordException => None
- }
- }
- // For records with less or more tokens than the schema, tries to return
partial results
- // if possible.
- throw BadRecordException(
- () => getCurrentInput,
- () => getPartialResult(),
- new RuntimeException("Malformed CSV record"))
- } else {
- // When the length of the returned tokens is identical to the length of
the parsed schema,
- // we just need to:
- // 1. Convert the tokens that correspond to the required schema.
- // 2. Apply the pushdown filters to `requiredRow`.
- var i = 0
- val row = requiredRow.head
- var skipRow = false
- var badRecordException: Option[Throwable] = None
- while (i < requiredSchema.length) {
- try {
- if (!skipRow) {
- row(i) = valueConverters(i).apply(getToken(tokens, i))
- if (csvFilters.skipRow(row, i)) {
- skipRow = true
- }
- }
- if (skipRow) {
- row.setNullAt(i)
+ badRecordException = Some(new RuntimeException("Malformed CSV record"))
+ }
+ // When the length of the returned tokens is identical to the length of
the parsed schema,
+ // we just need to:
+ // 1. Convert the tokens that correspond to the required schema.
+ // 2. Apply the pushdown filters to `requiredRow`.
+ var i = 0
+ val row = requiredRow.head
+ var skipRow = false
+ while (i < requiredSchema.length) {
+ try {
+ if (!skipRow) {
+ row(i) = valueConverters(i).apply(getToken(tokens, i))
Review comment:
There are 3 cases:
1. Univocity parser is not able to parse its input. For example, it faced to
wrong Unicode symbol. In that case, it return `null` in `tokens`, and
`BadRecordException` will be raised here
https://github.com/apache/spark/blob/4e50f0291f032b4a5c0b46ed01fdef14e4cbb050/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala#L229-L232
2. Univocity parser returns `null` in the first token. In this case, we will
try to convert `null` to desired type according to `requiredSchema`. Most
likely, the conversion raises an exception which is will be converted
`BadRecordException` here
https://github.com/apache/spark/blob/4e50f0291f032b4a5c0b46ed01fdef14e4cbb050/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala#L277
and here
https://github.com/apache/spark/blob/4e50f0291f032b4a5c0b46ed01fdef14e4cbb050/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala#L286
2.1 If conversion doesn't fail, the `is null` filter will be applied to
the value and row could be passed to upper layer.
3. Univocity parser returns a valid string at index 0 in `tokens` but
conversion fails at
https://github.com/apache/spark/blob/4e50f0291f032b4a5c0b46ed01fdef14e4cbb050/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala#L267
with some exception. Similar situation to 2. The exception will be handled,
and transformed to `BadRecordException`.
New implementation with filters pushdown does not change the behavior in
those cases.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]