MaxGekk commented on code in PR #56581:
URL: https://github.com/apache/spark/pull/56581#discussion_r3434852281
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala:
##########
@@ -3588,6 +3588,72 @@ abstract class CSVSuite
matchPVals = true)
}
+ test("SPARK-57515: non-multiLine CSV read with header exceeding maxColumns surfaces " +
+ "MALFORMED_CSV_RECORD") {
+ // CSVHeaderChecker called tokenizer.parseLine(header) directly without the AIOOBE guard
+ // that UnivocityParser.parseLine wraps. A header line wider than maxColumns must surface
+ // as MALFORMED_CSV_RECORD, not a raw ArrayIndexOutOfBoundsException.
+ withTempPath { path =>
+ Files.write(path.toPath, "a,b,c\n1,2,3\n".getBytes(StandardCharsets.UTF_8))
+ val e = intercept[SparkRuntimeException] {
+ spark.read
+ .option("header", "true")
+ .option("maxColumns", "2")
+ .csv(path.getAbsolutePath)
+ .collect()
+ }
+ checkError(
+ exception = e,
+ condition = "MALFORMED_CSV_RECORD",
+ sqlState = Some("KD000"),
+ parameters = Map("badRecord" -> ".*"),
+ matchPVals = true)
Review Comment:
This path passes the header verbatim as `badRecord`
(CSVHeaderChecker.scala:164), so the exact value is deterministic and
assertable here — as the `Dataset[String]` test below already does. `.*` passes
regardless of the actual content, so it wouldn't catch a regression that put
wrong/empty content in the error. (The multiLine test's `.*` is justified — its
`badRecord` comes from `getParsedContent`, which isn't deterministic.)
```suggestion
parameters = Map("badRecord" -> "a,b,c"),
matchPVals = false)
```
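To make the difference concrete, here is a minimal sketch of the two matching modes. `ParamMatchSketch` and `paramsMatch` are hypothetical stand-ins written for this comment, not Spark's `checkError`; the sketch assumes `matchPVals = true` treats each expected value as a regex and `matchPVals = false` compares values for equality.

```scala
// Hypothetical stand-in for parameter matching; NOT Spark's checkError implementation.
object ParamMatchSketch {
  // Assumption: matchPVals = true means "expected value is a regex",
  // matchPVals = false means "expected value must be equal".
  def paramsMatch(
      actual: Map[String, String],
      expected: Map[String, String],
      matchPVals: Boolean): Boolean = {
    actual.keySet == expected.keySet && expected.forall { case (key, value) =>
      if (matchPVals) actual(key).matches(value) else actual(key) == value
    }
  }
}
```

Under these assumptions, a regression that left `badRecord` empty would still satisfy the `.*` pattern, while the exact comparison against `"a,b,c"` would reject it.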
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala:
##########
@@ -146,9 +155,33 @@ class CSVHeaderChecker(
// be not extracted.
if (options.headerFlag && isStartOfFile) {
CSVExprUtils.extractHeader(lines, options).foreach { header =>
- checkHeaderColumnNames(tokenizer.parseLine(header))
+ val tokens = try {
+ tokenizer.parseLine(header)
+ } catch {
+ // scalastyle:off line.size.limit
+ case e: TextParsingException if e.getCause.isInstanceOf[ArrayIndexOutOfBoundsException] =>
+ // scalastyle:on line.size.limit
+ throw malformedCsvHeaderRecord(e, header)
+ case e: ArrayIndexOutOfBoundsException =>
+ throw malformedCsvHeaderRecord(e, header)
+ }
+ checkHeaderColumnNames(tokens)
}
}
setHeaderForSingleVariantColumn.foreach(f => f(headerColumnNames))
}
+
+ // scalastyle:off line.size.limit
+ private def malformedCsvHeaderRecord(cause: Throwable, badRecord: String): SparkRuntimeException = {
Review Comment:
`malformedCsvHeaderRecord` is a verbatim copy of
`UnivocityParser.malformedCsvRecord` (UnivocityParser.scala:657-667) — same
error class, same `MAX_ERROR_CONTENT_LENGTH` truncation, same
`SparkRuntimeException` shape. Both classes are in the same package
(`org.apache.spark.sql.catalyst.csv`), and the analogue is only
object-`private`. Consider making it `private[csv]` and calling it from the
three catch sites here, dropping this copy. That also removes a future-drift
risk: a later change to `MALFORMED_CSV_RECORD` construction applied to one copy
would silently diverge header vs. row malformed-record messages.
Optional deeper cleanup: both `tokenizer` callers pass a `CsvParser`
(UnivocityParser.scala:600, 693), so narrowing the two `private[csv]`
overloads' parameter type from `AbstractParser[CsvParserSettings]` to
`CsvParser` would let the non-multiLine path reuse `UnivocityParser.parseLine`
wholesale and drop its try/catch too.
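
A rough, self-contained sketch of the shape this suggests is shown here. Every name in it (`csvsketch`, `RowParser`, `HeaderChecker`, `MalformedRecord`, the toy `tokenize`, and the truncation limit of 8) is a hypothetical stand-in, not the real Spark code. The enclosing object plays the role of the `csv` package so that one qualified-private helper serves both callers.

```scala
// Hypothetical stand-ins only; none of these are Spark's real classes or limits.
object csvsketch {
  final class MalformedRecord(val badRecord: String, cause: Throwable)
    extends RuntimeException(s"MALFORMED_CSV_RECORD: $badRecord", cause)

  object RowParser {
    // Assumed truncation limit, chosen only for this sketch.
    private val MaxErrorContentLength = 8

    // Visible to everything inside `csvsketch`, mirroring `private[csv]`.
    private[csvsketch] def malformed(cause: Throwable, badRecord: String): MalformedRecord =
      new MalformedRecord(badRecord.take(MaxErrorContentLength), cause)
  }

  object HeaderChecker {
    // Toy tokenizer: writes into a fixed-size buffer, so a line with more
    // than maxColumns fields throws ArrayIndexOutOfBoundsException.
    private def tokenize(line: String, maxColumns: Int): Array[String] = {
      val parts = line.split(",", -1)
      val out = new Array[String](maxColumns)
      parts.zipWithIndex.foreach { case (token, i) => out(i) = token }
      out.take(parts.length)
    }

    // The header path reuses the shared helper instead of keeping its own copy.
    def headerTokens(header: String, maxColumns: Int): Array[String] =
      try tokenize(header, maxColumns)
      catch {
        case e: ArrayIndexOutOfBoundsException => throw RowParser.malformed(e, header)
      }
  }
}
```

With a single helper, any later change to how the malformed-record error is built applies to header and row failures alike.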
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]