cloud-fan commented on code in PR #56260:
URL: https://github.com/apache/spark/pull/56260#discussion_r3398911491


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala:
##########
@@ -3506,6 +3506,80 @@ abstract class CSVSuite
     
assert(textParsingException.getCause.isInstanceOf[ArrayIndexOutOfBoundsException])
   }
 
+  test("SPARK-57195: multiLine CSV schema inference surfaces 
MALFORMED_CSV_RECORD for a row " +
+    "exceeding maxColumns") {
+    // multiLine schema inference reads through 
UnivocityParser.tokenizeStream, whose parseNext
+    // call was unguarded (SPARK-49444 only fixed the per-line parseLine 
path). A row with more
+    // columns than maxColumns must surface as MALFORMED_CSV_RECORD, not a raw
+    // ArrayIndexOutOfBoundsException. The overflow is on a later row so it is 
hit during inference.
+    withTempPath { path =>
+      Files.write(path.toPath, 
"a,b\nc,d\n1,2,3\n".getBytes(StandardCharsets.UTF_8))
+      val e = intercept[SparkRuntimeException] {
+        spark.read
+          .option("header", "false")
+          .option("inferSchema", "true")
+          .option("multiLine", "true")
+          .option("maxColumns", "2")
+          .csv(path.getAbsolutePath)
+      }
+      // badRecord comes from TextParsingException.getParsedContent (bounded), 
so its exact value
+      // is not pinned here; the error condition is what this asserts.
+      assert(e.getCondition == "MALFORMED_CSV_RECORD",
+        s"unexpected error condition: ${e.getCondition}")
+    }
+  }
+
+  test("SPARK-57195: non-multiLine CSV schema inference surfaces 
MALFORMED_CSV_RECORD for a row " +
+    "exceeding maxColumns") {
+    // Without multiLine, inference reads through 
TextInputCSVDataSource.inferFromDataset, which
+    // parsed each line with a raw Univocity CsvParser, bypassing the guarded 
parseLine. A row with
+    // more columns than maxColumns must surface as MALFORMED_CSV_RECORD, not 
a raw
+    // ArrayIndexOutOfBoundsException.
+    withTempPath { path =>
+      Files.write(path.toPath, 
"a,b\nc,d\n1,2,3\n".getBytes(StandardCharsets.UTF_8))
+      val e = intercept[SparkRuntimeException] {
+        spark.read
+          .option("header", "false")
+          .option("inferSchema", "true")
+          .option("maxColumns", "2")
+          .csv(path.getAbsolutePath)
+      }
+      checkError(
+        exception = e,
+        condition = "MALFORMED_CSV_RECORD",
+        parameters = Map("badRecord" -> "1,2,3"),
+        sqlState = "KD000")
+    }
+  }
+
+  test("SPARK-57195: multiLine CSV read failure with more than max columns") {
+    // The multiLine read path (parseStream) uses the same guarded streaming 
tokenizer as inference.
+    // With an explicit schema, an overflow row surfaces as 
MALFORMED_CSV_RECORD wrapped in
+    // FAILED_READ_FILE, mirroring the non-multiLine SPARK-49444 test above.
+    val schema = new StructType()
+      .add("intColumn", IntegerType, nullable = true)
+      .add("decimalColumn", DecimalType(10, 2), nullable = true)
+
+    val fileReadException = intercept[SparkException] {
+      spark.read
+        .schema(schema)
+        .option("header", "false")
+        .option("multiLine", "true")
+        .option("maxColumns", "2")
+        .csv(testFile(moreColumnsFile))
+        .collect()
+    }
+
+    checkErrorMatchPVals(
+      exception = fileReadException,
+      condition = "FAILED_READ_FILE.NO_HINT",
+      parameters = Map("path" -> s".*$moreColumnsFile"))
+
+    val malformedCSVException = 
fileReadException.getCause.asInstanceOf[SparkRuntimeException]
+    assert(malformedCSVException.getCondition == "MALFORMED_CSV_RECORD",
+      s"unexpected error condition: ${malformedCSVException.getCondition}")

Review Comment:
   Same as the inference test above — `matchPVals` keeps the sqlState/parameter 
validation without pinning the parsed content:
   ```suggestion
       checkError(
         exception = malformedCSVException,
         condition = "MALFORMED_CSV_RECORD",
         sqlState = Some("KD000"),
         parameters = Map("badRecord" -> ".*"),
         matchPVals = true)
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala:
##########
@@ -3506,6 +3506,80 @@ abstract class CSVSuite
     
assert(textParsingException.getCause.isInstanceOf[ArrayIndexOutOfBoundsException])
   }
 
+  test("SPARK-57195: multiLine CSV schema inference surfaces 
MALFORMED_CSV_RECORD for a row " +
+    "exceeding maxColumns") {
+    // multiLine schema inference reads through 
UnivocityParser.tokenizeStream, whose parseNext
+    // call was unguarded (SPARK-49444 only fixed the per-line parseLine 
path). A row with more
+    // columns than maxColumns must surface as MALFORMED_CSV_RECORD, not a raw
+    // ArrayIndexOutOfBoundsException. The overflow is on a later row so it is 
hit during inference.
+    withTempPath { path =>
+      Files.write(path.toPath, 
"a,b\nc,d\n1,2,3\n".getBytes(StandardCharsets.UTF_8))
+      val e = intercept[SparkRuntimeException] {
+        spark.read
+          .option("header", "false")
+          .option("inferSchema", "true")
+          .option("multiLine", "true")
+          .option("maxColumns", "2")
+          .csv(path.getAbsolutePath)
+      }
+      // badRecord comes from TextParsingException.getParsedContent (bounded), 
so its exact value
+      // is not pinned here; the error condition is what this asserts.
+      assert(e.getCondition == "MALFORMED_CSV_RECORD",
+        s"unexpected error condition: ${e.getCondition}")

Review Comment:
   This drops the sqlState and `badRecord`-parameter validation the old 
`checkError` did. `matchPVals` keeps both without pinning the exact value:
   ```suggestion
         // badRecord comes from TextParsingException.getParsedContent (bounded), so its exact value
         // is not pinned; ".*" keeps the error class, sqlState, and parameter validation.
         checkError(
           exception = e,
           condition = "MALFORMED_CSV_RECORD",
           sqlState = Some("KD000"),
           parameters = Map("badRecord" -> ".*"),
           matchPVals = true)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to