sandip-db commented on code in PR #44163: URL: https://github.com/apache/spark/pull/44163#discussion_r1434278718
########## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala: ########## @@ -2371,4 +2400,108 @@ class XmlSuite extends QueryTest with SharedSparkSession { } } } + + test("SPARK-46248: Enabling/disabling ignoreCorruptFiles/ignoreMissingFiles") { + withCorruptFile(inputFile => { + withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") { + val e = intercept[SparkException] { + spark.read + .option("rowTag", "ROW") + .option("multiLine", false) + .xml(inputFile.toURI.toString) + .collect() + } + assert(ExceptionUtils.getRootCause(e).isInstanceOf[EOFException]) + assert(ExceptionUtils.getRootCause(e).getMessage === "Unexpected end of input stream") + val e2 = intercept[SparkException] { + spark.read + .option("rowTag", "ROW") + .option("multiLine", true) + .xml(inputFile.toURI.toString) + .collect() + } + assert(ExceptionUtils.getRootCause(e2).isInstanceOf[EOFException]) + assert(ExceptionUtils.getRootCause(e2).getMessage === "Unexpected end of input stream") + } + withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") { + spark.read + .option("rowTag", "ROW") + .option("multiLine", false) + .xml(inputFile.toURI.toString) + .collect() + assert( + spark.read + .option("rowTag", "ROW") + .option("multiLine", true) + .xml(inputFile.toURI.toString) + .collect() + .isEmpty + ) + } + }) + withTempPath { dir => + import org.apache.hadoop.fs.Path + val xmlPath = new Path(dir.getCanonicalPath, "xml") + val fs = xmlPath.getFileSystem(spark.sessionState.newHadoopConf()) + + sampledTestData.write.option("rowTag", "ROW").xml(xmlPath.toString) + val df = spark.read.option("rowTag", "ROW").option("multiLine", true).xml(xmlPath.toString) + fs.delete(xmlPath, true) + withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "false") { + val e = intercept[SparkException] { + df.collect() + } + assert(e.getCause.isInstanceOf[SparkFileNotFoundException]) + assert(e.getCause.getMessage.contains(".xml does not exist")) + } + + 
sampledTestData.write.option("rowTag", "ROW").xml(xmlPath.toString) + val df2 = spark.read.option("rowTag", "ROW").option("multiLine", true).xml(xmlPath.toString) + fs.delete(xmlPath, true) + withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "true") { + assert(df2.collect().isEmpty) + } + } + } + + test("SPARK-46248: Read from a corrupted compressed file") { + withTempDir { dir => + val format = "xml" + val numRecords = 10000 + // create data + val data = + spark.sparkContext.parallelize( + (0 until numRecords).map(i => Row(i.toString, (i * 2).toString))) + val schema = buildSchema(field("a1"), field("a2")) + val df = spark.createDataFrame(data, schema) + + df.coalesce(4) + .write + .mode(SaveMode.Overwrite) + .format(format) + .option("compression", "gZiP") + .option("rowTag", "row") + .save(dir.getCanonicalPath) + + withCorruptedFile(dir) { corruptedDir => + withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") { + val dfCorrupted = spark.read + .format(format) + .option("multiline", "true") + .option("compression", "gzip") + .option("rowTag", "row") + .load(corruptedDir.getCanonicalPath) + assert(dfCorrupted.collect().length > 100) Review Comment: ```suggestion val results = dfCorrupted.collect() assert(results(1) === Row(1, 2)) assert(results.length > 100) ``` ########## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala: ########## @@ -2371,4 +2400,108 @@ class XmlSuite extends QueryTest with SharedSparkSession { } } } + + test("SPARK-46248: Enabling/disabling ignoreCorruptFiles/ignoreMissingFiles") { + withCorruptFile(inputFile => { + withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") { + val e = intercept[SparkException] { + spark.read + .option("rowTag", "ROW") + .option("multiLine", false) + .xml(inputFile.toURI.toString) + .collect() + } + assert(ExceptionUtils.getRootCause(e).isInstanceOf[EOFException]) + assert(ExceptionUtils.getRootCause(e).getMessage === "Unexpected end of input stream") + val e2 = 
intercept[SparkException] { + spark.read + .option("rowTag", "ROW") + .option("multiLine", true) + .xml(inputFile.toURI.toString) + .collect() + } + assert(ExceptionUtils.getRootCause(e2).isInstanceOf[EOFException]) + assert(ExceptionUtils.getRootCause(e2).getMessage === "Unexpected end of input stream") + } + withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") { + spark.read + .option("rowTag", "ROW") + .option("multiLine", false) + .xml(inputFile.toURI.toString) + .collect() + assert( + spark.read + .option("rowTag", "ROW") + .option("multiLine", true) + .xml(inputFile.toURI.toString) + .collect() + .isEmpty + ) + } + }) + withTempPath { dir => + import org.apache.hadoop.fs.Path + val xmlPath = new Path(dir.getCanonicalPath, "xml") + val fs = xmlPath.getFileSystem(spark.sessionState.newHadoopConf()) + + sampledTestData.write.option("rowTag", "ROW").xml(xmlPath.toString) + val df = spark.read.option("rowTag", "ROW").option("multiLine", true).xml(xmlPath.toString) + fs.delete(xmlPath, true) + withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "false") { + val e = intercept[SparkException] { + df.collect() + } + assert(e.getCause.isInstanceOf[SparkFileNotFoundException]) + assert(e.getCause.getMessage.contains(".xml does not exist")) + } + + sampledTestData.write.option("rowTag", "ROW").xml(xmlPath.toString) + val df2 = spark.read.option("rowTag", "ROW").option("multiLine", true).xml(xmlPath.toString) + fs.delete(xmlPath, true) + withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "true") { + assert(df2.collect().isEmpty) + } + } + } + + test("SPARK-46248: Read from a corrupted compressed file") { + withTempDir { dir => + val format = "xml" + val numRecords = 10000 + // create data + val data = + spark.sparkContext.parallelize( + (0 until numRecords).map(i => Row(i.toString, (i * 2).toString))) + val schema = buildSchema(field("a1"), field("a2")) + val df = spark.createDataFrame(data, schema) + + df.coalesce(4) + .write + .mode(SaveMode.Overwrite) + .format(format) + 
.option("compression", "gZiP") + .option("rowTag", "row") + .save(dir.getCanonicalPath) + + withCorruptedFile(dir) { corruptedDir => + withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") { + val dfCorrupted = spark.read + .format(format) + .option("multiline", "true") + .option("compression", "gzip") + .option("rowTag", "row") + .load(corruptedDir.getCanonicalPath) + assert(dfCorrupted.collect().length > 100) + val dfCorruptedWSchema = spark.read + .format(format) + .schema(schema) + .option("multiline", "true") + .option("compression", "gzip") + .option("rowTag", "row") + .load(corruptedDir.getCanonicalPath) + dfCorrupted.equals(dfCorruptedWSchema) Review Comment: ```suggestion assert(dfCorrupted.dtypes === dfCorruptedWSchema.dtypes) checkAnswer(dfCorrupted, dfCorruptedWSchema) ``` ########## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala: ########## @@ -2371,4 +2400,108 @@ class XmlSuite extends QueryTest with SharedSparkSession { } } } + + test("SPARK-46248: Enabling/disabling ignoreCorruptFiles/ignoreMissingFiles") { + withCorruptFile(inputFile => { + withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") { + val e = intercept[SparkException] { + spark.read + .option("rowTag", "ROW") + .option("multiLine", false) + .xml(inputFile.toURI.toString) + .collect() + } + assert(ExceptionUtils.getRootCause(e).isInstanceOf[EOFException]) + assert(ExceptionUtils.getRootCause(e).getMessage === "Unexpected end of input stream") + val e2 = intercept[SparkException] { + spark.read + .option("rowTag", "ROW") + .option("multiLine", true) + .xml(inputFile.toURI.toString) + .collect() + } + assert(ExceptionUtils.getRootCause(e2).isInstanceOf[EOFException]) + assert(ExceptionUtils.getRootCause(e2).getMessage === "Unexpected end of input stream") Review Comment: This is a repeat of the previous spark.read. Why? 
########## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala: ########## @@ -2371,4 +2400,108 @@ class XmlSuite extends QueryTest with SharedSparkSession { } } } + + test("SPARK-46248: Enabling/disabling ignoreCorruptFiles/ignoreMissingFiles") { + withCorruptFile(inputFile => { + withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") { + val e = intercept[SparkException] { + spark.read + .option("rowTag", "ROW") + .option("multiLine", false) + .xml(inputFile.toURI.toString) + .collect() + } + assert(ExceptionUtils.getRootCause(e).isInstanceOf[EOFException]) + assert(ExceptionUtils.getRootCause(e).getMessage === "Unexpected end of input stream") + val e2 = intercept[SparkException] { + spark.read + .option("rowTag", "ROW") + .option("multiLine", true) + .xml(inputFile.toURI.toString) + .collect() + } + assert(ExceptionUtils.getRootCause(e2).isInstanceOf[EOFException]) + assert(ExceptionUtils.getRootCause(e2).getMessage === "Unexpected end of input stream") + } + withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") { + spark.read + .option("rowTag", "ROW") + .option("multiLine", false) + .xml(inputFile.toURI.toString) + .collect() + assert( + spark.read + .option("rowTag", "ROW") + .option("multiLine", true) + .xml(inputFile.toURI.toString) + .collect() + .isEmpty + ) Review Comment: ```suggestion val result = spark.read .option("rowTag", "ROW") .option("multiLine", false) .xml(inputFile.toURI.toString) .collect() assert(result.isEmpty) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org