cloud-fan commented on code in PR #56260:
URL: https://github.com/apache/spark/pull/56260#discussion_r3398911491
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala:
##########
@@ -3506,6 +3506,80 @@ abstract class CSVSuite
assert(textParsingException.getCause.isInstanceOf[ArrayIndexOutOfBoundsException])
}
+ test("SPARK-57195: multiLine CSV schema inference surfaces
MALFORMED_CSV_RECORD for a row " +
+ "exceeding maxColumns") {
+ // multiLine schema inference reads through
UnivocityParser.tokenizeStream, whose parseNext
+ // call was unguarded (SPARK-49444 only fixed the per-line parseLine
path). A row with more
+ // columns than maxColumns must surface as MALFORMED_CSV_RECORD, not a raw
+ // ArrayIndexOutOfBoundsException. The overflow is on a later row so it is
hit during inference.
+ withTempPath { path =>
+ Files.write(path.toPath,
"a,b\nc,d\n1,2,3\n".getBytes(StandardCharsets.UTF_8))
+ val e = intercept[SparkRuntimeException] {
+ spark.read
+ .option("header", "false")
+ .option("inferSchema", "true")
+ .option("multiLine", "true")
+ .option("maxColumns", "2")
+ .csv(path.getAbsolutePath)
+ }
+ // badRecord comes from TextParsingException.getParsedContent (bounded),
so its exact value
+ // is not pinned here; the error condition is what this asserts.
+ assert(e.getCondition == "MALFORMED_CSV_RECORD",
+ s"unexpected error condition: ${e.getCondition}")
+ }
+ }
+
+ test("SPARK-57195: non-multiLine CSV schema inference surfaces
MALFORMED_CSV_RECORD for a row " +
+ "exceeding maxColumns") {
+ // Without multiLine, inference reads through
TextInputCSVDataSource.inferFromDataset, which
+ // parsed each line with a raw Univocity CsvParser, bypassing the guarded
parseLine. A row with
+ // more columns than maxColumns must surface as MALFORMED_CSV_RECORD, not
a raw
+ // ArrayIndexOutOfBoundsException.
+ withTempPath { path =>
+ Files.write(path.toPath,
"a,b\nc,d\n1,2,3\n".getBytes(StandardCharsets.UTF_8))
+ val e = intercept[SparkRuntimeException] {
+ spark.read
+ .option("header", "false")
+ .option("inferSchema", "true")
+ .option("maxColumns", "2")
+ .csv(path.getAbsolutePath)
+ }
+ checkError(
+ exception = e,
+ condition = "MALFORMED_CSV_RECORD",
+ parameters = Map("badRecord" -> "1,2,3"),
+ sqlState = "KD000")
+ }
+ }
+
+ test("SPARK-57195: multiLine CSV read failure with more than max columns") {
+ // The multiLine read path (parseStream) uses the same guarded streaming
tokenizer as inference.
+ // With an explicit schema, an overflow row surfaces as
MALFORMED_CSV_RECORD wrapped in
+ // FAILED_READ_FILE, mirroring the non-multiLine SPARK-49444 test above.
+ val schema = new StructType()
+ .add("intColumn", IntegerType, nullable = true)
+ .add("decimalColumn", DecimalType(10, 2), nullable = true)
+
+ val fileReadException = intercept[SparkException] {
+ spark.read
+ .schema(schema)
+ .option("header", "false")
+ .option("multiLine", "true")
+ .option("maxColumns", "2")
+ .csv(testFile(moreColumnsFile))
+ .collect()
+ }
+
+ checkErrorMatchPVals(
+ exception = fileReadException,
+ condition = "FAILED_READ_FILE.NO_HINT",
+ parameters = Map("path" -> s".*$moreColumnsFile"))
+
+ val malformedCSVException =
fileReadException.getCause.asInstanceOf[SparkRuntimeException]
+ assert(malformedCSVException.getCondition == "MALFORMED_CSV_RECORD",
+ s"unexpected error condition: ${malformedCSVException.getCondition}")
Review Comment:
Same as the multiLine inference test earlier in this file: asserting only on
`getCondition` drops the sqlState and `badRecord`-parameter validation.
`matchPVals` keeps both without pinning the parsed content:
```suggestion
checkError(
exception = malformedCSVException,
condition = "MALFORMED_CSV_RECORD",
sqlState = Some("KD000"),
parameters = Map("badRecord" -> ".*"),
matchPVals = true)
```
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala:
##########
@@ -3506,6 +3506,80 @@ abstract class CSVSuite
assert(textParsingException.getCause.isInstanceOf[ArrayIndexOutOfBoundsException])
}
+ test("SPARK-57195: multiLine CSV schema inference surfaces
MALFORMED_CSV_RECORD for a row " +
+ "exceeding maxColumns") {
+ // multiLine schema inference reads through
UnivocityParser.tokenizeStream, whose parseNext
+ // call was unguarded (SPARK-49444 only fixed the per-line parseLine
path). A row with more
+ // columns than maxColumns must surface as MALFORMED_CSV_RECORD, not a raw
+ // ArrayIndexOutOfBoundsException. The overflow is on a later row so it is
hit during inference.
+ withTempPath { path =>
+ Files.write(path.toPath,
"a,b\nc,d\n1,2,3\n".getBytes(StandardCharsets.UTF_8))
+ val e = intercept[SparkRuntimeException] {
+ spark.read
+ .option("header", "false")
+ .option("inferSchema", "true")
+ .option("multiLine", "true")
+ .option("maxColumns", "2")
+ .csv(path.getAbsolutePath)
+ }
+ // badRecord comes from TextParsingException.getParsedContent (bounded),
so its exact value
+ // is not pinned here; the error condition is what this asserts.
+ assert(e.getCondition == "MALFORMED_CSV_RECORD",
+ s"unexpected error condition: ${e.getCondition}")
Review Comment:
This drops the sqlState and `badRecord`-parameter validation the old
`checkError` did. `matchPVals` keeps both without pinning the exact value:
```suggestion
// badRecord comes from TextParsingException.getParsedContent (bounded), so its exact value
// is not pinned; ".*" keeps the error class, sqlState, and parameter validation.
checkError(
exception = e,
condition = "MALFORMED_CSV_RECORD",
sqlState = Some("KD000"),
parameters = Map("badRecord" -> ".*"),
matchPVals = true)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]