sandip-db commented on code in PR #44163:
URL: https://github.com/apache/spark/pull/44163#discussion_r1434278718


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala:
##########
@@ -2371,4 +2400,108 @@ class XmlSuite extends QueryTest with SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-46248: Enabling/disabling ignoreCorruptFiles/ignoreMissingFiles") {
+    withCorruptFile(inputFile => {
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+        val e = intercept[SparkException] {
+          spark.read
+            .option("rowTag", "ROW")
+            .option("multiLine", false)
+            .xml(inputFile.toURI.toString)
+            .collect()
+        }
+        assert(ExceptionUtils.getRootCause(e).isInstanceOf[EOFException])
+        assert(ExceptionUtils.getRootCause(e).getMessage === "Unexpected end of input stream")
+        val e2 = intercept[SparkException] {
+          spark.read
+            .option("rowTag", "ROW")
+            .option("multiLine", true)
+            .xml(inputFile.toURI.toString)
+            .collect()
+        }
+        assert(ExceptionUtils.getRootCause(e2).isInstanceOf[EOFException])
+        assert(ExceptionUtils.getRootCause(e2).getMessage === "Unexpected end of input stream")
+      }
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+        spark.read
+          .option("rowTag", "ROW")
+          .option("multiLine", false)
+          .xml(inputFile.toURI.toString)
+          .collect()
+        assert(
+          spark.read
+            .option("rowTag", "ROW")
+            .option("multiLine", true)
+            .xml(inputFile.toURI.toString)
+            .collect()
+            .isEmpty
+        )
+      }
+    })
+    withTempPath { dir =>
+      import org.apache.hadoop.fs.Path
+      val xmlPath = new Path(dir.getCanonicalPath, "xml")
+      val fs = xmlPath.getFileSystem(spark.sessionState.newHadoopConf())
+
+      sampledTestData.write.option("rowTag", "ROW").xml(xmlPath.toString)
+      val df = spark.read.option("rowTag", "ROW").option("multiLine", true).xml(xmlPath.toString)
+      fs.delete(xmlPath, true)
+      withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "false") {
+        val e = intercept[SparkException] {
+          df.collect()
+        }
+        assert(e.getCause.isInstanceOf[SparkFileNotFoundException])
+        assert(e.getCause.getMessage.contains(".xml does not exist"))
+      }
+
+      sampledTestData.write.option("rowTag", "ROW").xml(xmlPath.toString)
+      val df2 = spark.read.option("rowTag", "ROW").option("multiLine", true).xml(xmlPath.toString)
+      fs.delete(xmlPath, true)
+      withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "true") {
+        assert(df2.collect().isEmpty)
+      }
+    }
+  }
+
+  test("SPARK-46248: Read from a corrupted compressed file") {
+    withTempDir { dir =>
+      val format = "xml"
+      val numRecords = 10000
+      // create data
+      val data =
+        spark.sparkContext.parallelize(
+          (0 until numRecords).map(i => Row(i.toString, (i * 2).toString)))
+      val schema = buildSchema(field("a1"), field("a2"))
+      val df = spark.createDataFrame(data, schema)
+
+      df.coalesce(4)
+        .write
+        .mode(SaveMode.Overwrite)
+        .format(format)
+        .option("compression", "gZiP")
+        .option("rowTag", "row")
+        .save(dir.getCanonicalPath)
+
+      withCorruptedFile(dir) { corruptedDir =>
+        withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+          val dfCorrupted = spark.read
+            .format(format)
+            .option("multiline", "true")
+            .option("compression", "gzip")
+            .option("rowTag", "row")
+            .load(corruptedDir.getCanonicalPath)
+          assert(dfCorrupted.collect().length > 100)

Review Comment:
   ```suggestion
             val results = dfCorrupted.collect()
             assert(results(1) === Row(1, 2))
             assert(results.length > 100)
   ```
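   Collecting once and checking a concrete row, in addition to the count, confirms that the rows read from the surviving splits still parse into the expected values rather than merely the expected number of records.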



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala:
##########
@@ -2371,4 +2400,108 @@ class XmlSuite extends QueryTest with SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-46248: Enabling/disabling ignoreCorruptFiles/ignoreMissingFiles") {
+    withCorruptFile(inputFile => {
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+        val e = intercept[SparkException] {
+          spark.read
+            .option("rowTag", "ROW")
+            .option("multiLine", false)
+            .xml(inputFile.toURI.toString)
+            .collect()
+        }
+        assert(ExceptionUtils.getRootCause(e).isInstanceOf[EOFException])
+        assert(ExceptionUtils.getRootCause(e).getMessage === "Unexpected end of input stream")
+        val e2 = intercept[SparkException] {
+          spark.read
+            .option("rowTag", "ROW")
+            .option("multiLine", true)
+            .xml(inputFile.toURI.toString)
+            .collect()
+        }
+        assert(ExceptionUtils.getRootCause(e2).isInstanceOf[EOFException])
+        assert(ExceptionUtils.getRootCause(e2).getMessage === "Unexpected end of input stream")
+      }
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+        spark.read
+          .option("rowTag", "ROW")
+          .option("multiLine", false)
+          .xml(inputFile.toURI.toString)
+          .collect()
+        assert(
+          spark.read
+            .option("rowTag", "ROW")
+            .option("multiLine", true)
+            .xml(inputFile.toURI.toString)
+            .collect()
+            .isEmpty
+        )
+      }
+    })
+    withTempPath { dir =>
+      import org.apache.hadoop.fs.Path
+      val xmlPath = new Path(dir.getCanonicalPath, "xml")
+      val fs = xmlPath.getFileSystem(spark.sessionState.newHadoopConf())
+
+      sampledTestData.write.option("rowTag", "ROW").xml(xmlPath.toString)
+      val df = spark.read.option("rowTag", "ROW").option("multiLine", true).xml(xmlPath.toString)
+      fs.delete(xmlPath, true)
+      withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "false") {
+        val e = intercept[SparkException] {
+          df.collect()
+        }
+        assert(e.getCause.isInstanceOf[SparkFileNotFoundException])
+        assert(e.getCause.getMessage.contains(".xml does not exist"))
+      }
+
+      sampledTestData.write.option("rowTag", "ROW").xml(xmlPath.toString)
+      val df2 = spark.read.option("rowTag", "ROW").option("multiLine", true).xml(xmlPath.toString)
+      fs.delete(xmlPath, true)
+      withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "true") {
+        assert(df2.collect().isEmpty)
+      }
+    }
+  }
+
+  test("SPARK-46248: Read from a corrupted compressed file") {
+    withTempDir { dir =>
+      val format = "xml"
+      val numRecords = 10000
+      // create data
+      val data =
+        spark.sparkContext.parallelize(
+          (0 until numRecords).map(i => Row(i.toString, (i * 2).toString)))
+      val schema = buildSchema(field("a1"), field("a2"))
+      val df = spark.createDataFrame(data, schema)
+
+      df.coalesce(4)
+        .write
+        .mode(SaveMode.Overwrite)
+        .format(format)
+        .option("compression", "gZiP")
+        .option("rowTag", "row")
+        .save(dir.getCanonicalPath)
+
+      withCorruptedFile(dir) { corruptedDir =>
+        withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+          val dfCorrupted = spark.read
+            .format(format)
+            .option("multiline", "true")
+            .option("compression", "gzip")
+            .option("rowTag", "row")
+            .load(corruptedDir.getCanonicalPath)
+          assert(dfCorrupted.collect().length > 100)
+          val dfCorruptedWSchema = spark.read
+            .format(format)
+            .schema(schema)
+            .option("multiline", "true")
+            .option("compression", "gzip")
+            .option("rowTag", "row")
+            .load(corruptedDir.getCanonicalPath)
+          dfCorrupted.equals(dfCorruptedWSchema)

Review Comment:
   ```suggestion
             assert(dfCorrupted.dtypes === dfCorruptedWSchema.dtypes)
             checkAnswer(dfCorrupted, dfCorruptedWSchema)
   ```
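   For context on the suggestion: `Dataset` does not override `equals`, so `dfCorrupted.equals(dfCorruptedWSchema)` is a reference comparison whose result is discarded and asserts nothing about the data. A minimal sketch of the kind of content check `checkAnswer` performs (an order-insensitive row comparison), reusing the DataFrames already defined in the test:
   ```scala
   // Sketch only: QueryTest.checkAnswer collects both sides and compares the rows
   // without relying on row order; a hand-rolled equivalent looks roughly like this.
   val expected = dfCorruptedWSchema.collect().sortBy(_.toString)
   val actual = dfCorrupted.collect().sortBy(_.toString)
   assert(actual.toSeq === expected.toSeq)
   ```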



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala:
##########
@@ -2371,4 +2400,108 @@ class XmlSuite extends QueryTest with SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-46248: Enabling/disabling ignoreCorruptFiles/ignoreMissingFiles") {
+    withCorruptFile(inputFile => {
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+        val e = intercept[SparkException] {
+          spark.read
+            .option("rowTag", "ROW")
+            .option("multiLine", false)
+            .xml(inputFile.toURI.toString)
+            .collect()
+        }
+        assert(ExceptionUtils.getRootCause(e).isInstanceOf[EOFException])
+        assert(ExceptionUtils.getRootCause(e).getMessage === "Unexpected end of input stream")
+        val e2 = intercept[SparkException] {
+          spark.read
+            .option("rowTag", "ROW")
+            .option("multiLine", true)
+            .xml(inputFile.toURI.toString)
+            .collect()
+        }
+        assert(ExceptionUtils.getRootCause(e2).isInstanceOf[EOFException])
+        assert(ExceptionUtils.getRootCause(e2).getMessage === "Unexpected end of input stream")

Review Comment:
   This is a repeat of the previous spark.read. Why?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala:
##########
@@ -2371,4 +2400,108 @@ class XmlSuite extends QueryTest with SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-46248: Enabling/disabling ignoreCorruptFiles/ignoreMissingFiles") {
+    withCorruptFile(inputFile => {
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+        val e = intercept[SparkException] {
+          spark.read
+            .option("rowTag", "ROW")
+            .option("multiLine", false)
+            .xml(inputFile.toURI.toString)
+            .collect()
+        }
+        assert(ExceptionUtils.getRootCause(e).isInstanceOf[EOFException])
+        assert(ExceptionUtils.getRootCause(e).getMessage === "Unexpected end of input stream")
+        val e2 = intercept[SparkException] {
+          spark.read
+            .option("rowTag", "ROW")
+            .option("multiLine", true)
+            .xml(inputFile.toURI.toString)
+            .collect()
+        }
+        assert(ExceptionUtils.getRootCause(e2).isInstanceOf[EOFException])
+        assert(ExceptionUtils.getRootCause(e2).getMessage === "Unexpected end of input stream")
+      }
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+        spark.read
+          .option("rowTag", "ROW")
+          .option("multiLine", false)
+          .xml(inputFile.toURI.toString)
+          .collect()
+        assert(
+          spark.read
+            .option("rowTag", "ROW")
+            .option("multiLine", true)
+            .xml(inputFile.toURI.toString)
+            .collect()
+            .isEmpty
+        )

Review Comment:
   ```suggestion
           val result = spark.read
              .option("rowTag", "ROW")
              .option("multiLine", false)
              .xml(inputFile.toURI.toString)
              .collect()
           assert(result.isEmpty)
   ```
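   Binding the collected rows to a named `result` keeps the read chain and the assertion separate, which is easier to scan and leaves room for further checks on the surviving rows.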



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

