yyanyy commented on code in PR #56260:
URL: https://github.com/apache/spark/pull/56260#discussion_r3352931460


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala:
##########
@@ -643,11 +645,39 @@ private[sql] object UnivocityParser {
         throw QueryExecutionErrors.endOfStreamError()
       }
       val curRecord = convert(nextRecord)
-      nextRecord = tokenizer.parseNext()
+      nextRecord = parseNextRecord()
       curRecord
     }
   }
 
+  /**
+   * Builds the user-facing MALFORMED_CSV_RECORD error raised when Univocity 
hits an
+   * ArrayIndexOutOfBoundsException for a malformed record (e.g. more columns 
than `maxColumns`).
+   * Univocity raises it either bare or wrapped in a TextParsingException 
depending on the call.
+   * SPARK-49444 fixed the per-line path; this also covers the streaming path.
+   */
+  private def malformedCsvRecord(cause: Throwable, badRecord: String): 
SparkRuntimeException =
+    new SparkRuntimeException(
+      errorClass = "MALFORMED_CSV_RECORD",
+      messageParameters = Map("badRecord" -> badRecord),
+      cause = cause)
+
+  /**
+   * Parses a single CSV line with the given Univocity tokenizer, translating 
the
+   * ArrayIndexOutOfBoundsException Univocity raises for a malformed record 
into
+   * MALFORMED_CSV_RECORD so the per-line and streaming paths fail 
consistently.
+   */
+  def parseLine(tokenizer: CsvParser, line: String): Array[String] = {
+    try {
+      tokenizer.parseLine(line)
+    } catch {
+      case e: TextParsingException if 
e.getCause.isInstanceOf[ArrayIndexOutOfBoundsException] =>
+        throw malformedCsvRecord(e, line)
+      case e: ArrayIndexOutOfBoundsException =>
+        throw malformedCsvRecord(e, line)
+    }

Review Comment:
   If we try really hard, we might be able to reduce the duplicated catch with 
something like:
   ```
   private def withMalformedCsvHandling[T](badRecord: String)(f: => T): T =
     try f catch {
       case e: TextParsingException if 
e.getCause.isInstanceOf[ArrayIndexOutOfBoundsException] =>
         throw malformedCsvRecord(e, badRecord)
       case e: ArrayIndexOutOfBoundsException =>
         throw malformedCsvRecord(e, badRecord)
     }
   ```
   
   but it might not be worth it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to