MaxGekk commented on code in PR #42979:
URL: https://github.com/apache/spark/pull/42979#discussion_r1360246269
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala:
##########
@@ -99,6 +100,14 @@ private[sql] class JsonInferSchema(options: JSONOptions)
extends Serializable {
val wrappedCharException = new CharConversionException(msg)
wrappedCharException.initCause(e)
handleJsonErrorsByParseMode(parseMode, columnNameOfCorruptRecord,
wrappedCharException)
+ case e: FileNotFoundException if options.ignoreMissingFiles =>
+ logWarning(s"Skipped missing file", e)
+ Some(StructType(Nil))
+ // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
Review Comment:
The comment is redundant; it merely rephrases the code below it.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala:
##########
@@ -99,6 +100,14 @@ private[sql] class JsonInferSchema(options: JSONOptions)
extends Serializable {
val wrappedCharException = new CharConversionException(msg)
wrappedCharException.initCause(e)
handleJsonErrorsByParseMode(parseMode, columnNameOfCorruptRecord,
wrappedCharException)
+ case e: FileNotFoundException if options.ignoreMissingFiles =>
Review Comment:
Should we place `options.ignoreMissingFiles` in a `val`, as we do in
`FileScanRDD`?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala:
##########
@@ -190,12 +191,24 @@ object MultiLineCSVDataSource extends CSVDataSource {
parsedOptions: CSVOptions): StructType = {
val csv = createBaseRdd(sparkSession, inputPaths, parsedOptions)
csv.flatMap { lines =>
- val path = new Path(lines.getPath())
- UnivocityParser.tokenizeStream(
-
CodecStreams.createInputStreamWithCloseResource(lines.getConfiguration, path),
- shouldDropHeader = false,
- new CsvParser(parsedOptions.asParserSettings),
- encoding = parsedOptions.charset)
+ try {
+ val path = new Path(lines.getPath())
+ UnivocityParser.tokenizeStream(
+
CodecStreams.createInputStreamWithCloseResource(lines.getConfiguration, path),
+ shouldDropHeader = false,
+ new CsvParser(parsedOptions.asParserSettings),
+ encoding = parsedOptions.charset)
+ } catch {
+ case e: FileNotFoundException if parsedOptions.ignoreMissingFiles =>
Review Comment:
Same as in the JSON datasource — could you put the option into a `val`?
https://github.com/apache/spark/blob/7796d8a63318d560b08d4d6a8b4d68ea0112bd3e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala#L88-L89
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala:
##########
@@ -99,6 +100,14 @@ private[sql] class JsonInferSchema(options: JSONOptions)
extends Serializable {
val wrappedCharException = new CharConversionException(msg)
wrappedCharException.initCause(e)
handleJsonErrorsByParseMode(parseMode, columnNameOfCorruptRecord,
wrappedCharException)
+ case e: FileNotFoundException if options.ignoreMissingFiles =>
+ logWarning(s"Skipped missing file", e)
+ Some(StructType(Nil))
+ // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
+ case e: FileNotFoundException if !options.ignoreMissingFiles =>
throw e
+ case e: IOException if options.ignoreCorruptFiles =>
Review Comment:
How about `RuntimeException` like at
https://github.com/apache/spark/blob/7796d8a63318d560b08d4d6a8b4d68ea0112bd3e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala#L262
?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala:
##########
@@ -190,12 +191,24 @@ object MultiLineCSVDataSource extends CSVDataSource {
parsedOptions: CSVOptions): StructType = {
val csv = createBaseRdd(sparkSession, inputPaths, parsedOptions)
csv.flatMap { lines =>
- val path = new Path(lines.getPath())
- UnivocityParser.tokenizeStream(
-
CodecStreams.createInputStreamWithCloseResource(lines.getConfiguration, path),
- shouldDropHeader = false,
- new CsvParser(parsedOptions.asParserSettings),
- encoding = parsedOptions.charset)
+ try {
+ val path = new Path(lines.getPath())
+ UnivocityParser.tokenizeStream(
+
CodecStreams.createInputStreamWithCloseResource(lines.getConfiguration, path),
+ shouldDropHeader = false,
+ new CsvParser(parsedOptions.asParserSettings),
+ encoding = parsedOptions.charset)
+ } catch {
+ case e: FileNotFoundException if parsedOptions.ignoreMissingFiles =>
+ logWarning(s"Skipped missing file: ${lines.getPath()}", e)
+ Array.empty[Array[String]]
+ // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
Review Comment:
Please, remove the comment.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala:
##########
@@ -1913,6 +1913,22 @@ abstract class JsonSuite
}
}
+ test("SPARK-45035: json enable ignoreCorruptFiles with multi-Line") {
Review Comment:
Could you add a test for `ignoreMissingFiles`, please.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala:
##########
@@ -1448,37 +1447,25 @@ abstract class CSVSuite
}
test("Enabling/disabling ignoreCorruptFiles") {
Review Comment:
Could you add a test for `ignoreMissingFiles`, please.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]