[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178496032 --- Diff: python/pyspark/sql/readwriter.py --- @@ -237,6 +237,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, :param allowUnquotedControlChars: allows JSON Strings to contain unquoted control characters (ASCII characters with value less than 32, including tab and line feed characters) or not. +:param encoding: standard charset name, for example UTF-8, UTF-16 and UTF-32. If None is + set, the encoding of input json will be detected automatically. --- End diff -- Yup, I admit it. In that case, we could optionally describe it in a bit more detail, including the multiline mode, too. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
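For reference, a minimal sketch of the kind of usage such extra documentation could describe; this is not the PR's documentation text, and the file path below is hypothetical:

```scala
// Sketch: how the encoding option is typically combined with multiline mode.
val multiLineDF = spark.read
  .option("multiline", "true")       // whole-file parsing
  .option("encoding", "UTF-16")      // decode the whole file with this charset
  .json("/tmp/utf16-with-bom.json")  // hypothetical path
```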
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178495813 --- Diff: python/pyspark/sql/readwriter.py --- @@ -237,6 +237,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, :param allowUnquotedControlChars: allows JSON Strings to contain unquoted control characters (ASCII characters with value less than 32, including tab and line feed characters) or not. +:param encoding: standard charset name, for example UTF-8, UTF-16 and UTF-32. If None is + set, the encoding of input json will be detected automatically. --- End diff -- I would agree with you about the per-line mode, but not about the multiline mode. For example, this test checks that: https://github.com/MaxGekk/spark-1/blob/53834005ba22c0c3e7be883949ab01a5bf1a0b9a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala#L2148-L2156 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178478826 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2127,4 +2127,243 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(df.schema === expectedSchema) } } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("encoding", "UTF-16") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq( + Row("Chris", "Baird"), Row("Doug", "Rood") +)) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported encoding name") { +val invalidCharset = "UTF-128" +val exception = intercept[java.io.UnsupportedEncodingException] { + spark.read +.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the encoding option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified encoding is not matched to actual encoding") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") +.option("multiline", "true") +.options(Map("encoding" -> "UTF-16BE")) +.json(testFile(fileName)) +.count() +} +val errMsg = exception.getMessage + +assert(errMsg.contains("Malformed records are detected in record parsing")) + } + + def checkCharset( +expectedCharset: String, +pathToJsonFiles: String, +expectedContent: String + ): Unit = { +val jsonFiles = new File(pathToJsonFiles) + .listFiles() + .filter(_.isFile) + .filter(_.getName.endsWith("json")) +val jsonContent = jsonFiles.map { file => + scala.io.Source.fromFile(file, expectedCharset).mkString +} +val cleanedContent = jsonContent + .mkString + .trim + .replaceAll(" ", "") + +assert(cleanedContent == expectedContent) + } + + test("SPARK-23723: save json in UTF-32BE") { +val encoding = "UTF-32BE" +withTempPath { path => + val df = 
spark.createDataset(Seq(("Dog", 42))) + df.write +.options(Map("encoding" -> encoding, "lineSep" -> "\n")) +.format("json").mode("overwrite") +.save(path.getCanonicalPath) + + checkCharset( +expectedCharset = encoding, +pathToJsonFiles = path.getCanonicalPath, +expectedContent = """{"_1":"Dog","_2":42}""" + ) +} + } + + test("SPARK-23723: save json in default encoding - UTF-8") { +withTempPath { path => + val df = spark.createDataset(Seq(("Dog", 42))) + df.write +.format("json").mode("o
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178478996 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2127,4 +2127,243 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(df.schema === expectedSchema) } } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("encoding", "UTF-16") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq( + Row("Chris", "Baird"), Row("Doug", "Rood") +)) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported encoding name") { +val invalidCharset = "UTF-128" +val exception = intercept[java.io.UnsupportedEncodingException] { + spark.read +.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the encoding option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified encoding is not matched to actual encoding") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") +.option("multiline", "true") +.options(Map("encoding" -> "UTF-16BE")) +.json(testFile(fileName)) +.count() +} +val errMsg = exception.getMessage + +assert(errMsg.contains("Malformed records are detected in record parsing")) + } + + def checkCharset( +expectedCharset: String, +pathToJsonFiles: String, +expectedContent: String + ): Unit = { +val jsonFiles = new File(pathToJsonFiles) + .listFiles() + .filter(_.isFile) + .filter(_.getName.endsWith("json")) +val jsonContent = jsonFiles.map { file => + scala.io.Source.fromFile(file, expectedCharset).mkString --- End diff -- Can we use `new String(Files.readAllBytes(partFile.toPath), StandardCharsets.UTF_8)` just for consistency above? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
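A hedged sketch of the suggested change inside `checkCharset`, reusing the helper's existing variables (`jsonFiles`, `expectedCharset`); the exact form is up to the PR author:

```scala
import java.nio.file.Files

// Sketch: read the raw bytes of each part file and decode them with the expected charset,
// mirroring the `new String(Files.readAllBytes(...), ...)` style used elsewhere in the suite.
val jsonContent = jsonFiles.map { file =>
  new String(Files.readAllBytes(file.toPath), expectedCharset)
}
```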
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178478728 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2127,4 +2127,243 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(df.schema === expectedSchema) } } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("encoding", "UTF-16") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq( + Row("Chris", "Baird"), Row("Doug", "Rood") +)) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported encoding name") { +val invalidCharset = "UTF-128" +val exception = intercept[java.io.UnsupportedEncodingException] { + spark.read +.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the encoding option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified encoding is not matched to actual encoding") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") +.option("multiline", "true") +.options(Map("encoding" -> "UTF-16BE")) +.json(testFile(fileName)) +.count() +} +val errMsg = exception.getMessage + +assert(errMsg.contains("Malformed records are detected in record parsing")) + } + + def checkCharset( +expectedCharset: String, +pathToJsonFiles: String, +expectedContent: String + ): Unit = { +val jsonFiles = new File(pathToJsonFiles) + .listFiles() + .filter(_.isFile) + .filter(_.getName.endsWith("json")) +val jsonContent = jsonFiles.map { file => + scala.io.Source.fromFile(file, expectedCharset).mkString +} +val cleanedContent = jsonContent + .mkString + .trim + .replaceAll(" ", "") + +assert(cleanedContent == expectedContent) + } + + test("SPARK-23723: save json in UTF-32BE") { +val encoding = "UTF-32BE" +withTempPath { path => + val df = 
spark.createDataset(Seq(("Dog", 42))) + df.write +.options(Map("encoding" -> encoding, "lineSep" -> "\n")) +.format("json").mode("overwrite") +.save(path.getCanonicalPath) + + checkCharset( +expectedCharset = encoding, +pathToJsonFiles = path.getCanonicalPath, +expectedContent = """{"_1":"Dog","_2":42}""" + ) +} + } + + test("SPARK-23723: save json in default encoding - UTF-8") { +withTempPath { path => + val df = spark.createDataset(Seq(("Dog", 42))) + df.write +.format("json").mode("o
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178475553 --- Diff: python/pyspark/sql/readwriter.py --- @@ -237,6 +237,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, :param allowUnquotedControlChars: allows JSON Strings to contain unquoted control characters (ASCII characters with value less than 32, including tab and line feed characters) or not. +:param encoding: standard charset name, for example UTF-8, UTF-16 and UTF-32. If None is --- End diff -- Just to be sure, is this intended? I thought it would be `standard charset` -> `encoding (charset)` to be consistent with the write path. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178476439 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -361,6 +361,15 @@ class JacksonParser( // For such records, all fields other than the field configured by // `columnNameOfCorruptRecord` are set to `null`. throw BadRecordException(() => recordLiteral(record), () => None, e) + case e: CharConversionException if options.encoding.isEmpty => +val msg = + """Failed to parse a character. Charset was detected automatically. +|You might want to set it explicitly via the charset option like: +| .option("charset", "UTF-8") --- End diff -- `charset` -> `encoding`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
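A sketch of how the hint could read after the suggested `charset` -> `encoding` rename; the helper name and exact wording below are hypothetical, not the PR's final text:

```scala
import java.io.CharConversionException

// Sketch: build the reworded hint, referring to the "encoding" option instead of "charset".
def withEncodingHint(cause: CharConversionException): CharConversionException = {
  val msg =
    """Failed to parse a character. Encoding was detected automatically.
      |You might want to set it explicitly via the encoding option like:
      |  .option("encoding", "UTF-8")
      |""".stripMargin
  new CharConversionException(msg + cause.getMessage)
}
```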
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178479007 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2127,4 +2127,243 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(df.schema === expectedSchema) } } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("encoding", "UTF-16") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq( + Row("Chris", "Baird"), Row("Doug", "Rood") +)) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported encoding name") { +val invalidCharset = "UTF-128" +val exception = intercept[java.io.UnsupportedEncodingException] { + spark.read +.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the encoding option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified encoding is not matched to actual encoding") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") +.option("multiline", "true") +.options(Map("encoding" -> "UTF-16BE")) +.json(testFile(fileName)) +.count() +} +val errMsg = exception.getMessage + +assert(errMsg.contains("Malformed records are detected in record parsing")) + } + + def checkCharset( +expectedCharset: String, +pathToJsonFiles: String, +expectedContent: String + ): Unit = { +val jsonFiles = new File(pathToJsonFiles) + .listFiles() + .filter(_.isFile) + .filter(_.getName.endsWith("json")) +val jsonContent = jsonFiles.map { file => + scala.io.Source.fromFile(file, expectedCharset).mkString +} +val cleanedContent = jsonContent --- End diff -- We could make it inlined. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
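A sketch of the inlined form the comment appears to suggest, assuming the surrounding `checkCharset` helper's variables (`jsonFiles`, `expectedCharset`, `expectedContent`):

```scala
// Sketch: drop the intermediate cleanedContent value and assert on the expression directly.
val jsonContent = jsonFiles.map { file =>
  scala.io.Source.fromFile(file, expectedCharset).mkString
}
assert(jsonContent.mkString.trim.replaceAll(" ", "") == expectedContent)
```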
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178476163 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala --- @@ -39,11 +40,36 @@ private[sql] object CreateJacksonParser extends Serializable { jsonFactory.createParser(new InputStreamReader(bain, "UTF-8")) } - def text(jsonFactory: JsonFactory, record: Text): JsonParser = { -jsonFactory.createParser(record.getBytes, 0, record.getLength) + def text(jsonFactory: JsonFactory, record: Text, encoding: Option[String] = None): JsonParser = { +encoding match { + case Some(enc) => +val bain = new ByteArrayInputStream(record.getBytes, 0, record.getLength) +jsonFactory.createParser(new InputStreamReader(bain, enc)) + case _ => +jsonFactory.createParser(record.getBytes, 0, record.getLength) +} } - def inputStream(jsonFactory: JsonFactory, record: InputStream): JsonParser = { -jsonFactory.createParser(record) + def inputStream( --- End diff -- ditto for avoiding type dispatch on `encoding`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178476229 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala --- @@ -39,11 +40,36 @@ private[sql] object CreateJacksonParser extends Serializable { jsonFactory.createParser(new InputStreamReader(bain, "UTF-8")) } - def text(jsonFactory: JsonFactory, record: Text): JsonParser = { -jsonFactory.createParser(record.getBytes, 0, record.getLength) + def text(jsonFactory: JsonFactory, record: Text, encoding: Option[String] = None): JsonParser = { +encoding match { + case Some(enc) => +val bain = new ByteArrayInputStream(record.getBytes, 0, record.getLength) +jsonFactory.createParser(new InputStreamReader(bain, enc)) + case _ => +jsonFactory.createParser(record.getBytes, 0, record.getLength) +} } - def inputStream(jsonFactory: JsonFactory, record: InputStream): JsonParser = { -jsonFactory.createParser(record) + def inputStream( + jsonFactory: JsonFactory, + is: InputStream, + encoding: Option[String] = None): JsonParser = { +encoding match { + case Some(enc) => +jsonFactory.createParser(new InputStreamReader(is, enc)) + case _ => +jsonFactory.createParser(is) +} + } + + def internalRow( +jsonFactory: JsonFactory, --- End diff -- nit: indentation --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178475983 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala --- @@ -39,11 +40,36 @@ private[sql] object CreateJacksonParser extends Serializable { jsonFactory.createParser(new InputStreamReader(bain, "UTF-8")) } - def text(jsonFactory: JsonFactory, record: Text): JsonParser = { -jsonFactory.createParser(record.getBytes, 0, record.getLength) + def text(jsonFactory: JsonFactory, record: Text, encoding: Option[String] = None): JsonParser = { --- End diff -- `encoding: Option[String] = None` -> `encoding: Option[String]`. You don't have to worry about the signature for private ones. Theoretically, MiMa will detect signature changes for public APIs, which makes the Jenkins tests fail. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178476459 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -361,6 +361,15 @@ class JacksonParser( // For such records, all fields other than the field configured by // `columnNameOfCorruptRecord` are set to `null`. throw BadRecordException(() => recordLiteral(record), () => None, e) + case e: CharConversionException if options.encoding.isEmpty => +val msg = + """Failed to parse a character. Charset was detected automatically. +|You might want to set it explicitly via the charset option like: +| .option("charset", "UTF-8") +|Example of supported charsets: +| UTF-8, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE --- End diff -- Let's take out blacklisted ones. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178475654 --- Diff: python/pyspark/sql/readwriter.py --- @@ -237,6 +237,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, :param allowUnquotedControlChars: allows JSON Strings to contain unquoted control characters (ASCII characters with value less than 32, including tab and line feed characters) or not. +:param encoding: standard charset name, for example UTF-8, UTF-16 and UTF-32. If None is + set, the encoding of input json will be detected automatically. --- End diff -- I think we shouldn't say it's automatically detected yet. I still think it only works accidentally. The newline's encoding is not automatically detected, and the encoding is detected for each line, not for the whole file, if I understood correctly. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
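To make the concern concrete, a hedged usage sketch (hypothetical path): in per-line mode each record is decoded separately and the record separator itself is not detected, so being explicit about both options is safer for non-UTF-8 data:

```scala
// Sketch: per-line reading of UTF-16LE JSON with the encoding and the line separator
// set explicitly instead of relying on detection.
val perLineDF = spark.read
  .option("encoding", "UTF-16LE")
  .option("lineSep", "\n")
  .json("/tmp/utf16le-records.json")  // hypothetical path
```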
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178478788 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2127,4 +2127,243 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(df.schema === expectedSchema) } } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("encoding", "UTF-16") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq( + Row("Chris", "Baird"), Row("Doug", "Rood") +)) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported encoding name") { +val invalidCharset = "UTF-128" +val exception = intercept[java.io.UnsupportedEncodingException] { + spark.read +.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the encoding option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified encoding is not matched to actual encoding") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") +.option("multiline", "true") +.options(Map("encoding" -> "UTF-16BE")) +.json(testFile(fileName)) +.count() +} +val errMsg = exception.getMessage + +assert(errMsg.contains("Malformed records are detected in record parsing")) + } + + def checkCharset( +expectedCharset: String, +pathToJsonFiles: String, +expectedContent: String + ): Unit = { +val jsonFiles = new File(pathToJsonFiles) + .listFiles() + .filter(_.isFile) + .filter(_.getName.endsWith("json")) +val jsonContent = jsonFiles.map { file => + scala.io.Source.fromFile(file, expectedCharset).mkString +} +val cleanedContent = jsonContent + .mkString + .trim + .replaceAll(" ", "") + +assert(cleanedContent == expectedContent) + } + + test("SPARK-23723: save json in UTF-32BE") { +val encoding = "UTF-32BE" +withTempPath { path => + val df = 
spark.createDataset(Seq(("Dog", 42))) + df.write +.options(Map("encoding" -> encoding, "lineSep" -> "\n")) +.format("json").mode("overwrite") +.save(path.getCanonicalPath) + + checkCharset( +expectedCharset = encoding, +pathToJsonFiles = path.getCanonicalPath, +expectedContent = """{"_1":"Dog","_2":42}""" + ) +} + } + + test("SPARK-23723: save json in default encoding - UTF-8") { +withTempPath { path => + val df = spark.createDataset(Seq(("Dog", 42))) + df.write +.format("json").mode("o
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178478498 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala --- @@ -41,19 +41,23 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti */ val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean - private val lineSeparator: Option[String] = parameters.get(LINE_SEPARATOR).map { sep => -require(sep.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.") -sep + val encoding: Option[String] = parameters.get(ENCODING) + + val lineSeparator: Option[Array[Byte]] = parameters.get(LINE_SEPARATOR).map { lineSep => --- End diff -- Can we have the same signature as the JSON one too, i.e. `String`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
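A minimal sketch of one way to keep the `String`-typed option, as in the JSON options, and derive the byte form from it and the encoding; the names and option keys below are assumptions based on this diff, not the PR's final code:

```scala
// Sketch: keep lineSeparator as Option[String] and compute the bytes with the configured
// encoding, falling back to UTF-8 when no encoding is given.
val lineSeparator: Option[String] = parameters.get("lineSep").map { sep =>
  require(sep.nonEmpty, "'lineSep' cannot be an empty string.")
  sep
}

val lineSeparatorInBytes: Option[Array[Byte]] = lineSeparator.map { sep =>
  sep.getBytes(encoding.getOrElse("UTF-8"))
}
```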
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178479126 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2127,4 +2127,243 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(df.schema === expectedSchema) } } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("encoding", "UTF-16") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq( + Row("Chris", "Baird"), Row("Doug", "Rood") +)) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported encoding name") { +val invalidCharset = "UTF-128" +val exception = intercept[java.io.UnsupportedEncodingException] { + spark.read +.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the encoding option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified encoding is not matched to actual encoding") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") +.option("multiline", "true") +.options(Map("encoding" -> "UTF-16BE")) +.json(testFile(fileName)) +.count() +} +val errMsg = exception.getMessage + +assert(errMsg.contains("Malformed records are detected in record parsing")) + } + + def checkCharset( +expectedCharset: String, +pathToJsonFiles: String, +expectedContent: String + ): Unit = { +val jsonFiles = new File(pathToJsonFiles) + .listFiles() + .filter(_.isFile) + .filter(_.getName.endsWith("json")) +val jsonContent = jsonFiles.map { file => + scala.io.Source.fromFile(file, expectedCharset).mkString +} +val cleanedContent = jsonContent + .mkString + .trim + .replaceAll(" ", "") + +assert(cleanedContent == expectedContent) + } + + test("SPARK-23723: save json in UTF-32BE") { +val encoding = "UTF-32BE" +withTempPath { path => + val df = 
spark.createDataset(Seq(("Dog", 42))) + df.write +.options(Map("encoding" -> encoding, "lineSep" -> "\n")) +.format("json").mode("overwrite") +.save(path.getCanonicalPath) + + checkCharset( +expectedCharset = encoding, +pathToJsonFiles = path.getCanonicalPath, +expectedContent = """{"_1":"Dog","_2":42}""" + ) +} + } + + test("SPARK-23723: save json in default encoding - UTF-8") { +withTempPath { path => + val df = spark.createDataset(Seq(("Dog", 42))) + df.write +.format("json").mode("o
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178478616 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2127,4 +2127,243 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(df.schema === expectedSchema) } } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString --- End diff -- Just a question: is it hard to do this with `Files.write(path.toPath, data.getBytes(StandardCharsets.UTF_8))`? I thought it would make it easier to add other tests in the future. If it needs many changes, I am fine with keeping it as is too. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
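A hedged sketch of the alternative being asked about: generating the input inside the test with `Files.write` rather than a checked-in resource file. It assumes JsonSuite's existing helpers and imports (`withTempPath`, `checkAnswer`, `spark`, the `types` package); the data and encoding are only examples:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files

// Sketch: write the test JSON into a temp location in the desired encoding,
// then read it back with the encoding option under test.
withTempPath { path =>
  val data = """{"firstName": "Chris", "lastName": "Baird"}"""
  Files.write(path.toPath, data.getBytes(StandardCharsets.UTF_16LE))

  val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
  val jsonDF = spark.read.schema(schema)
    .option("encoding", "UTF-16LE")
    .option("lineSep", "\n")
    .json(path.getCanonicalPath)

  checkAnswer(jsonDF, Row("Chris", "Baird"))
}
```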
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178478647 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2127,4 +2127,243 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(df.schema === expectedSchema) } } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("encoding", "UTF-16") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq( + Row("Chris", "Baird"), Row("Doug", "Rood") +)) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported encoding name") { +val invalidCharset = "UTF-128" +val exception = intercept[java.io.UnsupportedEncodingException] { + spark.read +.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the encoding option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified encoding is not matched to actual encoding") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") +.option("multiline", "true") +.options(Map("encoding" -> "UTF-16BE")) +.json(testFile(fileName)) +.count() +} +val errMsg = exception.getMessage + +assert(errMsg.contains("Malformed records are detected in record parsing")) + } + + def checkCharset( +expectedCharset: String, +pathToJsonFiles: String, +expectedContent: String + ): Unit = { --- End diff -- ditto for moving it up --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178477441 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala --- @@ -153,7 +158,12 @@ object MultiLineJsonDataSource extends JsonDataSource { parsedOptions: JSONOptions): StructType = { val json: RDD[PortableDataStream] = createBaseRdd(sparkSession, inputPaths) val sampled: RDD[PortableDataStream] = JsonUtils.sample(json, parsedOptions) -JsonInferSchema.infer(sampled, parsedOptions, createParser) + +JsonInferSchema.infer[PortableDataStream]( + sampled, + parsedOptions, + createParser(_, _, parsedOptions.encoding) +) --- End diff -- Let's move it up. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178478745 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2127,4 +2127,243 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(df.schema === expectedSchema) } } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("encoding", "UTF-16") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq( + Row("Chris", "Baird"), Row("Doug", "Rood") +)) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported encoding name") { +val invalidCharset = "UTF-128" +val exception = intercept[java.io.UnsupportedEncodingException] { + spark.read +.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the encoding option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified encoding is not matched to actual encoding") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") +.option("multiline", "true") +.options(Map("encoding" -> "UTF-16BE")) +.json(testFile(fileName)) +.count() +} +val errMsg = exception.getMessage + +assert(errMsg.contains("Malformed records are detected in record parsing")) + } + + def checkCharset( +expectedCharset: String, +pathToJsonFiles: String, +expectedContent: String + ): Unit = { +val jsonFiles = new File(pathToJsonFiles) + .listFiles() + .filter(_.isFile) + .filter(_.getName.endsWith("json")) +val jsonContent = jsonFiles.map { file => + scala.io.Source.fromFile(file, expectedCharset).mkString +} +val cleanedContent = jsonContent + .mkString + .trim + .replaceAll(" ", "") + +assert(cleanedContent == expectedContent) + } + + test("SPARK-23723: save json in UTF-32BE") { +val encoding = "UTF-32BE" +withTempPath { path => + val df = 
spark.createDataset(Seq(("Dog", 42))) + df.write +.options(Map("encoding" -> encoding, "lineSep" -> "\n")) +.format("json").mode("overwrite") +.save(path.getCanonicalPath) + + checkCharset( +expectedCharset = encoding, +pathToJsonFiles = path.getCanonicalPath, +expectedContent = """{"_1":"Dog","_2":42}""" + ) +} + } + + test("SPARK-23723: save json in default encoding - UTF-8") { +withTempPath { path => + val df = spark.createDataset(Seq(("Dog", 42))) + df.write +.format("json").mode("o
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178478627 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2127,4 +2127,243 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(df.schema === expectedSchema) } } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("encoding", "UTF-16") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq( + Row("Chris", "Baird"), Row("Doug", "Rood") +)) --- End diff -- inlined --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178478271 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala --- @@ -175,33 +185,43 @@ object MultiLineJsonDataSource extends JsonDataSource { .values } - private def createParser(jsonFactory: JsonFactory, record: PortableDataStream): JsonParser = { + private def createParser( + jsonFactory: JsonFactory, + record: PortableDataStream, + charset: Option[String] = None): JsonParser = { val path = new Path(record.getPath()) CreateJacksonParser.inputStream( jsonFactory, - CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path)) + CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path), + charset +) } override def readFile( conf: Configuration, file: PartitionedFile, parser: JacksonParser, schema: StructType): Iterator[InternalRow] = { +def createInputStream() = { --- End diff -- I admit it, but a nested function is a thing to avoid too, unless it really cleans up the code or is required. Let's just leave it as it was if neither version looks quite cool .. :). At least that's usually what I do. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178478363 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala --- @@ -151,7 +153,13 @@ private[json] class JsonOutputWriter( context: TaskAttemptContext) extends OutputWriter with Logging { - private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path)) + private val encoding = options.encoding match { +case Some(charsetName) => Charset.forName(charsetName) +case _ => StandardCharsets.UTF_8 --- End diff -- `_` -> `None` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178475577 --- Diff: python/pyspark/sql/readwriter.py --- @@ -237,6 +237,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, :param allowUnquotedControlChars: allows JSON Strings to contain unquoted control characters (ASCII characters with value less than 32, including tab and line feed characters) or not. +:param encoding: standard charset name, for example UTF-8, UTF-16 and UTF-32. If None is + set, the encoding of input json will be detected automatically. --- End diff -- `json` -> `JSON` to be consistent. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178475690 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala --- @@ -39,11 +40,36 @@ private[sql] object CreateJacksonParser extends Serializable { jsonFactory.createParser(new InputStreamReader(bain, "UTF-8")) } - def text(jsonFactory: JsonFactory, record: Text): JsonParser = { -jsonFactory.createParser(record.getBytes, 0, record.getLength) + def text(jsonFactory: JsonFactory, record: Text, encoding: Option[String] = None): JsonParser = { +encoding match { + case Some(enc) => +val bain = new ByteArrayInputStream(record.getBytes, 0, record.getLength) +jsonFactory.createParser(new InputStreamReader(bain, enc)) + case _ => --- End diff -- `_` => `None` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178478336 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala --- @@ -175,33 +185,43 @@ object MultiLineJsonDataSource extends JsonDataSource { .values } - private def createParser(jsonFactory: JsonFactory, record: PortableDataStream): JsonParser = { + private def createParser( + jsonFactory: JsonFactory, + record: PortableDataStream, + charset: Option[String] = None): JsonParser = { val path = new Path(record.getPath()) CreateJacksonParser.inputStream( jsonFactory, - CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path)) + CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path), + charset +) } override def readFile( conf: Configuration, file: PartitionedFile, parser: JacksonParser, schema: StructType): Iterator[InternalRow] = { +def createInputStream() = { + CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath))) +} def partitionedFileString(ignored: Any): UTF8String = { - Utils.tryWithResource { -CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath))) - } { inputStream => -UTF8String.fromBytes(ByteStreams.toByteArray(inputStream)) + Utils.tryWithResource(createInputStream()) { is => +UTF8String.fromBytes(ByteStreams.toByteArray(is)) } } +val charset = parser.options.encoding val safeParser = new FailureSafeParser[InputStream]( - input => parser.parse(input, CreateJacksonParser.inputStream, partitionedFileString), + input => parser.parse[InputStream]( +input, +CreateJacksonParser.inputStream(_, _, charset), +partitionedFileString + ), --- End diff -- ditto for moving up the last line. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178477325 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala --- @@ -107,4 +126,9 @@ private[sql] class JSONOptions( allowBackslashEscapingAnyCharacter) factory.configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, allowUnquotedControlChars) } + + def getTextOptions: Map[String, String] = { --- End diff -- `Map[String, String]() ` -> `Map.empty[String, String]`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178477311 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala --- @@ -107,4 +126,9 @@ private[sql] class JSONOptions( allowBackslashEscapingAnyCharacter) factory.configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, allowUnquotedControlChars) } + + def getTextOptions: Map[String, String] = { --- End diff -- This is currently used only once. Can we just put it where it's used? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178476391 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala --- @@ -86,14 +85,34 @@ private[sql] class JSONOptions( val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) - val lineSeparator: Option[String] = parameters.get("lineSep").map { sep => -require(sep.nonEmpty, "'lineSep' cannot be an empty string.") -sep + /** + * A sequence of bytes between two consecutive json records. --- End diff -- `A sequence of bytes` -> `A string` and `json` -> `JSON`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178478519 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2070,9 +2070,9 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { // Read val data = s""" - | {"f": - |"a", "f0": 1}$lineSep{"f": - | + | {"f": + |"a", "f0": 1}$lineSep{"f": + | --- End diff -- Seems like a mistake? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178476321 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala --- @@ -39,11 +40,36 @@ private[sql] object CreateJacksonParser extends Serializable { jsonFactory.createParser(new InputStreamReader(bain, "UTF-8")) } - def text(jsonFactory: JsonFactory, record: Text): JsonParser = { -jsonFactory.createParser(record.getBytes, 0, record.getLength) + def text(jsonFactory: JsonFactory, record: Text, encoding: Option[String] = None): JsonParser = { +encoding match { + case Some(enc) => +val bain = new ByteArrayInputStream(record.getBytes, 0, record.getLength) +jsonFactory.createParser(new InputStreamReader(bain, enc)) + case _ => +jsonFactory.createParser(record.getBytes, 0, record.getLength) +} } - def inputStream(jsonFactory: JsonFactory, record: InputStream): JsonParser = { -jsonFactory.createParser(record) + def inputStream( + jsonFactory: JsonFactory, + is: InputStream, + encoding: Option[String] = None): JsonParser = { +encoding match { + case Some(enc) => +jsonFactory.createParser(new InputStreamReader(is, enc)) + case _ => +jsonFactory.createParser(is) +} + } + + def internalRow( +jsonFactory: JsonFactory, +row: InternalRow, +field: Int, +encoding: Option[String] = None + ): JsonParser = { --- End diff -- Let's move this line up. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178476075 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala --- @@ -39,11 +40,36 @@ private[sql] object CreateJacksonParser extends Serializable { jsonFactory.createParser(new InputStreamReader(bain, "UTF-8")) } - def text(jsonFactory: JsonFactory, record: Text): JsonParser = { -jsonFactory.createParser(record.getBytes, 0, record.getLength) + def text(jsonFactory: JsonFactory, record: Text, encoding: Option[String] = None): JsonParser = { +encoding match { --- End diff -- Looks like we create a partial function and then use it, and therefore it's going to do the type dispatch for every record. Can we avoid this? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
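One possible way to address this, as a hedged sketch (the helper name `textParserFor` is hypothetical, not the PR's code): resolve the `encoding` option once and return a record-to-parser function, so the per-record path no longer matches on the `Option`:

```scala
import java.io.{ByteArrayInputStream, InputStreamReader}

import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
import org.apache.hadoop.io.Text

// Sketch: do the Option dispatch once, outside the per-record loop.
def textParserFor(encoding: Option[String]): (JsonFactory, Text) => JsonParser =
  encoding match {
    case Some(enc) =>
      (factory, record) => {
        val bais = new ByteArrayInputStream(record.getBytes, 0, record.getLength)
        factory.createParser(new InputStreamReader(bais, enc))
      }
    case None =>
      (factory, record) => factory.createParser(record.getBytes, 0, record.getLength)
  }
```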
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178476931 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala --- @@ -92,32 +93,34 @@ object TextInputJsonDataSource extends JsonDataSource { sparkSession: SparkSession, inputPaths: Seq[FileStatus], parsedOptions: JSONOptions): StructType = { -val json: Dataset[String] = createBaseDataset( - sparkSession, inputPaths, parsedOptions.lineSeparator) +val json: Dataset[String] = createBaseDataset(sparkSession, inputPaths, parsedOptions) + inferFromDataset(json, parsedOptions) } def inferFromDataset(json: Dataset[String], parsedOptions: JSONOptions): StructType = { val sampled: Dataset[String] = JsonUtils.sample(json, parsedOptions) -val rdd: RDD[UTF8String] = sampled.queryExecution.toRdd.map(_.getUTF8String(0)) -JsonInferSchema.infer(rdd, parsedOptions, CreateJacksonParser.utf8String) +val rdd: RDD[InternalRow] = sampled.queryExecution.toRdd + +JsonInferSchema.infer[InternalRow]( + rdd, + parsedOptions, + CreateJacksonParser.internalRow(_, _, 0, parsedOptions.encoding) +) } private def createBaseDataset( sparkSession: SparkSession, inputPaths: Seq[FileStatus], - lineSeparator: Option[String]): Dataset[String] = { -val textOptions = lineSeparator.map { lineSep => - Map(TextOptions.LINE_SEPARATOR -> lineSep) -}.getOrElse(Map.empty[String, String]) - + parsedOptions: JSONOptions + ): Dataset[String] = { --- End diff -- Let's move this line up too. I think it should be either ``` ... parsedOptions: JSONOptions) : Dataset[String] = { ``` or ``` parsedOptions: JSONOptions) : Dataset[String] = { ``` https://github.com/databricks/scala-style-guide#spacing-and-indentation but somehow I believe we usually do the latter style. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178477458 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala --- @@ -175,33 +185,43 @@ object MultiLineJsonDataSource extends JsonDataSource { .values } - private def createParser(jsonFactory: JsonFactory, record: PortableDataStream): JsonParser = { + private def createParser( + jsonFactory: JsonFactory, + record: PortableDataStream, + charset: Option[String] = None): JsonParser = { val path = new Path(record.getPath()) CreateJacksonParser.inputStream( jsonFactory, - CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path)) + CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path), + charset +) --- End diff -- ditto for moving the last parenthesis up. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178434446 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala --- @@ -175,33 +185,43 @@ object MultiLineJsonDataSource extends JsonDataSource { .values } - private def createParser(jsonFactory: JsonFactory, record: PortableDataStream): JsonParser = { + private def createParser( + jsonFactory: JsonFactory, + record: PortableDataStream, + charset: Option[String] = None): JsonParser = { val path = new Path(record.getPath()) CreateJacksonParser.inputStream( jsonFactory, - CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path)) + CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path), + charset +) } override def readFile( conf: Configuration, file: PartitionedFile, parser: JacksonParser, schema: StructType): Iterator[InternalRow] = { +def createInputStream() = { --- End diff -- I don't know, but for me it is easier to read if I see the same name instead of comparing 2 expressions and recognizing them as the same. I believe the below is more complicated:
```
Utils.tryWithResource(
    CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath)))) { is =>
  UTF8String.fromBytes(ByteStreams.toByteArray(is))
}
```
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178430652 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2065,29 +2065,238 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } } - def testLineSeparator(lineSep: String): Unit = { -test(s"SPARK-21289: Support line separator - lineSep: '$lineSep'") { - // Read - val data = -s""" - | {"f": - |"a", "f0": 1}$lineSep{"f": - | - |"c", "f0": 2}$lineSep{"f": "d", "f0": 3} -""".stripMargin - val dataWithTrailingLineSep = s"$data$lineSep" - - Seq(data, dataWithTrailingLineSep).foreach { lines => -withTempPath { path => - Files.write(path.toPath, lines.getBytes(StandardCharsets.UTF_8)) - val df = spark.read.option("lineSep", lineSep).json(path.getAbsolutePath) - val expectedSchema = -StructType(StructField("f", StringType) :: StructField("f0", LongType) :: Nil) - checkAnswer(df, Seq(("a", 1), ("c", 2), ("d", 3)).toDF()) - assert(df.schema === expectedSchema) + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + // This option will be replaced by .option("lineSep", "x00 0a") + // as soon as lineSep allows to specify sequence of bytes in hexadecimal format. + .option("mode", "DROPMALFORMED") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq( + Row("Chris", "Baird"), Row("Doug", "Rood") +)) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported charset name") { +val invalidCharset = "UTF-128" +val exception = intercept[java.io.UnsupportedEncodingException] { + spark.read +.options(Map("charset" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the charset option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("charset" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified charset is not matched to actual charset") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") 
+.option("multiline", "true") +.options(Map("charset" -> "UTF-16BE")) --- End diff -- yup that'd work too. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178428640 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2065,29 +2065,238 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } } - def testLineSeparator(lineSep: String): Unit = { -test(s"SPARK-21289: Support line separator - lineSep: '$lineSep'") { - // Read - val data = -s""" - | {"f": - |"a", "f0": 1}$lineSep{"f": - | - |"c", "f0": 2}$lineSep{"f": "d", "f0": 3} -""".stripMargin - val dataWithTrailingLineSep = s"$data$lineSep" - - Seq(data, dataWithTrailingLineSep).foreach { lines => -withTempPath { path => - Files.write(path.toPath, lines.getBytes(StandardCharsets.UTF_8)) - val df = spark.read.option("lineSep", lineSep).json(path.getAbsolutePath) - val expectedSchema = -StructType(StructField("f", StringType) :: StructField("f0", LongType) :: Nil) - checkAnswer(df, Seq(("a", 1), ("c", 2), ("d", 3)).toDF()) - assert(df.schema === expectedSchema) + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + // This option will be replaced by .option("lineSep", "x00 0a") + // as soon as lineSep allows to specify sequence of bytes in hexadecimal format. + .option("mode", "DROPMALFORMED") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq( + Row("Chris", "Baird"), Row("Doug", "Rood") +)) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported charset name") { +val invalidCharset = "UTF-128" +val exception = intercept[java.io.UnsupportedEncodingException] { + spark.read +.options(Map("charset" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the charset option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("charset" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified charset is not matched to actual charset") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") 
+.option("multiline", "true") +.options(Map("charset" -> "UTF-16BE")) --- End diff -- I would blacklist 2 encodings for now - UTF-16 and UTF-32 but allow UTF-8, UTF-16LE/BE, UTF-32LE/BE and others like ISO 8859-1. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178427458 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala --- @@ -151,7 +153,16 @@ private[json] class JsonOutputWriter( context: TaskAttemptContext) extends OutputWriter with Logging { - private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path)) + private val charset = options.charset match { +case Some(charsetName) => Charset.forName(charsetName) +case _ => StandardCharsets.UTF_8 + } + + private val writer = CodecStreams.createOutputStreamWriter( +context, +new Path(path), +charset + ) --- End diff -- Seems it's going to fit in two lines. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
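That is, the same call reflowed onto two lines, reusing exactly the arguments from the diff (a formatting sketch):
```
private val writer = CodecStreams.createOutputStreamWriter(
  context, new Path(path), charset)
```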
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178427418 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala --- @@ -86,14 +86,30 @@ private[sql] class JSONOptions( val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) - val lineSeparator: Option[String] = parameters.get("lineSep").map { sep => -require(sep.nonEmpty, "'lineSep' cannot be an empty string.") -sep + /** + * A sequence of bytes between two consecutive json records. + */ + val lineSeparator: Option[String] = parameters.get("lineSep") + + /** + * Standard charset name. For example UTF-8, UTF-16 and UTF-32. + * If charset is not specified (None), it will be detected automatically. + */ + val charset: Option[String] = parameters.get("charset") +.orElse(parameters.get("encoding")).map { cs => + if (multiLine == false && cs != "UTF-8" && lineSeparator.isEmpty) { --- End diff -- Shall we do this with `require`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
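A sketch of the same validation expressed with `require`, assuming the surrounding `JSONOptions` fields (`parameters`, `multiLine`, `lineSeparator`) from the diff; the condition is the negation of the `if` above, and the message text is illustrative:
```
val charset: Option[String] = parameters.get("charset")
  .orElse(parameters.get("encoding")).map { cs =>
    require(multiLine || cs == "UTF-8" || lineSeparator.nonEmpty,
      s"""Please, set the 'lineSep' option for the given charset $cs.
         |Example: .option("lineSep", "|^|")
         |Note: lineSep can be detected automatically for UTF-8 only.""".stripMargin)
    cs
  }
```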
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178427349 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala --- @@ -175,33 +185,43 @@ object MultiLineJsonDataSource extends JsonDataSource { .values } - private def createParser(jsonFactory: JsonFactory, record: PortableDataStream): JsonParser = { + private def createParser( + jsonFactory: JsonFactory, + record: PortableDataStream, + charset: Option[String] = None): JsonParser = { val path = new Path(record.getPath()) CreateJacksonParser.inputStream( jsonFactory, - CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path)) + CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path), + charset +) } override def readFile( conf: Configuration, file: PartitionedFile, parser: JacksonParser, schema: StructType): Iterator[InternalRow] = { +def createInputStream() = { --- End diff -- To be honest, I think I wouldn't have a nested function to deduplicate two lines .. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178427279 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2065,29 +2065,238 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } } - def testLineSeparator(lineSep: String): Unit = { -test(s"SPARK-21289: Support line separator - lineSep: '$lineSep'") { - // Read - val data = -s""" - | {"f": - |"a", "f0": 1}$lineSep{"f": - | - |"c", "f0": 2}$lineSep{"f": "d", "f0": 3} -""".stripMargin - val dataWithTrailingLineSep = s"$data$lineSep" - - Seq(data, dataWithTrailingLineSep).foreach { lines => -withTempPath { path => - Files.write(path.toPath, lines.getBytes(StandardCharsets.UTF_8)) - val df = spark.read.option("lineSep", lineSep).json(path.getAbsolutePath) - val expectedSchema = -StructType(StructField("f", StringType) :: StructField("f0", LongType) :: Nil) - checkAnswer(df, Seq(("a", 1), ("c", 2), ("d", 3)).toDF()) - assert(df.schema === expectedSchema) + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + // This option will be replaced by .option("lineSep", "x00 0a") --- End diff -- No, we don't know this yet. Let's remove this comment and the test. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178427175 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala --- @@ -41,13 +41,16 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti */ val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean - private val lineSeparator: Option[String] = parameters.get(LINE_SEPARATOR).map { sep => -require(sep.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.") -sep + val charset: Option[String] = Some("UTF-8") + + val lineSeparator: Option[Array[Byte]] = parameters.get("lineSep").map { lineSep => +require(lineSep.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.") +lineSep.getBytes(charset.getOrElse( --- End diff -- Shall we just `require` here too? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178427113 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2065,29 +2065,238 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } } - def testLineSeparator(lineSep: String): Unit = { -test(s"SPARK-21289: Support line separator - lineSep: '$lineSep'") { - // Read - val data = -s""" - | {"f": - |"a", "f0": 1}$lineSep{"f": - | - |"c", "f0": 2}$lineSep{"f": "d", "f0": 3} -""".stripMargin - val dataWithTrailingLineSep = s"$data$lineSep" - - Seq(data, dataWithTrailingLineSep).foreach { lines => -withTempPath { path => - Files.write(path.toPath, lines.getBytes(StandardCharsets.UTF_8)) - val df = spark.read.option("lineSep", lineSep).json(path.getAbsolutePath) - val expectedSchema = -StructType(StructField("f", StringType) :: StructField("f0", LongType) :: Nil) - checkAnswer(df, Seq(("a", 1), ("c", 2), ("d", 3)).toDF()) - assert(df.schema === expectedSchema) + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + // This option will be replaced by .option("lineSep", "x00 0a") + // as soon as lineSep allows to specify sequence of bytes in hexadecimal format. + .option("mode", "DROPMALFORMED") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq( + Row("Chris", "Baird"), Row("Doug", "Rood") +)) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported charset name") { +val invalidCharset = "UTF-128" +val exception = intercept[java.io.UnsupportedEncodingException] { + spark.read +.options(Map("charset" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the charset option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("charset" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified charset is not matched to actual charset") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") 
+.option("multiline", "true") +.options(Map("charset" -> "UTF-16BE")) --- End diff -- You don't have to document 100% list. just best effort thing. We can fix and add more encodings later if anyone found later, if it's hard to find out every encoding supported. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178427139 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala --- @@ -86,14 +86,30 @@ private[sql] class JSONOptions( val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) - val lineSeparator: Option[String] = parameters.get("lineSep").map { sep => -require(sep.nonEmpty, "'lineSep' cannot be an empty string.") -sep + /** + * A sequence of bytes between two consecutive json records. + */ + val lineSeparator: Option[String] = parameters.get("lineSep") + + /** + * Standard charset name. For example UTF-8, UTF-16 and UTF-32. + * If charset is not specified (None), it will be detected automatically. + */ + val charset: Option[String] = parameters.get("charset") +.orElse(parameters.get("encoding")).map { cs => + if (multiLine == false && cs != "UTF-8" && lineSeparator.isEmpty) { +throw new IllegalArgumentException( + s"""Please, set the 'lineSep' option for the given charset $cs. + |Example: .option("lineSep", "|^|") + |Note: lineSep can be detected automatically for UTF-8 only.""".stripMargin +) --- End diff -- Let's move this parenthesis up and inline it, if you are also fine with that (just because that's the style I have seen more frequently). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178427033 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala --- @@ -39,11 +40,36 @@ private[sql] object CreateJacksonParser extends Serializable { jsonFactory.createParser(new InputStreamReader(bain, "UTF-8")) } - def text(jsonFactory: JsonFactory, record: Text): JsonParser = { -jsonFactory.createParser(record.getBytes, 0, record.getLength) + def text(jsonFactory: JsonFactory, record: Text, charset: Option[String] = None): JsonParser = { +charset match { --- End diff -- `:%s/charset/encoding` .. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178427011 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2065,29 +2065,238 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } } - def testLineSeparator(lineSep: String): Unit = { -test(s"SPARK-21289: Support line separator - lineSep: '$lineSep'") { - // Read - val data = -s""" - | {"f": - |"a", "f0": 1}$lineSep{"f": - | - |"c", "f0": 2}$lineSep{"f": "d", "f0": 3} -""".stripMargin - val dataWithTrailingLineSep = s"$data$lineSep" - - Seq(data, dataWithTrailingLineSep).foreach { lines => -withTempPath { path => - Files.write(path.toPath, lines.getBytes(StandardCharsets.UTF_8)) - val df = spark.read.option("lineSep", lineSep).json(path.getAbsolutePath) - val expectedSchema = -StructType(StructField("f", StringType) :: StructField("f0", LongType) :: Nil) - checkAnswer(df, Seq(("a", 1), ("c", 2), ("d", 3)).toDF()) - assert(df.schema === expectedSchema) + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + // This option will be replaced by .option("lineSep", "x00 0a") + // as soon as lineSep allows to specify sequence of bytes in hexadecimal format. + .option("mode", "DROPMALFORMED") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq( + Row("Chris", "Baird"), Row("Doug", "Rood") +)) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported charset name") { +val invalidCharset = "UTF-128" +val exception = intercept[java.io.UnsupportedEncodingException] { + spark.read +.options(Map("charset" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the charset option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("charset" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified charset is not matched to actual charset") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") 
+.option("multiline", "true") +.options(Map("charset" -> "UTF-16BE")) +.json(testFile(fileName)) +.count() +} +val errMsg = exception.getMessage + +assert(errMsg.contains("Malformed records are detected in record parsing")) + } + + def checkCharset( +expectedCharset: String, +pathToJsonFiles: String, +expectedContent: String + ): Unit = { +val jsonFiles = new File(pathToJsonFiles) + .listFiles() + .filter(_.isFile) + .filter(_.getName.endsWith("json")) +val jsonContent = jsonFiles.map { file => + scala.io.So
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178426994 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2065,29 +2065,238 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } } - def testLineSeparator(lineSep: String): Unit = { -test(s"SPARK-21289: Support line separator - lineSep: '$lineSep'") { - // Read - val data = -s""" - | {"f": - |"a", "f0": 1}$lineSep{"f": - | - |"c", "f0": 2}$lineSep{"f": "d", "f0": 3} -""".stripMargin - val dataWithTrailingLineSep = s"$data$lineSep" - - Seq(data, dataWithTrailingLineSep).foreach { lines => -withTempPath { path => - Files.write(path.toPath, lines.getBytes(StandardCharsets.UTF_8)) - val df = spark.read.option("lineSep", lineSep).json(path.getAbsolutePath) - val expectedSchema = -StructType(StructField("f", StringType) :: StructField("f0", LongType) :: Nil) - checkAnswer(df, Seq(("a", 1), ("c", 2), ("d", 3)).toDF()) - assert(df.schema === expectedSchema) + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + // This option will be replaced by .option("lineSep", "x00 0a") + // as soon as lineSep allows to specify sequence of bytes in hexadecimal format. + .option("mode", "DROPMALFORMED") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq( + Row("Chris", "Baird"), Row("Doug", "Rood") +)) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported charset name") { +val invalidCharset = "UTF-128" +val exception = intercept[java.io.UnsupportedEncodingException] { + spark.read +.options(Map("charset" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the charset option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("charset" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified charset is not matched to actual charset") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") 
+.option("multiline", "true") +.options(Map("charset" -> "UTF-16BE")) --- End diff -- If that's difficult (or weird or hacky), let's just make whitelist and document them explicitly. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178426903 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2065,29 +2065,238 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } } - def testLineSeparator(lineSep: String): Unit = { --- End diff -- You don't have to remove this completely .. just take out the cases that don't work. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178426879 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala --- @@ -86,14 +86,30 @@ private[sql] class JSONOptions( val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) - val lineSeparator: Option[String] = parameters.get("lineSep").map { sep => -require(sep.nonEmpty, "'lineSep' cannot be an empty string.") -sep + /** + * A sequence of bytes between two consecutive json records. + */ + val lineSeparator: Option[String] = parameters.get("lineSep") + + /** + * Standard charset name. For example UTF-8, UTF-16 and UTF-32. + * If charset is not specified (None), it will be detected automatically. + */ + val charset: Option[String] = parameters.get("charset") --- End diff -- I thought we discussed that `encoding` would have higher priority, like in CSV. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
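A sketch of the lookup with `encoding` taking priority over `charset`, assuming the same `parameters` map as in `JSONOptions` (the final option names are up to the PR):
```
val encoding: Option[String] = parameters.get("encoding")
  .orElse(parameters.get("charset"))
```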
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r178426866 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2065,29 +2065,238 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } } - def testLineSeparator(lineSep: String): Unit = { -test(s"SPARK-21289: Support line separator - lineSep: '$lineSep'") { - // Read - val data = -s""" - | {"f": - |"a", "f0": 1}$lineSep{"f": - | - |"c", "f0": 2}$lineSep{"f": "d", "f0": 3} -""".stripMargin - val dataWithTrailingLineSep = s"$data$lineSep" - - Seq(data, dataWithTrailingLineSep).foreach { lines => -withTempPath { path => - Files.write(path.toPath, lines.getBytes(StandardCharsets.UTF_8)) - val df = spark.read.option("lineSep", lineSep).json(path.getAbsolutePath) - val expectedSchema = -StructType(StructField("f", StringType) :: StructField("f0", LongType) :: Nil) - checkAnswer(df, Seq(("a", 1), ("c", 2), ("d", 3)).toDF()) - assert(df.schema === expectedSchema) + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + // This option will be replaced by .option("lineSep", "x00 0a") + // as soon as lineSep allows to specify sequence of bytes in hexadecimal format. + .option("mode", "DROPMALFORMED") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq( + Row("Chris", "Baird"), Row("Doug", "Rood") +)) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported charset name") { +val invalidCharset = "UTF-128" +val exception = intercept[java.io.UnsupportedEncodingException] { + spark.read +.options(Map("charset" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the charset option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("charset" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified charset is not matched to actual charset") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") 
+.option("multiline", "true") +.options(Map("charset" -> "UTF-16BE")) --- End diff -- Can we have a test case with `UTF-16`? I assume the delimiter itself has the BOM and won't work correctly? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...
GitHub user MaxGekk opened a pull request: https://github.com/apache/spark/pull/20937 [SPARK-23723][SPARK-23724][SQL] Support custom encoding for json files

## What changes were proposed in this pull request?

I propose a new option for the JSON datasource which allows specifying the encoding (charset) of input and output files. Here is an example of using the option:
```
spark.read.schema(schema)
  .option("multiline", "true")
  .option("encoding", "UTF-16LE")
  .json(fileName)
```
If the option is not specified, the charset auto-detection mechanism is used by default.

The option can also be used for saving datasets to JSON files. Currently Spark is able to save datasets into JSON files in the UTF-8 charset only. The changes allow saving data in any supported charset. Here is the approximate list of charsets supported by Oracle Java SE: https://docs.oracle.com/javase/8/docs/technotes/guides/intl/encoding.doc.html . A user can specify the charset of the output JSON via the charset option, like `.option("charset", "UTF-16")`. By default the output charset is still UTF-8 to keep backward compatibility.

The solution has the following restrictions for per-line mode (`multiline = false`):
- If the charset is different from UTF-8, the lineSep option must be specified. The option is required because Hadoop LineReader cannot detect the line separator correctly. Here is the ticket for solving the issue: https://issues.apache.org/jira/browse/SPARK-23725
- JSON files starting with a [BOM](https://en.wikipedia.org/wiki/Byte_order_mark) cannot be read properly. A possible solution is a flexible format for `lineSep` which allows specifying the line separator as a sequence of bytes, independently of the encoding. A pull request for that will be prepared soon.

## How was this patch tested?

I added the following tests:
- reading a JSON file in the UTF-16 charset with a BOM
- reading a JSON file using charset auto-detection (UTF-32BE with BOM)
- reading a JSON file using the user's charset (UTF-16LE)
- saving in UTF-32BE and reading the result with the standard library (not with Spark)
- checking that the default charset is UTF-8
- handling a wrong (unsupported) charset

You can merge this pull request into a Git repository by running: $ git pull https://github.com/MaxGekk/spark-1 json-encoding-line-sep Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20937.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20937 commit b2e92b4706c5ed3b141805933f29beb87e1b7371 Author: Maxim Gekk Date: 2018-02-11T20:06:53Z Test for reading json in UTF-16 with BOM commit cb2f27ba73cb5838e2910c31ca204100bb4eebca Author: Maxim Gekk Date: 2018-02-11T20:48:35Z Use user's charset or autodetect it if the charset is not specified commit 0d45fd382bb90ebd7161d57a3da23820b4497f67 Author: Maxim Gekk Date: 2018-02-13T09:48:08Z Added a type and a comment for charset commit 1fb9b321a4fac0f41cfb9dd5f85b61feb6796227 Author: Maxim Gekk Date: 2018-02-13T10:00:27Z Replacing the monadic chaining by matching because it is more readable commit c3b04ee68338ad4f93a5361a41db28b37f020907 Author: Maxim Gekk Date: 2018-02-13T10:44:19Z Keeping the old method for backward compatibility commit 93d38794dd261ee1bbe2497470ee43de1186ef3c Author: Maxim Gekk Date: 2018-02-13T10:54:52Z testFile is moved into the test to make more local because it is used only in the test commit 15798a1ce61df29e9a32f960e755495e3d63f4e3 Author: Maxim Gekk Date: 2018-02-13T11:15:25Z Adding the charset as third parameter
to the text method commit cc05ce9af7c9f1d14bd10c1f46a60ce043c13fe1 Author: Maxim Gekk Date: 2018-02-13T11:29:57Z Removing whitespaces at the end of the line commit 74f2026e62389902ab7a4c418aa96a492fa14f6f Author: Maxim Gekk Date: 2018-02-13T12:29:28Z Fix the comment in javadoc style commit 4856b8e0b287b3ba3331865298f0603dde18459c Author: Maxim Gekk Date: 2018-02-13T12:32:48Z Simplifying of the UTF-16 test commit 084f41fb6edd7c86aeb8643973119cb4b38a34fa Author: Maxim Gekk Date: 2018-02-15T17:33:25Z A hint to the exception how to set the charset explicitly commit 31cd793a86e6a0e48e0150ffb8c36da2872c65ca Author: Maxim Gekk Date: 2018-02-15T18:00:55Z Fix for scala style checks commit 6eacd186a954a3f724ee607826b17f432ead77e1 Author: Maxim Gekk Date: 2018-02-15T18:44:04Z Run tests again commit 3b4a509d0260cfab720a5471ccd937de55c56093 Author: Maxim Gekk Date: 2018-02-15T19:06:06Z Improving of the exception message commit cd1124ef7e6329f4dcd6926064271cd24b5a150d Author: Maxim Gekk Date: 2018-02-15T19:41:35Z Appended the original message to the exception commit ebf53904151582eef6d95780ca30b773404ae141 Aut