Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/20937#discussion_r179952617
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
@@ -2162,4 +2162,262 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
assert(ds.schema == new StructType().add("f1", LongType))
}
+
+ def testFile(fileName: String): String = {
+ Thread.currentThread().getContextClassLoader.getResource(fileName).toString
+ }
+
+ test("SPARK-23723: json in UTF-16 with BOM") {
+ val fileName = "json-tests/utf16WithBOM.json"
+ val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
+ val jsonDF = spark.read.schema(schema)
+ .option("multiline", "true")
+ .option("encoding", "UTF-16")
+ .json(testFile(fileName))
+
+ checkAnswer(jsonDF, Seq(Row("Chris", "Baird"), Row("Doug", "Rood")))
+ }
+
+ test("SPARK-23723: multi-line json in UTF-32BE with BOM") {
+ val fileName = "json-tests/utf32BEWithBOM.json"
+ val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
+ val jsonDF = spark.read.schema(schema)
+ .option("multiline", "true")
+ .json(testFile(fileName))
+
+ checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+ }
+
+ test("SPARK-23723: Use user's encoding in reading of multi-line json in
UTF-16LE") {
+ val fileName = "json-tests/utf16LE.json"
+ val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
+ val jsonDF = spark.read.schema(schema)
+ .option("multiline", "true")
+ .options(Map("encoding" -> "UTF-16LE"))
+ .json(testFile(fileName))
+
+ checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+ }
+
+ test("SPARK-23723: Unsupported encoding name") {
+ val invalidCharset = "UTF-128"
+ val exception = intercept[java.io.UnsupportedEncodingException] {
+ spark.read
+ .options(Map("encoding" -> invalidCharset, "lineSep" -> "\n"))
+ .json(testFile("json-tests/utf16LE.json"))
+ .count()
+ }
+
+ assert(exception.getMessage.contains(invalidCharset))
+ }
+
+ test("SPARK-23723: checking that the encoding option is case agnostic") {
+ val fileName = "json-tests/utf16LE.json"
+ val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
+ val jsonDF = spark.read.schema(schema)
+ .option("multiline", "true")
+ .options(Map("encoding" -> "uTf-16lE"))
+ .json(testFile(fileName))
+
+ checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+ }
+
+ test("SPARK-23723: specified encoding is not matched to actual encoding") {
+ val fileName = "json-tests/utf16LE.json"
+ val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
+ val exception = intercept[SparkException] {
+ spark.read.schema(schema)
+ .option("mode", "FAILFAST")
+ .option("multiline", "true")
+ .options(Map("encoding" -> "UTF-16BE"))
+ .json(testFile(fileName))
+ .count()
+ }
+ val errMsg = exception.getMessage
+
+ assert(errMsg.contains("Malformed records are detected in record parsing"))
+ }
+
+ def checkEncoding(
+ expectedEncoding: String,
+ pathToJsonFiles: String,
+ expectedContent: String): Unit = {
+ val jsonFiles = new File(pathToJsonFiles)
+ .listFiles()
+ .filter(_.isFile)
+ .filter(_.getName.endsWith("json"))
+ val actualContent = jsonFiles.map { file =>
+ new String(Files.readAllBytes(file.toPath), expectedEncoding)
+ }.mkString.trim.replaceAll(" ", "")
+
+ assert(actualContent == expectedContent)
+ }
+
+ test("SPARK-23723: save json in UTF-32BE") {
+ val encoding = "UTF-32BE"
+ withTempPath { path =>
+ val df = spark.createDataset(Seq(("Dog", 42)))
+ df.write
+ .options(Map("encoding" -> encoding, "lineSep" -> "\n"))
+ .format("json").mode("overwrite")
+ .save(path.getCanonicalPath)
+
+ checkEncoding(
+ expectedEncoding = encoding,
+ pathToJsonFiles = path.getCanonicalPath,
+ expectedContent = """{"_1":"Dog","_2":42}""")
+ }
+ }
+
+ test("SPARK-23723: save json in default encoding - UTF-8") {
+ withTempPath { path =>
+ val df = spark.createDataset(Seq(("Dog", 42)))
+ df.write
+ .format("json").mode("overwrite")
+ .save(path.getCanonicalPath)
+
+ checkEncoding(
+ expectedEncoding = "UTF-8",
+ pathToJsonFiles = path.getCanonicalPath,
+ expectedContent = """{"_1":"Dog","_2":42}""")
+ }
+ }
+
+ test("SPARK-23723: wrong output encoding") {
+ val encoding = "UTF-128"
+ val exception = intercept[java.io.UnsupportedEncodingException] {
+ withTempPath { path =>
+ val df = spark.createDataset(Seq((0)))
+ df.write
+ .options(Map("encoding" -> encoding, "lineSep" -> "\n"))
+ .format("json").mode("overwrite")
+ .save(path.getCanonicalPath)
+ }
+ }
+
+ assert(exception.getMessage == encoding)
+ }
+
+ test("SPARK-23723: read written json in UTF-16LE") {
+ val options = Map("encoding" -> "UTF-16LE", "lineSep" -> "\n")
+ withTempPath { path =>
+ val ds = spark.createDataset(Seq(
+ ("a", 1), ("b", 2), ("c", 3))
+ ).repartition(2)
+ ds.write
+ .options(options)
+ .format("json").mode("overwrite")
+ .save(path.getCanonicalPath)
+ val readBack = spark
+ .read
+ .options(options)
+ .json(path.getCanonicalPath)
+
+ checkAnswer(readBack.toDF(), ds.toDF())
+ }
+ }
+
+ def checkReadJson(lineSep: String, encoding: String, inferSchema: Boolean, id: Int): Unit = {
+ test(s"SPARK-23724: checks reading json in ${encoding} #${id}") {
+ val lineSepInBytes = {
+ if (lineSep.startsWith("x")) {
+ lineSep.replaceAll("[^0-9A-Fa-f]", "")
+ .sliding(2, 2).toArray.map(Integer.parseInt(_, 16).toByte)
+ } else {
+ lineSep.getBytes(encoding)
+ }
+ }
+ val schema = new StructType().add("f1", StringType).add("f2", IntegerType)
+ withTempPath { path =>
+ val records = List(("a", 1), ("b", 2))
+ val data = records
+ .map(rec => s"""{"f1":"${rec._1}", "f2":${rec._2}}""".getBytes(encoding))
+ .reduce((a1, a2) => a1 ++ lineSepInBytes ++ a2)
+ val os = new FileOutputStream(path)
+ os.write(data)
+ os.close()
+ val reader = if (inferSchema) {
+ spark.read
+ } else {
+ spark.read.schema(schema)
+ }
+ val readBack = reader
+ .option("encoding", encoding)
+ .option("lineSep", lineSep)
+ .json(path.getCanonicalPath)
+ checkAnswer(readBack, records.map(rec => Row(rec._1, rec._2)))
+ }
+ }
+ }
+
+ // scalastyle:off nonascii
+ List(
+ (0, "|", "UTF-8", false),
+ (1, "^", "UTF-16BE", true),
+ (2, "::", "ISO-8859-1", true),
+ (3, "!!!@3", "UTF-32LE", false),
+ (4, 0x1E.toChar.toString, "UTF-8", true),
+ (5, "아", "UTF-32BE", false),
+ (6, "куку", "CP1251", true),
+ (7, "sep", "utf-8", false),
+ (8, "\r\n", "UTF-16LE", false),
+ (9, "\r\n", "utf-16be", true),
+ (10, "\u000d\u000a", "UTF-32BE", false),
+ (11, "\u000a\u000d", "UTF-8", true),
+ (12, "===", "US-ASCII", false),
+ (13, "$^+", "utf-32le", true)
+ ).foreach { case (i, d, c, s) => checkReadJson(d, c, s, i) }
+ // scalastyle:on nonascii
+
+ test("SPARK-23724: lineSep should be set if encoding if different from
UTF-8") {
+ val encoding = "UTF-16LE"
+ val exception = intercept[IllegalArgumentException] {
+ spark.read
+ .options(Map("encoding" -> encoding))
+ .json(testFile("json-tests/utf16LE.json"))
+ .count()
+ }
+
+ assert(exception.getMessage.contains(
+ s"""The lineSep option must be specified for the $encoding
encoding"""))
+ }
+
+ private val badJson = "\u0000\u0000\u0000A\u0001AAA"
+
+ test("SPARK-23094: invalid json with leading nulls - from file
(multiLine=true)") {
+ import testImplicits._
--- End diff ---
Seems we can remove this implicit import.
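
    For example, a minimal sketch of the setup without the implicit import, passing Encoders.STRING explicitly instead of relying on toDS(). Note this assumes the test only needs a Dataset[String] to write badJson plus one valid record to a text file before reading it back; the rest of the test body is not shown in this diff:

        // Hypothetical alternative: supply the encoder explicitly so that
        // `import testImplicits._` is no longer needed.
        import org.apache.spark.sql.Encoders

        withTempPath { path =>
          val ds = spark.createDataset(Seq(badJson, """{"a": 1}"""))(Encoders.STRING)
          ds.write.mode("overwrite").text(path.getCanonicalPath)
          // ... then read the file back with multiLine=true as in the original test
        }

    Passing the encoder explicitly keeps the test self-contained without pulling in the whole implicit-conversion machinery for a single toDS() call.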