MaxGekk commented on code in PR #56581:
URL: https://github.com/apache/spark/pull/56581#discussion_r3452383009
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala:
##########
@@ -122,15 +122,26 @@ class CSVHeaderChecker(
def checkHeaderColumnNames(line: String): Unit = {
if (options.headerFlag) {
val parser = new CsvParser(options.asParserSettings)
- checkHeaderColumnNames(parser.parseLine(line))
+ checkHeaderColumnNames(UnivocityParser.parseLine(parser, line))
}
}
// This is currently only used to parse CSV with multiLine mode.
private[csv] def checkHeaderColumnNames(tokenizer:
AbstractParser[CsvParserSettings]): Unit = {
assert(options.multiLine, "This method should be executed with multiLine.")
if (options.headerFlag) {
- val firstRecord = tokenizer.parseNext()
+ val firstRecord = try {
+ tokenizer.parseNext()
+ } catch {
+ // scalastyle:off line.size.limit
+ case e: TextParsingException if
e.getCause.isInstanceOf[ArrayIndexOutOfBoundsException] =>
+ // scalastyle:on line.size.limit
+ // In the multiLine stream path the field appender is reset before
the AIOOBE propagates,
+ // so the record content is unavailable; pass an empty string as the
bad-record marker.
Review Comment:
The comment says "pass an empty string as the bad-record marker", but the
next line passes `Option(e.getParsedContent).getOrElse("")` -- the parsed
content when present, empty string only as a fallback. The analogue in
`UnivocityParser.convertStream` words it accurately ("use the bounded parsed
content when present"). Suggest matching that phrasing. Non-blocking nit.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala:
##########
@@ -3588,6 +3593,79 @@ abstract class CSVSuite
matchPVals = true)
}
+ test("SPARK-57515: non-multiLine CSV read with header exceeding maxColumns
surfaces " +
+ "MALFORMED_CSV_RECORD") {
+ // inferFromDataset called csvParser.parseLine(header) directly without
the AIOOBE guard
Review Comment:
This test (and the `Dataset[String]` test below) reads with no explicit
schema, so schema inference runs eagerly and `inferFromDataset` parses the
first line *before* the header check: `DataFrameReader` calls
`inferFromDataset` (line 225) ahead of `checkHeaderColumnNames(firstLine)`
(line 241), and `inferFromDataset` now routes the first line through
`UnivocityParser.parseLine` (CSVDataSource.scala:359). So the error here
surfaces from `inferFromDataset`, not from the non-multiLine `CSVHeaderChecker`
guards this comment names.
Net effect: `checkHeaderColumnNames(line: String)` (122-126) and
`checkHeaderColumnNames(lines, tokenizer)` (151-173) are not exercised by any
test -- reverting either guard would still pass CI -- and these two tests
partly overlap (both validate the same `inferFromDataset` line). Only the
multiLine test reaches a `CSVHeaderChecker` guard.
To actually cover the non-multiLine header guard, add a variant that passes
an explicit `.schema(...)` so inference is skipped and the read-time header
check runs. Non-blocking.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]