Github user MaxGekk commented on a diff in the pull request:
https://github.com/apache/spark/pull/20937#discussion_r179737478
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
@@ -2065,29 +2065,238 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}
}
-  def testLineSeparator(lineSep: String): Unit = {
-    test(s"SPARK-21289: Support line separator - lineSep: '$lineSep'") {
-      // Read
-      val data =
-        s"""
-          | {"f":
-          |"a", "f0": 1}$lineSep{"f":
-          |
-          |"c", "f0": 2}$lineSep{"f": "d", "f0": 3}
-        """.stripMargin
-      val dataWithTrailingLineSep = s"$data$lineSep"
-
-      Seq(data, dataWithTrailingLineSep).foreach { lines =>
-        withTempPath { path =>
-          Files.write(path.toPath, lines.getBytes(StandardCharsets.UTF_8))
-          val df = spark.read.option("lineSep", lineSep).json(path.getAbsolutePath)
-          val expectedSchema =
-            StructType(StructField("f", StringType) :: StructField("f0", LongType) :: Nil)
-          checkAnswer(df, Seq(("a", 1), ("c", 2), ("d", 3)).toDF())
-          assert(df.schema === expectedSchema)
+  def testFile(fileName: String): String = {
+    Thread.currentThread().getContextClassLoader.getResource(fileName).toString
+  }
+
+ test("SPARK-23723: json in UTF-16 with BOM") {
+ val fileName = "json-tests/utf16WithBOM.json"
+ val schema = new StructType().add("firstName",
StringType).add("lastName", StringType)
+ val jsonDF = spark.read.schema(schema)
+ // This option will be replaced by .option("lineSep", "x00 0a")
+ // as soon as lineSep allows to specify sequence of bytes in
hexadecimal format.
+ .option("mode", "DROPMALFORMED")
+ .json(testFile(fileName))
+
+ checkAnswer(jsonDF, Seq(
+ Row("Chris", "Baird"), Row("Doug", "Rood")
+ ))
+ }
+
+ test("SPARK-23723: multi-line json in UTF-32BE with BOM") {
+ val fileName = "json-tests/utf32BEWithBOM.json"
+ val schema = new StructType().add("firstName",
StringType).add("lastName", StringType)
+ val jsonDF = spark.read.schema(schema)
+ .option("multiline", "true")
+ .json(testFile(fileName))
+
+ checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+ }
+
+ test("SPARK-23723: Use user's encoding in reading of multi-line json in
UTF-16LE") {
+ val fileName = "json-tests/utf16LE.json"
+ val schema = new StructType().add("firstName",
StringType).add("lastName", StringType)
+ val jsonDF = spark.read.schema(schema)
+ .option("multiline", "true")
+ .options(Map("encoding" -> "UTF-16LE"))
+ .json(testFile(fileName))
+
+ checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+ }
+
+ test("SPARK-23723: Unsupported charset name") {
+ val invalidCharset = "UTF-128"
+ val exception = intercept[java.io.UnsupportedEncodingException] {
+ spark.read
+ .options(Map("charset" -> invalidCharset, "lineSep" -> "\n"))
+ .json(testFile("json-tests/utf16LE.json"))
+ .count()
+ }
+
+ assert(exception.getMessage.contains(invalidCharset))
+ }
+
+ test("SPARK-23723: checking that the charset option is case agnostic") {
+ val fileName = "json-tests/utf16LE.json"
+ val schema = new StructType().add("firstName",
StringType).add("lastName", StringType)
+ val jsonDF = spark.read.schema(schema)
+ .option("multiline", "true")
+ .options(Map("charset" -> "uTf-16lE"))
+ .json(testFile(fileName))
+
+ checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+ }
+
+
+ test("SPARK-23723: specified charset is not matched to actual charset") {
+ val fileName = "json-tests/utf16LE.json"
+ val schema = new StructType().add("firstName",
StringType).add("lastName", StringType)
+ val exception = intercept[SparkException] {
+ spark.read.schema(schema)
+ .option("mode", "FAILFAST")
+ .option("multiline", "true")
+ .options(Map("charset" -> "UTF-16BE"))
+ .json(testFile(fileName))
+ .count()
+ }
+ val errMsg = exception.getMessage
+
+ assert(errMsg.contains("Malformed records are detected in record
parsing"))
+ }
+
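+  // Reads every *.json file under `pathToJsonFiles` with `expectedCharset` and checks that
+  // the concatenated content, trimmed and with spaces removed, equals `expectedContent`.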
+  def checkCharset(
+    expectedCharset: String,
+    pathToJsonFiles: String,
+    expectedContent: String
+  ): Unit = {
+    val jsonFiles = new File(pathToJsonFiles)
+      .listFiles()
+      .filter(_.isFile)
+      .filter(_.getName.endsWith("json"))
+    val jsonContent = jsonFiles.map { file =>
+      scala.io.Source.fromFile(file, expectedCharset).mkString
+    }
+    val cleanedContent = jsonContent
+      .mkString
+      .trim
+      .replaceAll(" ", "")
+
+    assert(cleanedContent == expectedContent)
+  }
+
+ test("SPARK-23723: save json in UTF-32BE") {
+ val encoding = "UTF-32BE"
+ withTempPath { path =>
+ val df = spark.createDataset(Seq(("Dog", 42)))
+ df.write
+ .options(Map("charset" -> encoding, "lineSep" -> "\n"))
+ .format("json").mode("overwrite")
+ .save(path.getCanonicalPath)
+
+ checkCharset(
+ expectedCharset = encoding,
+ pathToJsonFiles = path.getCanonicalPath,
+ expectedContent = """{"_1":"Dog","_2":42}"""
+ )
+ }
+ }
+
+ test("SPARK-23723: save json in default charset - UTF-8") {
+ withTempPath { path =>
+ val df = spark.createDataset(Seq(("Dog", 42)))
+ df.write
+ .format("json").mode("overwrite")
+ .save(path.getCanonicalPath)
+
+ checkCharset(
+ expectedCharset = "UTF-8",
+ pathToJsonFiles = path.getCanonicalPath,
+ expectedContent = """{"_1":"Dog","_2":42}"""
+ )
+ }
+ }
+
+ test("SPARK-23723: wrong output charset") {
+ val charset = "UTF-128"
+ val exception = intercept[java.io.UnsupportedEncodingException] {
+ withTempPath { path =>
+ val df = spark.createDataset(Seq((0)))
+ df.write
+ .options(Map("charset" -> charset, "lineSep" -> "\n"))
+ .format("json").mode("overwrite")
+ .save(path.getCanonicalPath)
+ }
+ }
+
+ assert(exception.getMessage == charset)
+ }
+
+ test("SPARK-23723: read written json in UTF-16") {
+ val charset = "UTF-16"
+ case class Rec(f1: String, f2: Int)
+ withTempPath { path =>
+ val ds = spark.createDataset(Seq(
+ ("a", 1), ("b", 2), ("c", 3))
+ ).repartition(2)
+ ds.write
+ .options(Map("charset" -> charset, "lineSep" -> "\n"))
+ .format("json").mode("overwrite")
+ .save(path.getCanonicalPath)
+ val savedDf = spark
+ .read
+ .schema(ds.schema)
+ // This option will be replaced by .option("lineSep", "x00 0a")
+ // as soon as lineSep allows to specify sequence of bytes in
hexadecimal format.
+ .option("mode", "DROPMALFORMED")
+ .json(path.getCanonicalPath)
+
+ checkAnswer(savedDf.toDF(), ds.toDF())
+ }
+ }
+
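+  // Generates a test that writes two records separated by `lineSep` in the given charset
+  // and reads them back, passing the charset through the option named by `charsetOption`,
+  // with or without schema inference.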
+  def checkReadJson(
+    lineSep: String,
+    charsetOption: String,
+    charset: String,
+    inferSchema: Boolean,
+    runId: Int
+  ): Unit = {
+    test(s"SPARK-23724: checks reading json in ${charset} #${runId}") {
+      val delimInBytes = {
+        if (lineSep.startsWith("x")) {
+          lineSep.replaceAll("[^0-9A-Fa-f]", "")
+            .sliding(2, 2).toArray.map(Integer.parseInt(_, 16).toByte)
+        } else {
+          lineSep.getBytes(charset)
        }
      }
+      case class Rec(f1: String, f2: Int) {
+        def json = s"""{"f1":"${f1}", "f2":$f2}"""
+        def bytes = json.getBytes(charset)
+        def row = Row(f1, f2)
+      }
+      val schema = new StructType().add("f1", StringType).add("f2", IntegerType)
+      withTempPath { path =>
+        val records = List(Rec("a", 1), Rec("b", 2))
+        val data = records.map(_.bytes).reduce((a1, a2) => a1 ++ delimInBytes ++ a2)
+        val os = new FileOutputStream(path)
+        os.write(data)
+        os.close()
+        val reader = if (inferSchema) {
+          spark.read
+        } else {
+          spark.read.schema(schema)
+        }
+        val savedDf = reader
+          .option(charsetOption, charset)
+          .option("lineSep", lineSep)
+          .json(path.getCanonicalPath)
+        checkAnswer(savedDf, records.map(_.row))
+      }
+    }
+  }
+  // scalastyle:off nonascii
+  List(
+    ("|", "encoding", "UTF-8", false),
+    ("^", "charset", "UTF-16BE", true),
--- End diff --
done
---