[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20937 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184870228 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.json + +import java.io.File + +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.types.{LongType, StringType, StructType} +import org.apache.spark.util.{Benchmark, Utils} + +/** + * The benchmarks aims to measure performance of JSON parsing when encoding is set and isn't. --- End diff -- I usually avoid abbreviation in the doc tho. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
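The JsonBenchmarks class quoted above is meant to compare JSON parsing with and without the encoding option. For readers outside the diff, here is a minimal sketch of that comparison; it is not the benchmark from the PR: the SparkSession setup, the schema, and the input path are assumptions, and the real class uses org.apache.spark.util.Benchmark rather than this ad-hoc timer.

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructType}

object JsonEncodingReadSketch {
  // Tiny timing helper; purely illustrative.
  def time[T](label: String)(f: => T): Unit = {
    val start = System.nanoTime()
    f
    println(s"$label: ${(System.nanoTime() - start) / 1e6} ms")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("json-encoding-sketch")
      .getOrCreate()
    val path = "/tmp/sample-utf8.json"  // hypothetical pre-generated UTF-8 dataset
    val schema = new StructType().add("fieldA", LongType).add("fieldB", StringType)

    // Charset is detected automatically (the default behaviour).
    time("encoding not set") {
      spark.read.schema(schema).json(path).count()
    }
    // Charset set explicitly via the option added in this PR.
    time("encoding = UTF-8") {
      spark.read.schema(schema).option("encoding", "UTF-8").json(path).count()
    }

    spark.stop()
  }
}
```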
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184870234 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala --- @@ -175,11 +187,18 @@ object MultiLineJsonDataSource extends JsonDataSource { .values } - private def createParser(jsonFactory: JsonFactory, record: PortableDataStream): JsonParser = { -val path = new Path(record.getPath()) -CreateJacksonParser.inputStream( - jsonFactory, - CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path)) + private def dataToInputStream(dataStream: PortableDataStream): InputStream = { +val path = new Path(dataStream.getPath()) + CodecStreams.createInputStreamWithCloseResource(dataStream.getConfiguration, path) + } + + private def createParser(jsonFactory: JsonFactory, stream: PortableDataStream): JsonParser = { +CreateJacksonParser.inputStream(jsonFactory, dataToInputStream(stream)) + } + + private def createParser(enc: String, jsonFactory: JsonFactory, + stream: PortableDataStream): JsonParser = { --- End diff -- ditto for style --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
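For context, "ditto for style" points at the wrapped parameter list in the last hunk. A sketch of that signature in the one-parameter-per-line, four-space-indent form requested in the next comment; the body is inferred from the non-encoding overload and the new CreateJacksonParser.inputStream(enc, ...) helper, so it is illustrative rather than the exact committed code.

```
// One parameter per line, indented four spaces, when the signature does not fit on one line.
private def createParser(
    enc: String,
    jsonFactory: JsonFactory,
    stream: PortableDataStream): JsonParser = {
  // Inferred body: delegate to the charset-aware Jackson parser factory.
  CreateJacksonParser.inputStream(enc, jsonFactory, dataToInputStream(stream))
}
```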
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184870219 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2167,4 +2171,241 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val sampled = spark.read.option("samplingRatio", 1.0).json(ds) assert(sampled.count() == ds.count()) } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "test-data/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("encoding", "UTF-16") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"), Row("Doug", "Rood"))) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "test-data/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "test-data/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported encoding name") { +val invalidCharset = "UTF-128" +val exception = intercept[UnsupportedCharsetException] { + spark.read +.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("test-data/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the encoding option is case agnostic") { +val fileName = "test-data/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified encoding is not matched to actual encoding") { +val fileName = "test-data/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") +.option("multiline", "true") +.options(Map("encoding" -> "UTF-16BE")) +.json(testFile(fileName)) +.count() +} +val errMsg = exception.getMessage + +assert(errMsg.contains("Malformed records are detected in record parsing")) + } + + def checkEncoding(expectedEncoding: String, pathToJsonFiles: String, + expectedContent: String): Unit = { --- End diff -- I think it should be ``` def checkEncoding( expectedEncoding: String, pathToJsonFiles: String, expectedContent: String): Unit = { ``` per https://github.com/databricks/scala-style-guide#spacing-and-indentation or ``` def checkEncoding( expectedEncoding: String, pathToJsonFiles: String, expectedContent: String): Unit = { ``` if it fits per https://github.com/databricks/scala-style-guide/issues/58#issue-243844040 Not a big deal --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: 
reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184870196 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2167,4 +2171,241 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val sampled = spark.read.option("samplingRatio", 1.0).json(ds) assert(sampled.count() == ds.count()) } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "test-data/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("encoding", "UTF-16") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"), Row("Doug", "Rood"))) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "test-data/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "test-data/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported encoding name") { +val invalidCharset = "UTF-128" +val exception = intercept[UnsupportedCharsetException] { + spark.read +.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("test-data/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the encoding option is case agnostic") { +val fileName = "test-data/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified encoding is not matched to actual encoding") { +val fileName = "test-data/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") +.option("multiline", "true") +.options(Map("encoding" -> "UTF-16BE")) +.json(testFile(fileName)) +.count() +} +val errMsg = exception.getMessage + +assert(errMsg.contains("Malformed records are detected in record parsing")) + } + + def checkEncoding(expectedEncoding: String, pathToJsonFiles: String, + expectedContent: String): Unit = { +val jsonFiles = new File(pathToJsonFiles) + .listFiles() + .filter(_.isFile) + .filter(_.getName.endsWith("json")) +val actualContent = jsonFiles.map { file => + new String(Files.readAllBytes(file.toPath), expectedEncoding) +}.mkString.trim + +assert(actualContent == expectedContent) + } + + test("SPARK-23723: save json in UTF-32BE") { +val encoding = "UTF-32BE" +withTempPath { path => + val df = spark.createDataset(Seq(("Dog", 42))) + df.write +.options(Map("encoding" -> encoding, "lineSep" -> "\n")) +.json(path.getCanonicalPath) + + checkEncoding( 
+expectedEncoding = encoding, +pathToJsonFiles = path.getCanonicalPath, +expectedContent = """{"_1":"Dog","_2":42}""") +} + } + + test("SPARK-23723: save json in default encoding - UTF-8") { +withTempPath { path => + val df = spark.createDataset(Seq(("Dog", 42))) + df.write.json(path.getCanonicalPath) + + checkEncoding( +expectedEncoding = "UTF-8", +pathToJsonFiles = path.getCanonicalPath, +expectedContent = """{"_1":"Dog","_2":42}""") +} + } + + test("SPARK-23723: wrong output encoding") { +val encoding = "UTF-128" +v
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184863165 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala --- @@ -43,7 +47,38 @@ private[sql] object CreateJacksonParser extends Serializable { jsonFactory.createParser(record.getBytes, 0, record.getLength) } - def inputStream(jsonFactory: JsonFactory, record: InputStream): JsonParser = { -jsonFactory.createParser(record) + def getStreamDecoder(enc: String, in: Array[Byte], length: Int): StreamDecoder = { +val bais = new ByteArrayInputStream(in, 0, length) +val byteChannel = Channels.newChannel(bais) +val decodingBufferSize = Math.min(length, 8192) +val decoder = Charset.forName(enc).newDecoder() + +StreamDecoder.forDecoder(byteChannel, decoder, decodingBufferSize) + } + + def text(enc: String, jsonFactory: JsonFactory, record: Text): JsonParser = { +val sd = getStreamDecoder(enc, record.getBytes, record.getLength) +jsonFactory.createParser(sd) + } + + def inputStream(jsonFactory: JsonFactory, is: InputStream): JsonParser = { +jsonFactory.createParser(is) + } + + def inputStream(enc: String, jsonFactory: JsonFactory, is: InputStream): JsonParser = { +jsonFactory.createParser(new InputStreamReader(is, enc)) --- End diff -- I added a comment above --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
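The hunk above decodes record bytes with a user-supplied charset by wrapping them in a byte channel and a CharsetDecoder. Below is a self-contained sketch of the same idea using the public java.nio.channels.Channels.newReader API instead of the JDK-internal StreamDecoder the diff calls directly; the sample record and charset are made up for illustration.

```
import java.io.{ByteArrayInputStream, Reader}
import java.nio.channels.Channels
import java.nio.charset.Charset

object DecodeBytesSketch {
  // Build a Reader that decodes `length` bytes of `bytes` using charset `enc`.
  // Channels.newReader wraps the channel in a StreamDecoder under the hood,
  // which the diff above does explicitly in order to cap the buffer size.
  def readerFor(enc: String, bytes: Array[Byte], length: Int): Reader = {
    val bais = new ByteArrayInputStream(bytes, 0, length)
    val channel = Channels.newChannel(bais)
    val decoder = Charset.forName(enc).newDecoder()
    Channels.newReader(channel, decoder, -1)  // -1 = implementation default buffer
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical record, encoded the way a UTF-16LE input file would be.
    val bytes = """{"firstName":"Chris"}""".getBytes("UTF-16LE")
    val reader = readerFor("UTF-16LE", bytes, bytes.length)
    val buf = new Array[Char](64)
    val n = reader.read(buf)
    println(new String(buf, 0, n))  // {"firstName":"Chris"}
  }
}
```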
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184863159 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala --- @@ -372,6 +372,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * `java.text.SimpleDateFormat`. This applies to timestamp type. * `multiLine` (default `false`): parse one record, which may span multiple lines, * per file + * `encoding` (by default it is not set): allows to forcibly set one of standard basic --- End diff -- I updated python's comment to make it the same as here --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
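Since the docstring under discussion is terse, a quick usage sketch of the option it documents; the input path is hypothetical and the schema mirrors the test data used elsewhere in this PR.

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType}

object EncodingOptionUsage {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("encoding-option").getOrCreate()
    val schema = new StructType().add("firstName", StringType).add("lastName", StringType)

    // Forcibly set the input charset; when the option is omitted, the charset
    // is detected automatically in multiLine mode.
    val people = spark.read
      .schema(schema)
      .option("multiLine", "true")
      .option("encoding", "UTF-16LE")
      .json("/path/to/utf16LE.json")  // hypothetical path

    people.show()
    spark.stop()
  }
}
```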
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184861410 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2167,4 +2167,259 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val sampled = spark.read.option("samplingRatio", 1.0).json(ds) assert(sampled.count() == ds.count()) } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("encoding", "UTF-16") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"), Row("Doug", "Rood"))) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported encoding name") { +val invalidCharset = "UTF-128" +val exception = intercept[UnsupportedCharsetException] { + spark.read +.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the encoding option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified encoding is not matched to actual encoding") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") +.option("multiline", "true") +.options(Map("encoding" -> "UTF-16BE")) +.json(testFile(fileName)) +.count() +} +val errMsg = exception.getMessage + +assert(errMsg.contains("Malformed records are detected in record parsing")) + } + + def checkEncoding( + expectedEncoding: String, + pathToJsonFiles: String, + expectedContent: String): Unit = { +val jsonFiles = new File(pathToJsonFiles) + .listFiles() + .filter(_.isFile) + .filter(_.getName.endsWith("json")) +val actualContent = jsonFiles.map { file => + new String(Files.readAllBytes(file.toPath), expectedEncoding) +}.mkString.trim.replaceAll(" ", "") + +assert(actualContent == expectedContent) + } + + test("SPARK-23723: save json in UTF-32BE") { +val encoding = "UTF-32BE" +withTempPath { path => + val df = 
spark.createDataset(Seq(("Dog", 42))) + df.write +.options(Map("encoding" -> encoding, "lineSep" -> "\n")) +.format("json").mode("overwrite") +.save(path.getCanonicalPath) + + checkEncoding( +expectedEncoding = encoding, +pathToJsonFiles = path.getCanonicalPath, +expectedContent = """{"_1":"Dog","_2":42}""") +} + } + + test("SPARK-23723: save json in default encoding - UTF-8") { +withTempPath { path => + val df = spark.createDataset(Seq(("Dog", 42))) + df.write +.format("json").mode("overwrite") +.save(path.getCanonicalPath)
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184851362 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2167,4 +2167,259 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val sampled = spark.read.option("samplingRatio", 1.0).json(ds) assert(sampled.count() == ds.count()) } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("encoding", "UTF-16") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"), Row("Doug", "Rood"))) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported encoding name") { +val invalidCharset = "UTF-128" +val exception = intercept[UnsupportedCharsetException] { + spark.read +.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the encoding option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified encoding is not matched to actual encoding") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") +.option("multiline", "true") +.options(Map("encoding" -> "UTF-16BE")) +.json(testFile(fileName)) +.count() +} +val errMsg = exception.getMessage + +assert(errMsg.contains("Malformed records are detected in record parsing")) + } + + def checkEncoding( + expectedEncoding: String, + pathToJsonFiles: String, + expectedContent: String): Unit = { +val jsonFiles = new File(pathToJsonFiles) + .listFiles() + .filter(_.isFile) + .filter(_.getName.endsWith("json")) +val actualContent = jsonFiles.map { file => + new String(Files.readAllBytes(file.toPath), expectedEncoding) +}.mkString.trim.replaceAll(" ", "") + +assert(actualContent == expectedContent) + } + + test("SPARK-23723: save json in UTF-32BE") { +val encoding = "UTF-32BE" +withTempPath { path => + val df = 
spark.createDataset(Seq(("Dog", 42))) + df.write +.options(Map("encoding" -> encoding, "lineSep" -> "\n")) +.format("json").mode("overwrite") +.save(path.getCanonicalPath) + + checkEncoding( +expectedEncoding = encoding, +pathToJsonFiles = path.getCanonicalPath, +expectedContent = """{"_1":"Dog","_2":42}""") +} + } + + test("SPARK-23723: save json in default encoding - UTF-8") { +withTempPath { path => + val df = spark.createDataset(Seq(("Dog", 42))) + df.write +.format("json").mode("overwrite") +.save(path.getCanonicalPath)
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184851747 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2167,4 +2167,259 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val sampled = spark.read.option("samplingRatio", 1.0).json(ds) assert(sampled.count() == ds.count()) } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("encoding", "UTF-16") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"), Row("Doug", "Rood"))) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported encoding name") { +val invalidCharset = "UTF-128" +val exception = intercept[UnsupportedCharsetException] { + spark.read +.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the encoding option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified encoding is not matched to actual encoding") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") +.option("multiline", "true") +.options(Map("encoding" -> "UTF-16BE")) +.json(testFile(fileName)) +.count() +} +val errMsg = exception.getMessage + +assert(errMsg.contains("Malformed records are detected in record parsing")) + } + + def checkEncoding( + expectedEncoding: String, + pathToJsonFiles: String, + expectedContent: String): Unit = { +val jsonFiles = new File(pathToJsonFiles) + .listFiles() + .filter(_.isFile) + .filter(_.getName.endsWith("json")) +val actualContent = jsonFiles.map { file => + new String(Files.readAllBytes(file.toPath), expectedEncoding) +}.mkString.trim.replaceAll(" ", "") + +assert(actualContent == expectedContent) + } + + test("SPARK-23723: save json in UTF-32BE") { +val encoding = "UTF-32BE" +withTempPath { path => + val df = 
spark.createDataset(Seq(("Dog", 42))) + df.write +.options(Map("encoding" -> encoding, "lineSep" -> "\n")) +.format("json").mode("overwrite") +.save(path.getCanonicalPath) + + checkEncoding( +expectedEncoding = encoding, +pathToJsonFiles = path.getCanonicalPath, +expectedContent = """{"_1":"Dog","_2":42}""") +} + } + + test("SPARK-23723: save json in default encoding - UTF-8") { +withTempPath { path => + val df = spark.createDataset(Seq(("Dog", 42))) + df.write +.format("json").mode("overwrite") +.save(path.getCanonicalPath)
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184851331 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2167,4 +2167,259 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val sampled = spark.read.option("samplingRatio", 1.0).json(ds) assert(sampled.count() == ds.count()) } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("encoding", "UTF-16") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"), Row("Doug", "Rood"))) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported encoding name") { +val invalidCharset = "UTF-128" +val exception = intercept[UnsupportedCharsetException] { + spark.read +.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the encoding option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified encoding is not matched to actual encoding") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") +.option("multiline", "true") +.options(Map("encoding" -> "UTF-16BE")) +.json(testFile(fileName)) +.count() +} +val errMsg = exception.getMessage + +assert(errMsg.contains("Malformed records are detected in record parsing")) + } + + def checkEncoding( + expectedEncoding: String, + pathToJsonFiles: String, + expectedContent: String): Unit = { +val jsonFiles = new File(pathToJsonFiles) + .listFiles() + .filter(_.isFile) + .filter(_.getName.endsWith("json")) +val actualContent = jsonFiles.map { file => + new String(Files.readAllBytes(file.toPath), expectedEncoding) +}.mkString.trim.replaceAll(" ", "") --- End diff -- Hm, why should we replace spaces? 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
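The later revision of this test, quoted near the top of the thread, answers the question by dropping the replaceAll and keeping only trim; the helper then reads as below (copied from that revision, with the imports it relies on spelled out).

```
import java.io.File
import java.nio.file.Files

// Read back every written part-file with the expected charset and compare the
// concatenated, trimmed content; no space stripping needed.
def checkEncoding(
    expectedEncoding: String,
    pathToJsonFiles: String,
    expectedContent: String): Unit = {
  val jsonFiles = new File(pathToJsonFiles)
    .listFiles()
    .filter(_.isFile)
    .filter(_.getName.endsWith("json"))
  val actualContent = jsonFiles.map { file =>
    new String(Files.readAllBytes(file.toPath), expectedEncoding)
  }.mkString.trim

  assert(actualContent == expectedContent)
}
```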
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184851348 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2167,4 +2167,259 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val sampled = spark.read.option("samplingRatio", 1.0).json(ds) assert(sampled.count() == ds.count()) } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("encoding", "UTF-16") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"), Row("Doug", "Rood"))) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported encoding name") { +val invalidCharset = "UTF-128" +val exception = intercept[UnsupportedCharsetException] { + spark.read +.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the encoding option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified encoding is not matched to actual encoding") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") +.option("multiline", "true") +.options(Map("encoding" -> "UTF-16BE")) +.json(testFile(fileName)) +.count() +} +val errMsg = exception.getMessage + +assert(errMsg.contains("Malformed records are detected in record parsing")) + } + + def checkEncoding( + expectedEncoding: String, + pathToJsonFiles: String, + expectedContent: String): Unit = { +val jsonFiles = new File(pathToJsonFiles) + .listFiles() + .filter(_.isFile) + .filter(_.getName.endsWith("json")) +val actualContent = jsonFiles.map { file => + new String(Files.readAllBytes(file.toPath), expectedEncoding) +}.mkString.trim.replaceAll(" ", "") + +assert(actualContent == expectedContent) + } + + test("SPARK-23723: save json in UTF-32BE") { +val encoding = "UTF-32BE" +withTempPath { path => + val df = 
spark.createDataset(Seq(("Dog", 42))) + df.write +.options(Map("encoding" -> encoding, "lineSep" -> "\n")) +.format("json").mode("overwrite") --- End diff -- I think `.mode("overwrite")` is not needed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184851361 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2167,4 +2167,259 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val sampled = spark.read.option("samplingRatio", 1.0).json(ds) assert(sampled.count() == ds.count()) } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("encoding", "UTF-16") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"), Row("Doug", "Rood"))) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported encoding name") { +val invalidCharset = "UTF-128" +val exception = intercept[UnsupportedCharsetException] { + spark.read +.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the encoding option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified encoding is not matched to actual encoding") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") +.option("multiline", "true") +.options(Map("encoding" -> "UTF-16BE")) +.json(testFile(fileName)) +.count() +} +val errMsg = exception.getMessage + +assert(errMsg.contains("Malformed records are detected in record parsing")) + } + + def checkEncoding( + expectedEncoding: String, + pathToJsonFiles: String, + expectedContent: String): Unit = { +val jsonFiles = new File(pathToJsonFiles) + .listFiles() + .filter(_.isFile) + .filter(_.getName.endsWith("json")) +val actualContent = jsonFiles.map { file => + new String(Files.readAllBytes(file.toPath), expectedEncoding) +}.mkString.trim.replaceAll(" ", "") + +assert(actualContent == expectedContent) + } + + test("SPARK-23723: save json in UTF-32BE") { +val encoding = "UTF-32BE" +withTempPath { path => + val df = 
spark.createDataset(Seq(("Dog", 42))) + df.write +.options(Map("encoding" -> encoding, "lineSep" -> "\n")) +.format("json").mode("overwrite") +.save(path.getCanonicalPath) + + checkEncoding( +expectedEncoding = encoding, +pathToJsonFiles = path.getCanonicalPath, +expectedContent = """{"_1":"Dog","_2":42}""") +} + } + + test("SPARK-23723: save json in default encoding - UTF-8") { +withTempPath { path => + val df = spark.createDataset(Seq(("Dog", 42))) + df.write +.format("json").mode("overwrite") --- End diff -- ditto ---
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184851438 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2167,4 +2167,259 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val sampled = spark.read.option("samplingRatio", 1.0).json(ds) assert(sampled.count() == ds.count()) } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("encoding", "UTF-16") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"), Row("Doug", "Rood"))) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported encoding name") { +val invalidCharset = "UTF-128" +val exception = intercept[UnsupportedCharsetException] { + spark.read +.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the encoding option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified encoding is not matched to actual encoding") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") +.option("multiline", "true") +.options(Map("encoding" -> "UTF-16BE")) +.json(testFile(fileName)) +.count() +} +val errMsg = exception.getMessage + +assert(errMsg.contains("Malformed records are detected in record parsing")) + } + + def checkEncoding( + expectedEncoding: String, + pathToJsonFiles: String, + expectedContent: String): Unit = { +val jsonFiles = new File(pathToJsonFiles) + .listFiles() + .filter(_.isFile) + .filter(_.getName.endsWith("json")) +val actualContent = jsonFiles.map { file => + new String(Files.readAllBytes(file.toPath), expectedEncoding) +}.mkString.trim.replaceAll(" ", "") + +assert(actualContent == expectedContent) + } + + test("SPARK-23723: save json in UTF-32BE") { +val encoding = "UTF-32BE" +withTempPath { path => + val df = 
spark.createDataset(Seq(("Dog", 42))) + df.write +.options(Map("encoding" -> encoding, "lineSep" -> "\n")) +.format("json").mode("overwrite") +.save(path.getCanonicalPath) + + checkEncoding( +expectedEncoding = encoding, +pathToJsonFiles = path.getCanonicalPath, +expectedContent = """{"_1":"Dog","_2":42}""") +} + } + + test("SPARK-23723: save json in default encoding - UTF-8") { +withTempPath { path => + val df = spark.createDataset(Seq(("Dog", 42))) + df.write +.format("json").mode("overwrite") +.save(path.getCanonicalPath)
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184850594 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala --- @@ -43,7 +47,38 @@ private[sql] object CreateJacksonParser extends Serializable { jsonFactory.createParser(record.getBytes, 0, record.getLength) } - def inputStream(jsonFactory: JsonFactory, record: InputStream): JsonParser = { -jsonFactory.createParser(record) + def getStreamDecoder(enc: String, in: Array[Byte], length: Int): StreamDecoder = { --- End diff -- nit: private? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184851413 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2167,4 +2167,259 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val sampled = spark.read.option("samplingRatio", 1.0).json(ds) assert(sampled.count() == ds.count()) } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("encoding", "UTF-16") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"), Row("Doug", "Rood"))) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported encoding name") { +val invalidCharset = "UTF-128" +val exception = intercept[UnsupportedCharsetException] { + spark.read +.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the encoding option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified encoding is not matched to actual encoding") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") +.option("multiline", "true") +.options(Map("encoding" -> "UTF-16BE")) +.json(testFile(fileName)) +.count() +} +val errMsg = exception.getMessage + +assert(errMsg.contains("Malformed records are detected in record parsing")) + } + + def checkEncoding( + expectedEncoding: String, + pathToJsonFiles: String, + expectedContent: String): Unit = { +val jsonFiles = new File(pathToJsonFiles) + .listFiles() + .filter(_.isFile) + .filter(_.getName.endsWith("json")) +val actualContent = jsonFiles.map { file => + new String(Files.readAllBytes(file.toPath), expectedEncoding) +}.mkString.trim.replaceAll(" ", "") + +assert(actualContent == expectedContent) + } + + test("SPARK-23723: save json in UTF-32BE") { +val encoding = "UTF-32BE" +withTempPath { path => + val df = 
spark.createDataset(Seq(("Dog", 42))) + df.write +.options(Map("encoding" -> encoding, "lineSep" -> "\n")) +.format("json").mode("overwrite") +.save(path.getCanonicalPath) + + checkEncoding( +expectedEncoding = encoding, +pathToJsonFiles = path.getCanonicalPath, +expectedContent = """{"_1":"Dog","_2":42}""") +} + } + + test("SPARK-23723: save json in default encoding - UTF-8") { +withTempPath { path => + val df = spark.createDataset(Seq(("Dog", 42))) + df.write +.format("json").mode("overwrite") +.save(path.getCanonicalPath)
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184850820 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala --- @@ -86,14 +86,43 @@ private[sql] class JSONOptions( val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) + /** + * A string between two consecutive JSON records. + */ val lineSeparator: Option[String] = parameters.get("lineSep").map { sep => require(sep.nonEmpty, "'lineSep' cannot be an empty string.") sep } - // Note that the option 'lineSep' uses a different default value in read and write. - val lineSeparatorInRead: Option[Array[Byte]] = -lineSeparator.map(_.getBytes(StandardCharsets.UTF_8)) - // Note that JSON uses writer with UTF-8 charset. This string will be written out as UTF-8. + + /** + * Standard encoding (charset) name. For example UTF-8, UTF-16LE and UTF-32BE. + * If the encoding is not specified (None), it will be detected automatically + * when the multiLine option is set to `true`. + */ + val encoding: Option[String] = parameters.get("encoding") +.orElse(parameters.get("charset")).map { enc => + // The following encodings are not supported in per-line mode (multiline is false) + // because they cause some problems in reading files with BOM which is supposed to + // present in the files with such encodings. After splitting input files by lines, + // only the first lines will have the BOM which leads to impossibility for reading + // the rest lines. Besides of that, the lineSep option must have the BOM in such + // encodings which can never present between lines. + val blacklist = Seq(Charset.forName("UTF-16"), Charset.forName("UTF-32")) + val isBlacklisted = blacklist.contains(Charset.forName(enc)) + require(multiLine || !isBlacklisted, +s"""The ${enc} encoding must not be included in the blacklist when multiLine is disabled: + | ${blacklist.mkString(", ")}""".stripMargin) + + val forcingLineSep = !(multiLine == false && --- End diff -- `forcingLineSep` -> things like ... `isLineSepRequired`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
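A sketch of the same option handling with the rename suggested here. parameters, multiLine and lineSeparator are fields of JSONOptions; the condition assigned to isLineSepRequired is reconstructed from the truncated hunk (the original negated form `!(multiLine == false && ...)` is cut off above), so treat this as a plausible reading rather than the committed code.

```
import java.nio.charset.{Charset, StandardCharsets}

val encoding: Option[String] = parameters.get("encoding")
  .orElse(parameters.get("charset")).map { enc =>
    // UTF-16 and UTF-32 are rejected in per-line mode: after splitting the input
    // by lines, only the first line keeps the BOM, so the remaining lines could
    // not be decoded with these charsets.
    val blacklist = Seq(Charset.forName("UTF-16"), Charset.forName("UTF-32"))
    val isBlacklisted = blacklist.contains(Charset.forName(enc))
    require(multiLine || !isBlacklisted,
      s"""The $enc encoding must not be included in the blacklist when multiLine is disabled:
         | ${blacklist.mkString(", ")}""".stripMargin)

    // Renamed from forcingLineSep: in per-line mode a non-UTF-8 charset only
    // works if lineSep is also set explicitly.
    val isLineSepRequired =
      multiLine || Charset.forName(enc) == StandardCharsets.UTF_8 || lineSeparator.nonEmpty
    require(isLineSepRequired, s"The lineSep option must be specified for the $enc encoding")

    enc
  }
```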
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184850865 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala --- @@ -372,6 +372,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * `java.text.SimpleDateFormat`. This applies to timestamp type. * `multiLine` (default `false`): parse one record, which may span multiple lines, * per file + * `encoding` (by default it is not set): allows to forcibly set one of standard basic --- End diff -- Not a big deal but shall we match the description to Python side? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184851795 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2167,4 +2167,259 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val sampled = spark.read.option("samplingRatio", 1.0).json(ds) assert(sampled.count() == ds.count()) } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("encoding", "UTF-16") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"), Row("Doug", "Rood"))) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported encoding name") { +val invalidCharset = "UTF-128" +val exception = intercept[UnsupportedCharsetException] { + spark.read +.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the encoding option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified encoding is not matched to actual encoding") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") +.option("multiline", "true") +.options(Map("encoding" -> "UTF-16BE")) +.json(testFile(fileName)) +.count() +} +val errMsg = exception.getMessage + +assert(errMsg.contains("Malformed records are detected in record parsing")) + } + + def checkEncoding( + expectedEncoding: String, + pathToJsonFiles: String, + expectedContent: String): Unit = { +val jsonFiles = new File(pathToJsonFiles) + .listFiles() + .filter(_.isFile) + .filter(_.getName.endsWith("json")) +val actualContent = jsonFiles.map { file => + new String(Files.readAllBytes(file.toPath), expectedEncoding) +}.mkString.trim.replaceAll(" ", "") + +assert(actualContent == expectedContent) + } + + test("SPARK-23723: save json in UTF-32BE") { +val encoding = "UTF-32BE" +withTempPath { path => + val df = 
spark.createDataset(Seq(("Dog", 42))) + df.write +.options(Map("encoding" -> encoding, "lineSep" -> "\n")) +.format("json").mode("overwrite") +.save(path.getCanonicalPath) + + checkEncoding( +expectedEncoding = encoding, +pathToJsonFiles = path.getCanonicalPath, +expectedContent = """{"_1":"Dog","_2":42}""") +} + } + + test("SPARK-23723: save json in default encoding - UTF-8") { +withTempPath { path => + val df = spark.createDataset(Seq(("Dog", 42))) + df.write +.format("json").mode("overwrite") +.save(path.getCanonicalPath)
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184850683 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala --- @@ -43,7 +47,38 @@ private[sql] object CreateJacksonParser extends Serializable { jsonFactory.createParser(record.getBytes, 0, record.getLength) } - def inputStream(jsonFactory: JsonFactory, record: InputStream): JsonParser = { -jsonFactory.createParser(record) + def getStreamDecoder(enc: String, in: Array[Byte], length: Int): StreamDecoder = { +val bais = new ByteArrayInputStream(in, 0, length) +val byteChannel = Channels.newChannel(bais) +val decodingBufferSize = Math.min(length, 8192) +val decoder = Charset.forName(enc).newDecoder() + +StreamDecoder.forDecoder(byteChannel, decoder, decodingBufferSize) + } + + def text(enc: String, jsonFactory: JsonFactory, record: Text): JsonParser = { +val sd = getStreamDecoder(enc, record.getBytes, record.getLength) +jsonFactory.createParser(sd) + } + + def inputStream(jsonFactory: JsonFactory, is: InputStream): JsonParser = { +jsonFactory.createParser(is) + } + + def inputStream(enc: String, jsonFactory: JsonFactory, is: InputStream): JsonParser = { +jsonFactory.createParser(new InputStreamReader(is, enc)) --- End diff -- I think https://github.com/apache/spark/pull/20937#issuecomment-381406357 is a good investigation. It should be good to leave a small note that we should avoid this way if possible. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
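One way the requested note could be phrased (the wording is ours, not the committed comment); the method body is exactly the one from the hunk, and the rationale is deferred to the investigation linked in the comment.

```
def inputStream(enc: String, jsonFactory: JsonFactory, is: InputStream): JsonParser = {
  // Note (illustrative wording): avoid this Reader-based path when possible;
  // see the investigation linked from this review. Prefer the byte-array and
  // StreamDecoder-based parsers above; this overload is used by the multiLine
  // (whole-file) path, where a user-specified encoding must be applied to an
  // InputStream.
  jsonFactory.createParser(new InputStreamReader(is, enc))
}
```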
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184851656 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2167,4 +2167,259 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val sampled = spark.read.option("samplingRatio", 1.0).json(ds) assert(sampled.count() == ds.count()) } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("encoding", "UTF-16") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"), Row("Doug", "Rood"))) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported encoding name") { +val invalidCharset = "UTF-128" +val exception = intercept[UnsupportedCharsetException] { + spark.read +.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the encoding option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified encoding is not matched to actual encoding") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") +.option("multiline", "true") +.options(Map("encoding" -> "UTF-16BE")) +.json(testFile(fileName)) +.count() +} +val errMsg = exception.getMessage + +assert(errMsg.contains("Malformed records are detected in record parsing")) + } + + def checkEncoding( + expectedEncoding: String, + pathToJsonFiles: String, + expectedContent: String): Unit = { +val jsonFiles = new File(pathToJsonFiles) + .listFiles() + .filter(_.isFile) + .filter(_.getName.endsWith("json")) +val actualContent = jsonFiles.map { file => + new String(Files.readAllBytes(file.toPath), expectedEncoding) +}.mkString.trim.replaceAll(" ", "") + +assert(actualContent == expectedContent) + } + + test("SPARK-23723: save json in UTF-32BE") { +val encoding = "UTF-32BE" +withTempPath { path => + val df = 
spark.createDataset(Seq(("Dog", 42))) + df.write +.options(Map("encoding" -> encoding, "lineSep" -> "\n")) +.format("json").mode("overwrite") +.save(path.getCanonicalPath) + + checkEncoding( +expectedEncoding = encoding, +pathToJsonFiles = path.getCanonicalPath, +expectedContent = """{"_1":"Dog","_2":42}""") +} + } + + test("SPARK-23723: save json in default encoding - UTF-8") { +withTempPath { path => + val df = spark.createDataset(Seq(("Dog", 42))) + df.write +.format("json").mode("overwrite") +.save(path.getCanonicalPath)
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184851642 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2167,4 +2167,259 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val sampled = spark.read.option("samplingRatio", 1.0).json(ds) assert(sampled.count() == ds.count()) } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("encoding", "UTF-16") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"), Row("Doug", "Rood"))) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported encoding name") { +val invalidCharset = "UTF-128" +val exception = intercept[UnsupportedCharsetException] { + spark.read +.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the encoding option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified encoding is not matched to actual encoding") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") +.option("multiline", "true") +.options(Map("encoding" -> "UTF-16BE")) +.json(testFile(fileName)) +.count() +} +val errMsg = exception.getMessage + +assert(errMsg.contains("Malformed records are detected in record parsing")) + } + + def checkEncoding( + expectedEncoding: String, + pathToJsonFiles: String, + expectedContent: String): Unit = { +val jsonFiles = new File(pathToJsonFiles) + .listFiles() + .filter(_.isFile) + .filter(_.getName.endsWith("json")) +val actualContent = jsonFiles.map { file => + new String(Files.readAllBytes(file.toPath), expectedEncoding) +}.mkString.trim.replaceAll(" ", "") + +assert(actualContent == expectedContent) + } + + test("SPARK-23723: save json in UTF-32BE") { +val encoding = "UTF-32BE" +withTempPath { path => + val df = 
spark.createDataset(Seq(("Dog", 42))) + df.write +.options(Map("encoding" -> encoding, "lineSep" -> "\n")) +.format("json").mode("overwrite") +.save(path.getCanonicalPath) + + checkEncoding( +expectedEncoding = encoding, +pathToJsonFiles = path.getCanonicalPath, +expectedContent = """{"_1":"Dog","_2":42}""") +} + } + + test("SPARK-23723: save json in default encoding - UTF-8") { +withTempPath { path => + val df = spark.createDataset(Seq(("Dog", 42))) + df.write +.format("json").mode("overwrite") +.save(path.getCanonicalPath)
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184851188 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.json + +import java.io.File + +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.types.{LongType, StringType, StructType} +import org.apache.spark.util.{Benchmark, Utils} + +/** + * Benchmark to measure JSON read/write performance. + * To run this: + * spark-submit --class --jars + */ +object JSONBenchmarks { + val conf = new SparkConf() + + val spark = SparkSession.builder +.master("local[1]") +.appName("benchmark-json-datasource") +.config(conf) +.getOrCreate() + import spark.implicits._ + + def withTempPath(f: File => Unit): Unit = { +val path = Utils.createTempDir() +path.delete() +try f(path) finally Utils.deleteRecursively(path) + } + + + def schemaInferring(rowsNum: Int): Unit = { +val benchmark = new Benchmark("JSON schema inferring", rowsNum) + +withTempPath { path => + // scalastyle:off + benchmark.out.println("Preparing data for benchmarking ...") + // scalastyle:on + + spark.sparkContext.range(0, rowsNum, 1) +.map(_ => "a") +.toDF("fieldA") +.write +.option("encoding", "UTF-8") +.json(path.getAbsolutePath) + + benchmark.addCase("No encoding", 3) { _ => +spark.read.json(path.getAbsolutePath) + } + + benchmark.addCase("UTF-8 is set", 3) { _ => +spark.read + .option("encoding", "UTF-8") + .json(path.getAbsolutePath) + } + + /* + Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz + + JSON schema inferring: Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative + + No encoding 38902 / 39282 2.6 389.0 1.0X + UTF-8 is set56959 / 57261 1.8 569.6 0.7X + */ + benchmark.run() +} + } + + def perlineParsing(rowsNum: Int): Unit = { +val benchmark = new Benchmark("JSON per-line parsing", rowsNum) + +withTempPath { path => + // scalastyle:off --- End diff -- ditto --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184851231 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2167,4 +2167,259 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val sampled = spark.read.option("samplingRatio", 1.0).json(ds) assert(sampled.count() == ds.count()) } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("encoding", "UTF-16") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"), Row("Doug", "Rood"))) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" --- End diff -- Shall we put the files in `test-data`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184851652 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2167,4 +2167,259 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val sampled = spark.read.option("samplingRatio", 1.0).json(ds) assert(sampled.count() == ds.count()) } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("encoding", "UTF-16") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"), Row("Doug", "Rood"))) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported encoding name") { +val invalidCharset = "UTF-128" +val exception = intercept[UnsupportedCharsetException] { + spark.read +.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the encoding option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified encoding is not matched to actual encoding") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") +.option("multiline", "true") +.options(Map("encoding" -> "UTF-16BE")) +.json(testFile(fileName)) +.count() +} +val errMsg = exception.getMessage + +assert(errMsg.contains("Malformed records are detected in record parsing")) + } + + def checkEncoding( + expectedEncoding: String, + pathToJsonFiles: String, + expectedContent: String): Unit = { +val jsonFiles = new File(pathToJsonFiles) + .listFiles() + .filter(_.isFile) + .filter(_.getName.endsWith("json")) +val actualContent = jsonFiles.map { file => + new String(Files.readAllBytes(file.toPath), expectedEncoding) +}.mkString.trim.replaceAll(" ", "") + +assert(actualContent == expectedContent) + } + + test("SPARK-23723: save json in UTF-32BE") { +val encoding = "UTF-32BE" +withTempPath { path => + val df = 
spark.createDataset(Seq(("Dog", 42))) + df.write +.options(Map("encoding" -> encoding, "lineSep" -> "\n")) +.format("json").mode("overwrite") +.save(path.getCanonicalPath) + + checkEncoding( +expectedEncoding = encoding, +pathToJsonFiles = path.getCanonicalPath, +expectedContent = """{"_1":"Dog","_2":42}""") +} + } + + test("SPARK-23723: save json in default encoding - UTF-8") { +withTempPath { path => + val df = spark.createDataset(Seq(("Dog", 42))) + df.write +.format("json").mode("overwrite") +.save(path.getCanonicalPath)
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184851099 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.json + +import java.io.File + +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.types.{LongType, StringType, StructType} +import org.apache.spark.util.{Benchmark, Utils} + +/** + * Benchmark to measure JSON read/write performance. --- End diff -- I think we should mention the purpose of this is to check when encoding is set or not. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184851211 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2167,4 +2167,259 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val sampled = spark.read.option("samplingRatio", 1.0).json(ds) assert(sampled.count() == ds.count()) } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } --- End diff -- Let's put this up like `CSVSuite` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184851606 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2167,4 +2167,259 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val sampled = spark.read.option("samplingRatio", 1.0).json(ds) assert(sampled.count() == ds.count()) } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("encoding", "UTF-16") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"), Row("Doug", "Rood"))) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported encoding name") { +val invalidCharset = "UTF-128" +val exception = intercept[UnsupportedCharsetException] { + spark.read +.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the encoding option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified encoding is not matched to actual encoding") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") +.option("multiline", "true") +.options(Map("encoding" -> "UTF-16BE")) +.json(testFile(fileName)) +.count() +} +val errMsg = exception.getMessage + +assert(errMsg.contains("Malformed records are detected in record parsing")) + } + + def checkEncoding( + expectedEncoding: String, + pathToJsonFiles: String, + expectedContent: String): Unit = { +val jsonFiles = new File(pathToJsonFiles) + .listFiles() + .filter(_.isFile) + .filter(_.getName.endsWith("json")) +val actualContent = jsonFiles.map { file => + new String(Files.readAllBytes(file.toPath), expectedEncoding) +}.mkString.trim.replaceAll(" ", "") + +assert(actualContent == expectedContent) + } + + test("SPARK-23723: save json in UTF-32BE") { +val encoding = "UTF-32BE" +withTempPath { path => + val df = 
spark.createDataset(Seq(("Dog", 42))) + df.write +.options(Map("encoding" -> encoding, "lineSep" -> "\n")) +.format("json").mode("overwrite") +.save(path.getCanonicalPath) + + checkEncoding( +expectedEncoding = encoding, +pathToJsonFiles = path.getCanonicalPath, +expectedContent = """{"_1":"Dog","_2":42}""") +} + } + + test("SPARK-23723: save json in default encoding - UTF-8") { +withTempPath { path => + val df = spark.createDataset(Seq(("Dog", 42))) + df.write +.format("json").mode("overwrite") +.save(path.getCanonicalPath)
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184851052 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.json + +import java.io.File + +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.types.{LongType, StringType, StructType} +import org.apache.spark.util.{Benchmark, Utils} + +/** + * Benchmark to measure JSON read/write performance. + * To run this: + * spark-submit --class --jars + */ +object JSONBenchmarks { + val conf = new SparkConf() + + val spark = SparkSession.builder +.master("local[1]") +.appName("benchmark-json-datasource") +.config(conf) +.getOrCreate() + import spark.implicits._ + + def withTempPath(f: File => Unit): Unit = { +val path = Utils.createTempDir() +path.delete() +try f(path) finally Utils.deleteRecursively(path) + } + + + def schemaInferring(rowsNum: Int): Unit = { +val benchmark = new Benchmark("JSON schema inferring", rowsNum) + +withTempPath { path => + // scalastyle:off --- End diff -- ``` // scalastyle:off println ... // scalastyle:on println ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r183524310 --- Diff: python/pyspark/sql/readwriter.py --- @@ -773,6 +776,8 @@ def json(self, path, mode=None, compression=None, dateFormat=None, timestampForm formats follow the formats at ``java.text.SimpleDateFormat``. This applies to timestamp type. If None is set, it uses the default value, ``-MM-dd'T'HH:mm:ss.SSSXXX``. +:param encoding: specifies encoding (charset) of saved json files. If None is set, +the default UTF-8 charset will be used. --- End diff -- Sorry, I didn't realize initially that the comment related to writing. For writing, if `lineSep` is not set by the user, it will be set to `\n` in any case: https://github.com/MaxGekk/spark-1/blob/482b79969b9e0cc475e63b415051b32423facef4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala#L124 Actually, the current implementation is stricter than it needs to be. It requires setting `lineSep` explicitly on write if `multiLine` is `false` and `encoding` is different from `UTF-8`. I will change it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
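For readers following the thread, the write-side combination being discussed looks roughly like the sketch below (assuming an active SparkSession named `spark`; the output path is illustrative). Under the stricter behaviour described above, a non-UTF-8 `encoding` on write has to be paired with an explicit `lineSep`:

```scala
import spark.implicits._

val df = Seq(("Chris", "Baird"), ("Doug", "Rood")).toDF("firstName", "lastName")

// Per-line JSON written in UTF-16LE; lineSep is given explicitly because,
// as described above, only UTF-8 output may currently omit it.
df.write
  .option("encoding", "UTF-16LE")
  .option("lineSep", "\n")
  .mode("overwrite")
  .json("/tmp/json-utf16le")
```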
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r183512588 --- Diff: python/pyspark/sql/readwriter.py --- @@ -773,6 +776,8 @@ def json(self, path, mode=None, compression=None, dateFormat=None, timestampForm formats follow the formats at ``java.text.SimpleDateFormat``. This applies to timestamp type. If None is set, it uses the default value, ``-MM-dd'T'HH:mm:ss.SSSXXX``. +:param encoding: specifies encoding (charset) of saved json files. If None is set, +the default UTF-8 charset will be used. --- End diff -- Yes, we can mention this in the comment but the user will get the error: https://github.com/MaxGekk/spark-1/blob/482b79969b9e0cc475e63b415051b32423facef4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala#L116-L117 if the `lineSep` is not set. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r183511452 --- Diff: python/pyspark/sql/readwriter.py --- @@ -237,6 +237,9 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, :param allowUnquotedControlChars: allows JSON Strings to contain unquoted control characters (ASCII characters with value less than 32, including tab and line feed characters) or not. +:param encoding: standard encoding (charset) name, for example UTF-8, UTF-16LE and UTF-32BE. + If None is set, the encoding of input JSON will be detected automatically + when the multiLine option is set to ``true``. --- End diff -- No, it doesn't. If it were true, it would break backward compatibility. In the comment, we just want to highlight that encoding auto-detection (meaning **correct** auto-detection in all cases) is officially supported in multiLine mode only. In per-line mode, the auto-detection mechanism (when `encoding` is not set) can fail in some cases, for example if the actual encoding of the json file is `UTF-16` with a BOM, but in other cases it works (for example, when the file's encoding is `UTF-8` and the actual line separator is `\n`). That's why @HyukjinKwon suggested mentioning only the working case. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
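The contrast described above can be sketched as follows (assuming an active SparkSession named `spark`; the file paths are illustrative and the schema mirrors the test data quoted elsewhere in this thread):

```scala
import org.apache.spark.sql.types.{StringType, StructType}

val schema = new StructType().add("firstName", StringType).add("lastName", StringType)

// multiLine mode: the charset (e.g. UTF-16 with a BOM) may be detected automatically.
val multiLineDF = spark.read
  .schema(schema)
  .option("multiLine", "true")
  .json("/data/utf16WithBOM.json")

// Per-line mode: auto-detection is not guaranteed, so the encoding (and, for
// non-UTF-8 charsets, the line separator) is set explicitly.
val perLineDF = spark.read
  .schema(schema)
  .option("encoding", "UTF-16LE")
  .option("lineSep", "\n")
  .json("/data/utf16LE.json")
```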
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r183271569 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -361,6 +361,12 @@ class JacksonParser( // For such records, all fields other than the field configured by // `columnNameOfCorruptRecord` are set to `null`. throw BadRecordException(() => recordLiteral(record), () => None, e) + case e: CharConversionException if options.encoding.isEmpty => +val msg = + """JSON parser cannot handle a character in its input. +|Specifying encoding as an input option explicitly might help to resolve the issue. +|""".stripMargin + e.getMessage +throw new CharConversionException(msg) --- End diff -- BTW we should also follow the existing rule and wrap the exception with `BadRecordException`. See the code above. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r183271309 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -361,6 +361,12 @@ class JacksonParser( // For such records, all fields other than the field configured by // `columnNameOfCorruptRecord` are set to `null`. throw BadRecordException(() => recordLiteral(record), () => None, e) + case e: CharConversionException if options.encoding.isEmpty => +val msg = + """JSON parser cannot handle a character in its input. +|Specifying encoding as an input option explicitly might help to resolve the issue. +|""".stripMargin + e.getMessage +throw new CharConversionException(msg) --- End diff -- This will lose the original stack trace, we should do `throw BadRecordException(() => recordLiteral(record), () => None, e)` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
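Taken together with the previous comment, one way the quoted case clause might be amended is to keep the original exception as the cause and reuse the `BadRecordException` convention from the surrounding code. This is only a sketch of the handler shown in the diff, not the final code:

```scala
case e: CharConversionException if options.encoding.isEmpty =>
  val msg =
    """JSON parser cannot handle a character in its input.
      |Specifying encoding as an input option explicitly might help to resolve the issue.
      |""".stripMargin + e.getMessage
  val wrappedCharException = new CharConversionException(msg)
  // Preserve the original stack trace instead of discarding it.
  wrappedCharException.initCause(e)
  // Follow the existing rule for reporting malformed records.
  throw BadRecordException(() => recordLiteral(record), () => None, wrappedCharException)
```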
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r183271101 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala --- @@ -86,14 +86,41 @@ private[sql] class JSONOptions( val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) + /** + * A string between two consecutive JSON records. + */ val lineSeparator: Option[String] = parameters.get("lineSep").map { sep => require(sep.nonEmpty, "'lineSep' cannot be an empty string.") sep } - // Note that the option 'lineSep' uses a different default value in read and write. - val lineSeparatorInRead: Option[Array[Byte]] = -lineSeparator.map(_.getBytes(StandardCharsets.UTF_8)) - // Note that JSON uses writer with UTF-8 charset. This string will be written out as UTF-8. + + /** + * Standard encoding (charset) name. For example UTF-8, UTF-16LE and UTF-32BE. + * If the encoding is not specified (None), it will be detected automatically + * when the multiLine option is set to `true`. + */ + val encoding: Option[String] = parameters.get("encoding") +.orElse(parameters.get("charset")).map { enc => + // The following encodings are not supported in per-line mode (multiline is false) + // because they cause some problems in reading files with BOM which is supposed to + // present in the files with such encodings. After splitting input files by lines, + // only the first lines will have the BOM which leads to impossibility for reading + // the rest lines. Besides of that, the lineSep option must have the BOM in such + // encodings which can never present between lines. + val blacklist = Seq(Charset.forName("UTF-16"), Charset.forName("UTF-32")) + val isBlacklisted = blacklist.contains(Charset.forName(enc)) + require(multiLine || !isBlacklisted, +s"""The ${enc} encoding must not be included in the blacklist when multiLine is disabled: + | ${blacklist.mkString(", ")}""".stripMargin) + + val forcingLineSep = !(multiLine == false && enc != "UTF-8" && lineSeparator.isEmpty) --- End diff -- `enc != "UTF-8"`, we should not compare string directly, but turn them into `Charset` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
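A small self-contained illustration of the point: comparing `Charset` objects instead of raw strings makes the check case-insensitive and alias-aware.

```scala
import java.nio.charset.{Charset, StandardCharsets}

object CharsetCheck {
  // "UTF-8", "utf-8" and "UTF8" all resolve to the same Charset instance.
  def isUTF8(enc: String): Boolean = Charset.forName(enc) == StandardCharsets.UTF_8

  def main(args: Array[String]): Unit = {
    assert(isUTF8("UTF-8"))
    assert(isUTF8("utf-8"))
    assert(isUTF8("UTF8"))    // registered alias of UTF-8
    assert(!isUTF8("UTF-16"))
  }
}
```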
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r183270773 --- Diff: python/pyspark/sql/readwriter.py --- @@ -773,6 +776,8 @@ def json(self, path, mode=None, compression=None, dateFormat=None, timestampForm formats follow the formats at ``java.text.SimpleDateFormat``. This applies to timestamp type. If None is set, it uses the default value, ``-MM-dd'T'HH:mm:ss.SSSXXX``. +:param encoding: specifies encoding (charset) of saved json files. If None is set, +the default UTF-8 charset will be used. --- End diff -- shall we mention that, if `encoding` is set, `lineSep` also needs to be set when `multiLine` is false? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r183270654 --- Diff: python/pyspark/sql/readwriter.py --- @@ -237,6 +237,9 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, :param allowUnquotedControlChars: allows JSON Strings to contain unquoted control characters (ASCII characters with value less than 32, including tab and line feed characters) or not. +:param encoding: standard encoding (charset) name, for example UTF-8, UTF-16LE and UTF-32BE. + If None is set, the encoding of input JSON will be detected automatically + when the multiLine option is set to ``true``. --- End diff -- Does it mean users have to set the encoding if `multiLine` is false? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r183227330 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.json + +import java.io.File + +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.types.{LongType, StringType, StructType} +import org.apache.spark.util.{Benchmark, Utils} + +/** + * Benchmark to measure JSON read/write performance. + * To run this: + * spark-submit --class --jars + */ +object JSONBenchmarks { + val conf = new SparkConf() + + val spark = SparkSession.builder +.master("local[1]") +.appName("benchmark-json-datasource") +.config(conf) +.getOrCreate() + import spark.implicits._ + + def withTempPath(f: File => Unit): Unit = { +val path = Utils.createTempDir() +path.delete() +try f(path) finally Utils.deleteRecursively(path) + } + + + def schemaInferring(rowsNum: Int): Unit = { +val benchmark = new Benchmark("JSON schema inferring", rowsNum) + +withTempPath { path => + // scalastyle:off + benchmark.out.println("Preparing data for benchmarking ...") + // scalastyle:on + + spark.sparkContext.range(0, rowsNum, 1) +.map(_ => "a") +.toDF("fieldA") +.write +.option("encoding", "UTF-8") +.json(path.getAbsolutePath) + + benchmark.addCase("No encoding", 3) { _ => +spark.read.json(path.getAbsolutePath) + } + + benchmark.addCase("UTF-8 is set", 3) { _ => +spark.read + .option("encoding", "UTF-8") + .json(path.getAbsolutePath) + } + + /* + Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz + + JSON schema inferring: Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative + + No encoding 38902 / 39282 2.6 389.0 1.0X + UTF-8 is set56959 / 57261 1.8 569.6 0.7X + */ + benchmark.run() +} + } + + def perlineParsing(rowsNum: Int): Unit = { +val benchmark = new Benchmark("JSON per-line parsing", rowsNum) + +withTempPath { path => + // scalastyle:off + benchmark.out.println("Preparing data for benchmarking ...") + // scalastyle:on + + spark.sparkContext.range(0, rowsNum, 1) +.map(_ => "a") +.toDF("fieldA") +.write.json(path.getAbsolutePath) + val schema = new StructType().add("fieldA", StringType) + + benchmark.addCase("No encoding", 3) { _ => +spark.read + .schema(schema) + .json(path.getAbsolutePath) + .count() + } + + benchmark.addCase("UTF-8 is set", 3) { _ => +spark.read + .option("encoding", "UTF-8") + .schema(schema) + .json(path.getAbsolutePath) + .count() + } + + /* + Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz + + JSON per-line parsing: Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative + + No encoding 25947 / 26188 3.9 259.5 1.0X + UTF-8 is set46319 / 46417 2.2 
463.2 0.6X + */ + benchmark.run() +} + } + + def perlineParsingOfWi
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r183227312 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala --- @@ -41,19 +41,25 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti */ val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean - private val lineSeparator: Option[String] = parameters.get(LINE_SEPARATOR).map { sep => -require(sep.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.") -sep + val encoding: Option[String] = parameters.get(ENCODING) + + val lineSeparator: Option[String] = parameters.get(LINE_SEPARATOR).map { lineSep => +require(lineSep.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.") + +lineSep } + // Note that the option 'lineSep' uses a different default value in read and write. - val lineSeparatorInRead: Option[Array[Byte]] = -lineSeparator.map(_.getBytes(StandardCharsets.UTF_8)) + val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.map { lineSep => +lineSep.getBytes(encoding.getOrElse("UTF-8")) + } val lineSeparatorInWrite: Array[Byte] = -lineSeparatorInRead.getOrElse("\n".getBytes(StandardCharsets.UTF_8)) +lineSeparatorInRead.getOrElse("\n".getBytes("UTF-8")) --- End diff -- not a big deal at all but was just wondering if there was a reason to choose `"UTF-8"` over `StandardCharsets.UTF_8` because I was thinking `StandardCharsets.UTF_8` is slightly better. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
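For what it's worth, both spellings produce the same bytes; the constant only avoids the charset lookup by name (and, on the Java side, the checked `UnsupportedEncodingException` of the `String` overload). A tiny self-contained sketch:

```scala
import java.nio.charset.StandardCharsets
import java.util.Arrays

object Utf8Bytes {
  def main(args: Array[String]): Unit = {
    val byName     = "\n".getBytes("UTF-8")                 // looks the charset up by name
    val byConstant = "\n".getBytes(StandardCharsets.UTF_8)  // uses the cached constant
    assert(Arrays.equals(byName, byConstant))
  }
}
```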
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r183227276 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala --- @@ -175,11 +187,15 @@ object MultiLineJsonDataSource extends JsonDataSource { .values } - private def createParser(jsonFactory: JsonFactory, record: PortableDataStream): JsonParser = { + private def createParser( + jsonFactory: JsonFactory, + record: PortableDataStream, + encoding: Option[String]): JsonParser = { val path = new Path(record.getPath()) -CreateJacksonParser.inputStream( - jsonFactory, - CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path)) +val is = CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path) + +encoding.map(enc => CreateJacksonParser.inputStream(enc, jsonFactory, is)) + .getOrElse(CreateJacksonParser.inputStream(jsonFactory, is)) --- End diff -- Hm, @MaxGekk, wouldn't this also do a record per operation too? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r180112595 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala --- @@ -86,14 +85,34 @@ private[sql] class JSONOptions( val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) - val lineSeparator: Option[String] = parameters.get("lineSep").map { sep => -require(sep.nonEmpty, "'lineSep' cannot be an empty string.") -sep + /** + * A string between two consecutive JSON records. + */ + val lineSeparator: Option[String] = parameters.get("lineSep") + + /** + * Standard encoding (charset) name. For example UTF-8, UTF-16LE and UTF-32BE. + * If the encoding is not specified (None), it will be detected automatically. + */ + val encoding: Option[String] = parameters.get("encoding") +.orElse(parameters.get("charset")).map { enc => + val blacklist = List("UTF16", "UTF32") + val isBlacklisted = blacklist.contains(enc.toUpperCase.replaceAll("-|_", "")) + require(multiLine || !isBlacklisted, +s"""The ${enc} encoding must not be included in the blacklist: + | ${blacklist.mkString(", ")}""".stripMargin) + + val forcingLineSep = !(multiLine == false && enc != "UTF-8" && lineSeparator.isEmpty) + require(forcingLineSep, +s"""The lineSep option must be specified for the $enc encoding. + |Example: .option("lineSep", "|^|") --- End diff -- yea but if you execute this in SQL context, the example is irrelevant. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
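For context, in a pure SQL setting there is no `.option(...)` call to copy-paste; reader options travel through an `OPTIONS` clause instead, roughly like the sketch below (assuming an active SparkSession named `spark`; the path and view name are illustrative):

```scala
spark.sql(
  """CREATE TEMPORARY VIEW people
    |USING json
    |OPTIONS (path '/data/utf16le', encoding 'UTF-16LE', lineSep '|^|')
    |""".stripMargin)
```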
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r180098845 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala --- @@ -92,26 +93,30 @@ object TextInputJsonDataSource extends JsonDataSource { sparkSession: SparkSession, inputPaths: Seq[FileStatus], parsedOptions: JSONOptions): StructType = { -val json: Dataset[String] = createBaseDataset( - sparkSession, inputPaths, parsedOptions.lineSeparator) +val json: Dataset[String] = createBaseDataset(sparkSession, inputPaths, parsedOptions) + inferFromDataset(json, parsedOptions) } def inferFromDataset(json: Dataset[String], parsedOptions: JSONOptions): StructType = { val sampled: Dataset[String] = JsonUtils.sample(json, parsedOptions) -val rdd: RDD[UTF8String] = sampled.queryExecution.toRdd.map(_.getUTF8String(0)) -JsonInferSchema.infer(rdd, parsedOptions, CreateJacksonParser.utf8String) +val rdd: RDD[InternalRow] = sampled.queryExecution.toRdd +val rowParser = parsedOptions.encoding.map { enc => + CreateJacksonParser.internalRow(enc, _: JsonFactory, _: InternalRow, 0) --- End diff -- sorry I think the array is already wrapped by ByteArrayInputStream per record? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r180050283 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala --- @@ -92,26 +93,30 @@ object TextInputJsonDataSource extends JsonDataSource { sparkSession: SparkSession, inputPaths: Seq[FileStatus], parsedOptions: JSONOptions): StructType = { -val json: Dataset[String] = createBaseDataset( - sparkSession, inputPaths, parsedOptions.lineSeparator) +val json: Dataset[String] = createBaseDataset(sparkSession, inputPaths, parsedOptions) + inferFromDataset(json, parsedOptions) } def inferFromDataset(json: Dataset[String], parsedOptions: JSONOptions): StructType = { val sampled: Dataset[String] = JsonUtils.sample(json, parsedOptions) -val rdd: RDD[UTF8String] = sampled.queryExecution.toRdd.map(_.getUTF8String(0)) -JsonInferSchema.infer(rdd, parsedOptions, CreateJacksonParser.utf8String) +val rdd: RDD[InternalRow] = sampled.queryExecution.toRdd +val rowParser = parsedOptions.encoding.map { enc => + CreateJacksonParser.internalRow(enc, _: JsonFactory, _: InternalRow, 0) --- End diff -- I did it that way originally but rejected the solution because the overhead of wrapping the array in a `ByteArrayInputStream` for each row is very high. It increases execution time by up to 20% in some cases. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r180016246 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -361,6 +361,15 @@ class JacksonParser( // For such records, all fields other than the field configured by // `columnNameOfCorruptRecord` are set to `null`. throw BadRecordException(() => recordLiteral(record), () => None, e) + case e: CharConversionException if options.encoding.isEmpty => +val msg = + """Failed to parse a character. Encoding was detected automatically. --- End diff -- > I don't think `Encoding was detected automatically` is quite correct. It is absolutely correct. If `encoding` is not set, it is detected automatically by Jackson. Look at the condition `if options.encoding.isEmpty =>`. > It might not help the user solve the issue but it gives less correct information. It gives absolutely correct information. > They could think it detects the encoding correctly regardless of the multiline option. The message DOESN'T say that `encoding` was detected correctly. > Think about this scenario: users somehow get this exception and read Failed to parse a character. Encoding was detected automatically.. What would they think? They will look at the proposed solution `You might want to set it explicitly via the encoding option like` and will set `encoding`. > I would think the file somehow failed to read It could be true even if `encoding` is set correctly. > but it looks like it detects the encoding in the file correctly automatically I don't know why you decided that. I see nothing about `encoding` correctness in the message. > It's annoying to debug encoding-related stuff in my experience. It would be nicer if we give the correct information as much as we can. What is your suggestion for the error message? > I am saying let's document the automatic encoding detection feature only for multiLine officially, which is true. I agree, let's document that, though it is not related to this PR. This PR doesn't change the behavior of encoding auto-detection. And it must not change the behavior, from my point of view. If you want to restrict the encoding auto-detection mechanism somehow, please create a separate PR. We will discuss separately what kind of customers' apps it will break. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r180014636 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala --- @@ -86,14 +85,34 @@ private[sql] class JSONOptions( val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) - val lineSeparator: Option[String] = parameters.get("lineSep").map { sep => -require(sep.nonEmpty, "'lineSep' cannot be an empty string.") -sep + /** + * A string between two consecutive JSON records. + */ + val lineSeparator: Option[String] = parameters.get("lineSep") + + /** + * Standard encoding (charset) name. For example UTF-8, UTF-16LE and UTF-32BE. + * If the encoding is not specified (None), it will be detected automatically. + */ + val encoding: Option[String] = parameters.get("encoding") +.orElse(parameters.get("charset")).map { enc => + val blacklist = List("UTF16", "UTF32") --- End diff -- Not important but it's more usual and was thinking of doing it if there isn't specific reason to make an exception from a norm. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r180014167 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala --- @@ -366,6 +366,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * `java.text.SimpleDateFormat`. This applies to timestamp type. * `multiLine` (default `false`): parse one record, which may span multiple lines, * per file + * `encoding` (by default it is not set): allows to forcibly set one of standard basic + * or extended charsets for input jsons. For example UTF-8, UTF-16BE, UTF-32. If the encoding + * is not specified (by default), it will be detected automatically. --- End diff -- > If encoding is not set, it will be detected by Jackson independently from multiline. Jackson detects it, but Spark doesn't handle it correctly when `multiLine` is disabled, even with this PR, as we discussed. We found many holes. Why did you bring this up again? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r180013348 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala --- @@ -92,26 +93,30 @@ object TextInputJsonDataSource extends JsonDataSource { sparkSession: SparkSession, inputPaths: Seq[FileStatus], parsedOptions: JSONOptions): StructType = { -val json: Dataset[String] = createBaseDataset( - sparkSession, inputPaths, parsedOptions.lineSeparator) +val json: Dataset[String] = createBaseDataset(sparkSession, inputPaths, parsedOptions) + inferFromDataset(json, parsedOptions) } def inferFromDataset(json: Dataset[String], parsedOptions: JSONOptions): StructType = { val sampled: Dataset[String] = JsonUtils.sample(json, parsedOptions) -val rdd: RDD[UTF8String] = sampled.queryExecution.toRdd.map(_.getUTF8String(0)) -JsonInferSchema.infer(rdd, parsedOptions, CreateJacksonParser.utf8String) +val rdd: RDD[InternalRow] = sampled.queryExecution.toRdd +val rowParser = parsedOptions.encoding.map { enc => + CreateJacksonParser.internalRow(enc, _: JsonFactory, _: InternalRow, 0) --- End diff -- Can we do something like ```scala (factory: JsonFactory, row: InternalRow) => { val bais = new ByteArrayInputStream(row.getBinary(0)); CreateJacksonParser.inputStream(enc, factory, bais) } ``` ? Looks like `internalRow` doesn't actually deduplicate code. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r180009422 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -361,6 +361,15 @@ class JacksonParser( // For such records, all fields other than the field configured by // `columnNameOfCorruptRecord` are set to `null`. throw BadRecordException(() => recordLiteral(record), () => None, e) + case e: CharConversionException if options.encoding.isEmpty => +val msg = + """Failed to parse a character. Encoding was detected automatically. --- End diff -- I am saying let's document the automatic encoding detection feature only for `multiLine` officially, which is true. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r180009312 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -361,6 +361,15 @@ class JacksonParser( // For such records, all fields other than the field configured by // `columnNameOfCorruptRecord` are set to `null`. throw BadRecordException(() => recordLiteral(record), () => None, e) + case e: CharConversionException if options.encoding.isEmpty => +val msg = + """Failed to parse a character. Encoding was detected automatically. --- End diff -- I don't think `Encoding was detected automatically` is quite correct. It might not help the user solve the issue, but it gives less correct information. They could think it detects the encoding correctly regardless of the `multiline` option. Think about this scenario: users somehow get this exception and read `Failed to parse a character. Encoding was detected automatically.`. What would they think? I would think the file somehow failed to read, but it looks like it detects the encoding in the file correctly automatically, regardless of other options. It's annoying to debug encoding-related stuff in my experience. It would be nicer if we give the correct information as much as we can. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r18138 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -361,6 +361,15 @@ class JacksonParser( // For such records, all fields other than the field configured by // `columnNameOfCorruptRecord` are set to `null`. throw BadRecordException(() => recordLiteral(record), () => None, e) + case e: CharConversionException if options.encoding.isEmpty => +val msg = + """Failed to parse a character. Encoding was detected automatically. --- End diff -- ok, speaking about this concrete exception handling. The exception with the message is thrown ONLY when options.encoding.isEmpty is `true`. It means `encoding` is not set and actual encoding of a file was autodetected. The `msg` says about that actually: `Encoding was detected automatically`. Maybe `encoding` was detected correctly but the file contains a wrong char. In that case, the first sentence says this `Failed to parse a character`. The same could happen if you set `encoding` explicitly because you cannot guarantee that inputs are alway correct. > I think automatic detection is true only when multuline is enabled. Wrong char in input file can be in a file with UTF-8 read with `multiline = false` and in a file in UTF-16LE with `multiline = true`. My point is the mention of the `multiline` option in the error message doesn't help to user to solve the issue. A possible solution is to set `encoding` explicitly - what the message says actually. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179966200 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -361,6 +361,15 @@ class JacksonParser( // For such records, all fields other than the field configured by // `columnNameOfCorruptRecord` are set to `null`. throw BadRecordException(() => recordLiteral(record), () => None, e) + case e: CharConversionException if options.encoding.isEmpty => +val msg = + """Failed to parse a character. Encoding was detected automatically. --- End diff -- My point is that automatic detection is true only when multiline is enabled, and the message reads as if it's always true. I don't think we should expose an incomplete (or accidental) functionality in any case. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179958997 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala --- @@ -86,14 +85,34 @@ private[sql] class JSONOptions( val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) - val lineSeparator: Option[String] = parameters.get("lineSep").map { sep => -require(sep.nonEmpty, "'lineSep' cannot be an empty string.") -sep + /** + * A string between two consecutive JSON records. + */ + val lineSeparator: Option[String] = parameters.get("lineSep") + + /** + * Standard encoding (charset) name. For example UTF-8, UTF-16LE and UTF-32BE. + * If the encoding is not specified (None), it will be detected automatically. + */ + val encoding: Option[String] = parameters.get("encoding") +.orElse(parameters.get("charset")).map { enc => + val blacklist = List("UTF16", "UTF32") + val isBlacklisted = blacklist.contains(enc.toUpperCase.replaceAll("-|_", "")) + require(multiLine || !isBlacklisted, +s"""The ${enc} encoding must not be included in the blacklist: + | ${blacklist.mkString(", ")}""".stripMargin) + + val forcingLineSep = !(multiLine == false && enc != "UTF-8" && lineSeparator.isEmpty) + require(forcingLineSep, +s"""The lineSep option must be specified for the $enc encoding. + |Example: .option("lineSep", "|^|") --- End diff -- I believe it is better to give users the possibility to just copy-paste the text and replace the option's value in the example, instead of searching the docs for how to do that. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179958858 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala --- @@ -86,14 +85,34 @@ private[sql] class JSONOptions( val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) - val lineSeparator: Option[String] = parameters.get("lineSep").map { sep => -require(sep.nonEmpty, "'lineSep' cannot be an empty string.") -sep + /** + * A string between two consecutive JSON records. + */ + val lineSeparator: Option[String] = parameters.get("lineSep") + + /** + * Standard encoding (charset) name. For example UTF-8, UTF-16LE and UTF-32BE. + * If the encoding is not specified (None), it will be detected automatically. + */ + val encoding: Option[String] = parameters.get("encoding") +.orElse(parameters.get("charset")).map { enc => + val blacklist = List("UTF16", "UTF32") + val isBlacklisted = blacklist.contains(enc.toUpperCase.replaceAll("-|_", "")) + require(multiLine || !isBlacklisted, +s"""The ${enc} encoding must not be included in the blacklist: + | ${blacklist.mkString(", ")}""".stripMargin) + + val forcingLineSep = !(multiLine == false && enc != "UTF-8" && lineSeparator.isEmpty) + require(forcingLineSep, +s"""The lineSep option must be specified for the $enc encoding. + |Example: .option("lineSep", "|^|") + |Note: lineSep can be detected automatically for UTF-8 only.""".stripMargin) --- End diff -- Default UTF-8 doesn't explain why lineSep is required for other encodings --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179958785 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -361,6 +361,15 @@ class JacksonParser( // For such records, all fields other than the field configured by // `columnNameOfCorruptRecord` are set to `null`. throw BadRecordException(() => recordLiteral(record), () => None, e) + case e: CharConversionException if options.encoding.isEmpty => +val msg = + """Failed to parse a character. Encoding was detected automatically. --- End diff -- This message was added explicitly to tell our customers how to resolve issues like https://issues.apache.org/jira/browse/SPARK-23094 . Describing that in docs is not enough from our experience. Customers will just create support tickets, and we will have to spend time to figure out the root causes. The tip can help the customers to solve the problem on their side. /cc @brkyvz --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179957573 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala --- @@ -366,6 +366,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * `java.text.SimpleDateFormat`. This applies to timestamp type. * `multiLine` (default `false`): parse one record, which may span multiple lines, * per file + * `encoding` (by default it is not set): allows to forcibly set one of standard basic + * or extended charsets for input jsons. For example UTF-8, UTF-16BE, UTF-32. If the encoding + * is not specified (by default), it will be detected automatically. --- End diff -- If `encoding` is not set, it will be detected by Jackson independently of `multiline`. In particular, this PR addresses the issues where the encoding is detected incorrectly. Look at the ticket: https://issues.apache.org/jira/browse/SPARK-23094 . By the way, without this PR users cannot read such files at all. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179955288 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala --- @@ -92,26 +93,30 @@ object TextInputJsonDataSource extends JsonDataSource { sparkSession: SparkSession, inputPaths: Seq[FileStatus], parsedOptions: JSONOptions): StructType = { -val json: Dataset[String] = createBaseDataset( - sparkSession, inputPaths, parsedOptions.lineSeparator) +val json: Dataset[String] = createBaseDataset(sparkSession, inputPaths, parsedOptions) + inferFromDataset(json, parsedOptions) } def inferFromDataset(json: Dataset[String], parsedOptions: JSONOptions): StructType = { val sampled: Dataset[String] = JsonUtils.sample(json, parsedOptions) -val rdd: RDD[UTF8String] = sampled.queryExecution.toRdd.map(_.getUTF8String(0)) -JsonInferSchema.infer(rdd, parsedOptions, CreateJacksonParser.utf8String) +val rdd: RDD[InternalRow] = sampled.queryExecution.toRdd +val rowParser = parsedOptions.encoding.map { enc => + CreateJacksonParser.internalRow(enc, _: JsonFactory, _: InternalRow, 0) --- End diff -- I moved the 0 here to address @cloud-fan's review comment asking why the field at position 0 is taken from the InternalRow. It should be clear at this point why internal rows have only one field, at position 0. Please tell me how I could omit the 0. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
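To make the shape of that partial application explicit, here is a hedged, standalone Scala sketch with stand-in types; it only mirrors `CreateJacksonParser.internalRow(enc, _: JsonFactory, _: InternalRow, 0)` and is not Spark's actual code.

```scala
// Stand-ins for JsonFactory, InternalRow and JsonParser, just to show the shape.
case class Factory(name: String)
case class Row(fields: Vector[String])
case class Parser(enc: String, text: String)

// Analogue of CreateJacksonParser.internalRow: the charset and field index are
// known up front, while the factory and row arrive later for every record.
def internalRow(enc: String, factory: Factory, row: Row, fieldIndex: Int): Parser =
  Parser(enc, row.fields(fieldIndex))

// Partial application pins `enc` and the index 0 (the sampled rows carry a
// single field at position 0), leaving a (Factory, Row) => Parser function
// that can be handed to the parsing loop.
val rowParser: (Factory, Row) => Parser = internalRow("UTF-16LE", _: Factory, _: Row, 0)

val parsed = rowParser(Factory("jackson"), Row(Vector("""{"a": 1}""")))
// parsed == Parser("UTF-16LE", """{"a": 1}""")
```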
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179954099 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala --- @@ -86,14 +85,34 @@ private[sql] class JSONOptions( val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) - val lineSeparator: Option[String] = parameters.get("lineSep").map { sep => -require(sep.nonEmpty, "'lineSep' cannot be an empty string.") -sep + /** + * A string between two consecutive JSON records. + */ + val lineSeparator: Option[String] = parameters.get("lineSep") + + /** + * Standard encoding (charset) name. For example UTF-8, UTF-16LE and UTF-32BE. + * If the encoding is not specified (None), it will be detected automatically. + */ + val encoding: Option[String] = parameters.get("encoding") +.orElse(parameters.get("charset")).map { enc => + val blacklist = List("UTF16", "UTF32") --- End diff -- I hesitated between `List` and `Set`: `Set` because order is not important here, `List` because `blacklist` is a commonly used name and `List` is more appropriate for a value that has the word `list` in its name. I don't see any reason for `Seq`, but if you believe it is important to have `Seq` instead of `List` here, I will replace it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
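For illustration, a small standalone sketch of the check written with `Seq`, as suggested above; the normalization mirrors the diff (drop `-`/`_`, uppercase) and the asserted values are only examples.

```scala
// Same membership test as in the diff, written against Seq instead of List.
// The normalization makes "UTF-16", "utf_16" and "UTF16" hit the same entry,
// while "UTF-16LE"/"UTF-16BE" stay allowed (they normalize to "UTF16LE"/"UTF16BE").
val blacklist: Seq[String] = Seq("UTF16", "UTF32")

def isBlacklisted(enc: String): Boolean =
  blacklist.contains(enc.toUpperCase.replaceAll("-|_", ""))

assert(isBlacklisted("UTF-16"))
assert(isBlacklisted("utf_32"))
assert(!isBlacklisted("UTF-16LE"))
assert(!isBlacklisted("UTF-8"))
```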
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179952254 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -361,6 +361,15 @@ class JacksonParser( // For such records, all fields other than the field configured by // `columnNameOfCorruptRecord` are set to `null`. throw BadRecordException(() => recordLiteral(record), () => None, e) + case e: CharConversionException if options.encoding.isEmpty => +val msg = + """Failed to parse a character. Encoding was detected automatically. --- End diff -- I think automatic detection is true only when multiLine is enabled. We can just describe it in the documentation and reword this message, or even just remove it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179951972 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala --- @@ -86,14 +85,34 @@ private[sql] class JSONOptions( val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) - val lineSeparator: Option[String] = parameters.get("lineSep").map { sep => -require(sep.nonEmpty, "'lineSep' cannot be an empty string.") -sep + /** + * A string between two consecutive JSON records. + */ + val lineSeparator: Option[String] = parameters.get("lineSep") + + /** + * Standard encoding (charset) name. For example UTF-8, UTF-16LE and UTF-32BE. + * If the encoding is not specified (None), it will be detected automatically. + */ + val encoding: Option[String] = parameters.get("encoding") +.orElse(parameters.get("charset")).map { enc => + val blacklist = List("UTF16", "UTF32") --- End diff -- I believe we use `Seq` when there isn't a specific reason for `List`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179952617 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2162,4 +2162,262 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(ds.schema == new StructType().add("f1", LongType)) } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("encoding", "UTF-16") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"), Row("Doug", "Rood"))) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported encoding name") { +val invalidCharset = "UTF-128" +val exception = intercept[java.io.UnsupportedEncodingException] { + spark.read +.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the encoding option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified encoding is not matched to actual encoding") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") +.option("multiline", "true") +.options(Map("encoding" -> "UTF-16BE")) +.json(testFile(fileName)) +.count() +} +val errMsg = exception.getMessage + +assert(errMsg.contains("Malformed records are detected in record parsing")) + } + + def checkEncoding( + expectedEncoding: String, + pathToJsonFiles: String, + expectedContent: String): Unit = { +val jsonFiles = new File(pathToJsonFiles) + .listFiles() + .filter(_.isFile) + .filter(_.getName.endsWith("json")) +val actualContent = jsonFiles.map { file => + new String(Files.readAllBytes(file.toPath), expectedEncoding) +}.mkString.trim.replaceAll(" ", "") + +assert(actualContent == expectedContent) + } + + test("SPARK-23723: save json in UTF-32BE") { +val encoding = "UTF-32BE" +withTempPath { path => + val df = spark.createDataset(Seq(("Dog", 42))) + 
df.write +.options(Map("encoding" -> encoding, "lineSep" -> "\n")) +.format("json").mode("overwrite") +.save(path.getCanonicalPath) + + checkEncoding( +expectedEncoding = encoding, +pathToJsonFiles = path.getCanonicalPath, +expectedContent = """{"_1":"Dog","_2":42}""") +} + } + + test("SPARK-23723: save json in default encoding - UTF-8") { +withTempPath { path => + val df = spark.createDataset(Seq(("Dog", 42))) + df.write +.format("json").mode("overwrite") +.save(path.getCanonicalPath) + + checkEncoding( +
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179951938 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala --- @@ -86,14 +85,34 @@ private[sql] class JSONOptions( val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) - val lineSeparator: Option[String] = parameters.get("lineSep").map { sep => -require(sep.nonEmpty, "'lineSep' cannot be an empty string.") --- End diff -- It seems this non-empty requirement was removed. Did I miss something, or was it a mistake? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
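For context, the dropped guard was roughly the following; whether to restore it is the open question here. `parameters` below is a plain Map standing in for JSONOptions' case-insensitive option map.

```scala
// Standalone sketch of the removed check: an empty lineSep is rejected up
// front instead of surfacing later as a confusing parsing failure.
val parameters: Map[String, String] = Map("lineSep" -> "\n")

val lineSeparator: Option[String] = parameters.get("lineSep").map { sep =>
  require(sep.nonEmpty, "'lineSep' cannot be an empty string.")
  sep
}
```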
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179952150 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala --- @@ -86,14 +85,34 @@ private[sql] class JSONOptions( val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) - val lineSeparator: Option[String] = parameters.get("lineSep").map { sep => -require(sep.nonEmpty, "'lineSep' cannot be an empty string.") -sep + /** + * A string between two consecutive JSON records. + */ + val lineSeparator: Option[String] = parameters.get("lineSep") + + /** + * Standard encoding (charset) name. For example UTF-8, UTF-16LE and UTF-32BE. + * If the encoding is not specified (None), it will be detected automatically. + */ + val encoding: Option[String] = parameters.get("encoding") +.orElse(parameters.get("charset")).map { enc => + val blacklist = List("UTF16", "UTF32") + val isBlacklisted = blacklist.contains(enc.toUpperCase.replaceAll("-|_", "")) + require(multiLine || !isBlacklisted, +s"""The ${enc} encoding must not be included in the blacklist: + | ${blacklist.mkString(", ")}""".stripMargin) --- End diff -- let's add "when multiLine is disabled". --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179952386 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala --- @@ -366,6 +366,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * `java.text.SimpleDateFormat`. This applies to timestamp type. * `multiLine` (default `false`): parse one record, which may span multiple lines, * per file + * `encoding` (by default it is not set): allows to forcibly set one of standard basic + * or extended charsets for input jsons. For example UTF-8, UTF-16BE, UTF-32. If the encoding --- End diff -- I think `UTF-32` is blacklisted. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179951956 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala --- @@ -86,14 +85,34 @@ private[sql] class JSONOptions( val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) - val lineSeparator: Option[String] = parameters.get("lineSep").map { sep => -require(sep.nonEmpty, "'lineSep' cannot be an empty string.") -sep + /** + * A string between two consecutive JSON records. + */ + val lineSeparator: Option[String] = parameters.get("lineSep") + + /** + * Standard encoding (charset) name. For example UTF-8, UTF-16LE and UTF-32BE. + * If the encoding is not specified (None), it will be detected automatically. + */ + val encoding: Option[String] = parameters.get("encoding") +.orElse(parameters.get("charset")).map { enc => + val blacklist = List("UTF16", "UTF32") --- End diff -- Mind if I ask to leave a simple comment explaining why these are blocked for now? (or do you plan to fix them soon?) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179952240 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala --- @@ -86,14 +85,34 @@ private[sql] class JSONOptions( val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) - val lineSeparator: Option[String] = parameters.get("lineSep").map { sep => -require(sep.nonEmpty, "'lineSep' cannot be an empty string.") -sep + /** + * A string between two consecutive JSON records. + */ + val lineSeparator: Option[String] = parameters.get("lineSep") + + /** + * Standard encoding (charset) name. For example UTF-8, UTF-16LE and UTF-32BE. + * If the encoding is not specified (None), it will be detected automatically. + */ + val encoding: Option[String] = parameters.get("encoding") +.orElse(parameters.get("charset")).map { enc => + val blacklist = List("UTF16", "UTF32") + val isBlacklisted = blacklist.contains(enc.toUpperCase.replaceAll("-|_", "")) + require(multiLine || !isBlacklisted, +s"""The ${enc} encoding must not be included in the blacklist: + | ${blacklist.mkString(", ")}""".stripMargin) + + val forcingLineSep = !(multiLine == false && enc != "UTF-8" && lineSeparator.isEmpty) + require(forcingLineSep, +s"""The lineSep option must be specified for the $enc encoding. + |Example: .option("lineSep", "|^|") --- End diff -- I think we are fine to remove this example. Can we just use prose, for example, `'lineSep' option must be explicitly set when 'encoding' option is specified.` (feel free not to use it as is; it was just a thought)? It doesn't cover the SQL syntax either ... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
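A hedged sketch of what the prose-only variant of that require might look like; the condition reproduces the `forcingLineSep` logic from the diff above, the wording is borrowed from the comment, and the helper name is invented for illustration.

```scala
// Stand-in for the JSONOptions validation; the plain-language message replaces
// the embedded .option(...) example from the current diff.
def validateLineSep(multiLine: Boolean, enc: String, lineSeparator: Option[String]): Unit =
  require(multiLine || enc == "UTF-8" || lineSeparator.isDefined,
    s"The 'lineSep' option must be explicitly set when the '$enc' encoding is specified.")

validateLineSep(multiLine = true,  enc = "UTF-16",   lineSeparator = None)        // ok
validateLineSep(multiLine = false, enc = "UTF-8",    lineSeparator = None)        // ok
validateLineSep(multiLine = false, enc = "UTF-16LE", lineSeparator = Some("\n"))  // ok
// validateLineSep(false, "UTF-16LE", None) would throw with the prose message.
```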
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179952127 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala --- @@ -86,14 +85,34 @@ private[sql] class JSONOptions( val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) - val lineSeparator: Option[String] = parameters.get("lineSep").map { sep => -require(sep.nonEmpty, "'lineSep' cannot be an empty string.") -sep + /** + * A string between two consecutive JSON records. + */ + val lineSeparator: Option[String] = parameters.get("lineSep") + + /** + * Standard encoding (charset) name. For example UTF-8, UTF-16LE and UTF-32BE. + * If the encoding is not specified (None), it will be detected automatically. + */ + val encoding: Option[String] = parameters.get("encoding") +.orElse(parameters.get("charset")).map { enc => + val blacklist = List("UTF16", "UTF32") + val isBlacklisted = blacklist.contains(enc.toUpperCase.replaceAll("-|_", "")) --- End diff -- Can we maybe do ```scala val blacklist = Seq(Charset.forName("UTF-8"), Charset.forName("UTF-32")) val isBlacklisted = blacklist.contains(Charset.forName(enc)) ``` ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
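For comparison, a standalone sketch of the `Charset`-based variant using the UTF-16/UTF-32 entries from the current blacklist; note that `Charset.forName` canonicalizes aliases (e.g. `utf-16`), but UTF-16LE/BE are distinct charsets from UTF-16, so this matches fewer spellings than the `replaceAll` approach and is illustrative only.

```scala
import java.nio.charset.Charset

// Charset equality is by canonical name, so aliases such as "utf-16" resolve
// to the same entry, while UTF-16LE / UTF-16BE remain distinct (and allowed).
val blacklist: Seq[Charset] = Seq(Charset.forName("UTF-16"), Charset.forName("UTF-32"))

def isBlacklisted(enc: String): Boolean = blacklist.contains(Charset.forName(enc))

assert(isBlacklisted("utf-16"))
assert(isBlacklisted("UTF-32"))
assert(!isBlacklisted("UTF-16LE"))
assert(!isBlacklisted("UTF-8"))
```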
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179952336 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -361,6 +361,15 @@ class JacksonParser( // For such records, all fields other than the field configured by // `columnNameOfCorruptRecord` are set to `null`. throw BadRecordException(() => recordLiteral(record), () => None, e) + case e: CharConversionException if options.encoding.isEmpty => +val msg = + """Failed to parse a character. Encoding was detected automatically. +|You might want to set it explicitly via the encoding option like: +| .option("encoding", "UTF-8") --- End diff -- ditto for prose ... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179952204 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala --- @@ -86,14 +85,34 @@ private[sql] class JSONOptions( val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) - val lineSeparator: Option[String] = parameters.get("lineSep").map { sep => -require(sep.nonEmpty, "'lineSep' cannot be an empty string.") -sep + /** + * A string between two consecutive JSON records. + */ + val lineSeparator: Option[String] = parameters.get("lineSep") + + /** + * Standard encoding (charset) name. For example UTF-8, UTF-16LE and UTF-32BE. + * If the encoding is not specified (None), it will be detected automatically. + */ + val encoding: Option[String] = parameters.get("encoding") +.orElse(parameters.get("charset")).map { enc => + val blacklist = List("UTF16", "UTF32") + val isBlacklisted = blacklist.contains(enc.toUpperCase.replaceAll("-|_", "")) + require(multiLine || !isBlacklisted, +s"""The ${enc} encoding must not be included in the blacklist: + | ${blacklist.mkString(", ")}""".stripMargin) + + val forcingLineSep = !(multiLine == false && enc != "UTF-8" && lineSeparator.isEmpty) + require(forcingLineSep, +s"""The lineSep option must be specified for the $enc encoding. + |Example: .option("lineSep", "|^|") + |Note: lineSep can be detected automatically for UTF-8 only.""".stripMargin) --- End diff -- It's UTF-8 by default .. I think we don't have to explain this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179951859 --- Diff: python/pyspark/sql/readwriter.py --- @@ -237,6 +237,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, :param allowUnquotedControlChars: allows JSON Strings to contain unquoted control characters (ASCII characters with value less than 32, including tab and line feed characters) or not. +:param encoding: standard encoding (charset) name, for example UTF-8, UTF-16LE and UTF-32BE. +If None is set, the encoding of input JSON will be detected automatically. --- End diff -- one more leading space for `If None ...`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179952689 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala --- @@ -175,11 +188,15 @@ object MultiLineJsonDataSource extends JsonDataSource { .values } - private def createParser(jsonFactory: JsonFactory, record: PortableDataStream): JsonParser = { + private def createParser( + jsonFactory: JsonFactory, + record: PortableDataStream, + encoding: Option[String] = None): JsonParser = { --- End diff -- `encoding: Option[String] = None` -> `encoding: Option[String]`. You don't have to worry about the signature for private ones. Theoretically, MiMa will detect signature changes for public APIs, which makes the Jenkins tests fail. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179952415 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala --- @@ -92,26 +93,30 @@ object TextInputJsonDataSource extends JsonDataSource { sparkSession: SparkSession, inputPaths: Seq[FileStatus], parsedOptions: JSONOptions): StructType = { -val json: Dataset[String] = createBaseDataset( - sparkSession, inputPaths, parsedOptions.lineSeparator) +val json: Dataset[String] = createBaseDataset(sparkSession, inputPaths, parsedOptions) + inferFromDataset(json, parsedOptions) } def inferFromDataset(json: Dataset[String], parsedOptions: JSONOptions): StructType = { val sampled: Dataset[String] = JsonUtils.sample(json, parsedOptions) -val rdd: RDD[UTF8String] = sampled.queryExecution.toRdd.map(_.getUTF8String(0)) -JsonInferSchema.infer(rdd, parsedOptions, CreateJacksonParser.utf8String) +val rdd: RDD[InternalRow] = sampled.queryExecution.toRdd +val rowParser = parsedOptions.encoding.map { enc => + CreateJacksonParser.internalRow(enc, _: JsonFactory, _: InternalRow, 0) --- End diff -- Seems the last argument is always 0. Can we omit this? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179952380 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala --- @@ -366,6 +366,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * `java.text.SimpleDateFormat`. This applies to timestamp type. * `multiLine` (default `false`): parse one record, which may span multiple lines, * per file + * `encoding` (by default it is not set): allows to forcibly set one of standard basic + * or extended charsets for input jsons. For example UTF-8, UTF-16BE, UTF-32. If the encoding --- End diff -- `jsons` -> `JSON`, `JSON records`, `JSON lines` or `JSON strings`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179952399 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala --- @@ -366,6 +366,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * `java.text.SimpleDateFormat`. This applies to timestamp type. * `multiLine` (default `false`): parse one record, which may span multiple lines, * per file + * `encoding` (by default it is not set): allows to forcibly set one of standard basic + * or extended charsets for input jsons. For example UTF-8, UTF-16BE, UTF-32. If the encoding + * is not specified (by default), it will be detected automatically. --- End diff -- Likewise, I think that is only true when multiLine is enabled. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179911912 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2127,4 +2127,243 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(df.schema === expectedSchema) } } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("encoding", "UTF-16") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq( + Row("Chris", "Baird"), Row("Doug", "Rood") +)) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported encoding name") { +val invalidCharset = "UTF-128" +val exception = intercept[java.io.UnsupportedEncodingException] { + spark.read +.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the encoding option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified encoding is not matched to actual encoding") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") +.option("multiline", "true") +.options(Map("encoding" -> "UTF-16BE")) +.json(testFile(fileName)) +.count() +} +val errMsg = exception.getMessage + +assert(errMsg.contains("Malformed records are detected in record parsing")) + } + + def checkCharset( +expectedCharset: String, +pathToJsonFiles: String, +expectedContent: String + ): Unit = { +val jsonFiles = new File(pathToJsonFiles) + .listFiles() + .filter(_.isFile) + .filter(_.getName.endsWith("json")) +val jsonContent = jsonFiles.map { file => + scala.io.Source.fromFile(file, expectedCharset).mkString +} +val cleanedContent = jsonContent + .mkString + .trim + .replaceAll(" ", "") + +assert(cleanedContent == expectedContent) + } + + test("SPARK-23723: save json in UTF-32BE") { +val encoding = "UTF-32BE" +withTempPath { path => + val df = 
spark.createDataset(Seq(("Dog", 42))) + df.write +.options(Map("encoding" -> encoding, "lineSep" -> "\n")) +.format("json").mode("overwrite") +.save(path.getCanonicalPath) + + checkCharset( +expectedCharset = encoding, +pathToJsonFiles = path.getCanonicalPath, +expectedContent = """{"_1":"Dog","_2":42}""" + ) +} + } + + test("SPARK-23723: save json in default encoding - UTF-8") { +withTempPath { path => + val df = spark.createDataset(Seq(("Dog", 42))) + df.write +.format("json").mode("overw
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179911884 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2127,4 +2127,243 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(df.schema === expectedSchema) } } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString --- End diff -- Please explain what you mean. The method returns the full path to a test file, while your code writes something in UTF-8. Or do you mean replacing the test files by generating them in code? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179749411 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2127,4 +2127,243 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(df.schema === expectedSchema) } } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("encoding", "UTF-16") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq( + Row("Chris", "Baird"), Row("Doug", "Rood") +)) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported encoding name") { +val invalidCharset = "UTF-128" +val exception = intercept[java.io.UnsupportedEncodingException] { + spark.read +.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the encoding option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified encoding is not matched to actual encoding") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") +.option("multiline", "true") +.options(Map("encoding" -> "UTF-16BE")) +.json(testFile(fileName)) +.count() +} +val errMsg = exception.getMessage + +assert(errMsg.contains("Malformed records are detected in record parsing")) + } + + def checkCharset( +expectedCharset: String, +pathToJsonFiles: String, +expectedContent: String + ): Unit = { +val jsonFiles = new File(pathToJsonFiles) + .listFiles() + .filter(_.isFile) + .filter(_.getName.endsWith("json")) +val jsonContent = jsonFiles.map { file => + scala.io.Source.fromFile(file, expectedCharset).mkString +} +val cleanedContent = jsonContent + .mkString + .trim + .replaceAll(" ", "") + +assert(cleanedContent == expectedContent) + } + + test("SPARK-23723: save json in UTF-32BE") { +val encoding = "UTF-32BE" +withTempPath { path => + val df = 
spark.createDataset(Seq(("Dog", 42))) + df.write +.options(Map("encoding" -> encoding, "lineSep" -> "\n")) +.format("json").mode("overwrite") +.save(path.getCanonicalPath) + + checkCharset( +expectedCharset = encoding, +pathToJsonFiles = path.getCanonicalPath, +expectedContent = """{"_1":"Dog","_2":42}""" + ) +} + } + + test("SPARK-23723: save json in default encoding - UTF-8") { +withTempPath { path => + val df = spark.createDataset(Seq(("Dog", 42))) + df.write +.format("json").mode("overw
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179744937 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2127,4 +2127,243 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(df.schema === expectedSchema) } } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("encoding", "UTF-16") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq( + Row("Chris", "Baird"), Row("Doug", "Rood") +)) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported encoding name") { +val invalidCharset = "UTF-128" +val exception = intercept[java.io.UnsupportedEncodingException] { + spark.read +.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the encoding option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified encoding is not matched to actual encoding") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") +.option("multiline", "true") +.options(Map("encoding" -> "UTF-16BE")) +.json(testFile(fileName)) +.count() +} +val errMsg = exception.getMessage + +assert(errMsg.contains("Malformed records are detected in record parsing")) + } + + def checkCharset( +expectedCharset: String, +pathToJsonFiles: String, +expectedContent: String + ): Unit = { +val jsonFiles = new File(pathToJsonFiles) + .listFiles() + .filter(_.isFile) + .filter(_.getName.endsWith("json")) +val jsonContent = jsonFiles.map { file => + scala.io.Source.fromFile(file, expectedCharset).mkString +} +val cleanedContent = jsonContent + .mkString + .trim + .replaceAll(" ", "") + +assert(cleanedContent == expectedContent) + } + + test("SPARK-23723: save json in UTF-32BE") { +val encoding = "UTF-32BE" +withTempPath { path => + val df = 
spark.createDataset(Seq(("Dog", 42))) + df.write +.options(Map("encoding" -> encoding, "lineSep" -> "\n")) +.format("json").mode("overwrite") +.save(path.getCanonicalPath) + + checkCharset( +expectedCharset = encoding, +pathToJsonFiles = path.getCanonicalPath, +expectedContent = """{"_1":"Dog","_2":42}""" + ) +} + } + + test("SPARK-23723: save json in default encoding - UTF-8") { +withTempPath { path => + val df = spark.createDataset(Seq(("Dog", 42))) + df.write +.format("json").mode("o
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179744726 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2127,4 +2127,243 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(df.schema === expectedSchema) } } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("encoding", "UTF-16") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq( + Row("Chris", "Baird"), Row("Doug", "Rood") +)) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported encoding name") { +val invalidCharset = "UTF-128" +val exception = intercept[java.io.UnsupportedEncodingException] { + spark.read +.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the encoding option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified encoding is not matched to actual encoding") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") +.option("multiline", "true") +.options(Map("encoding" -> "UTF-16BE")) +.json(testFile(fileName)) +.count() +} +val errMsg = exception.getMessage + +assert(errMsg.contains("Malformed records are detected in record parsing")) + } + + def checkCharset( +expectedCharset: String, +pathToJsonFiles: String, +expectedContent: String + ): Unit = { +val jsonFiles = new File(pathToJsonFiles) + .listFiles() + .filter(_.isFile) + .filter(_.getName.endsWith("json")) +val jsonContent = jsonFiles.map { file => + scala.io.Source.fromFile(file, expectedCharset).mkString +} +val cleanedContent = jsonContent + .mkString + .trim + .replaceAll(" ", "") + +assert(cleanedContent == expectedContent) + } + + test("SPARK-23723: save json in UTF-32BE") { +val encoding = "UTF-32BE" +withTempPath { path => + val df = 
spark.createDataset(Seq(("Dog", 42))) + df.write +.options(Map("encoding" -> encoding, "lineSep" -> "\n")) +.format("json").mode("overwrite") +.save(path.getCanonicalPath) + + checkCharset( +expectedCharset = encoding, +pathToJsonFiles = path.getCanonicalPath, +expectedContent = """{"_1":"Dog","_2":42}""" + ) +} + } + + test("SPARK-23723: save json in default encoding - UTF-8") { +withTempPath { path => + val df = spark.createDataset(Seq(("Dog", 42))) + df.write +.format("json").mode("o
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179743195 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2127,4 +2127,243 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(df.schema === expectedSchema) } } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("encoding", "UTF-16") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq( + Row("Chris", "Baird"), Row("Doug", "Rood") +)) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported encoding name") { +val invalidCharset = "UTF-128" +val exception = intercept[java.io.UnsupportedEncodingException] { + spark.read +.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the encoding option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified encoding is not matched to actual encoding") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") +.option("multiline", "true") +.options(Map("encoding" -> "UTF-16BE")) +.json(testFile(fileName)) +.count() +} +val errMsg = exception.getMessage + +assert(errMsg.contains("Malformed records are detected in record parsing")) + } + + def checkCharset( +expectedCharset: String, +pathToJsonFiles: String, +expectedContent: String + ): Unit = { +val jsonFiles = new File(pathToJsonFiles) + .listFiles() + .filter(_.isFile) + .filter(_.getName.endsWith("json")) +val jsonContent = jsonFiles.map { file => + scala.io.Source.fromFile(file, expectedCharset).mkString +} +val cleanedContent = jsonContent + .mkString + .trim + .replaceAll(" ", "") + +assert(cleanedContent == expectedContent) + } + + test("SPARK-23723: save json in UTF-32BE") { +val encoding = "UTF-32BE" +withTempPath { path => + val df = 
spark.createDataset(Seq(("Dog", 42))) + df.write +.options(Map("encoding" -> encoding, "lineSep" -> "\n")) +.format("json").mode("overwrite") +.save(path.getCanonicalPath) + + checkCharset( +expectedCharset = encoding, +pathToJsonFiles = path.getCanonicalPath, +expectedContent = """{"_1":"Dog","_2":42}""" + ) +} + } + + test("SPARK-23723: save json in default encoding - UTF-8") { +withTempPath { path => + val df = spark.createDataset(Seq(("Dog", 42))) + df.write +.format("json").mode("overw
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179741046 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2127,4 +2127,243 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(df.schema === expectedSchema) } } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("encoding", "UTF-16") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq( + Row("Chris", "Baird"), Row("Doug", "Rood") +)) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported encoding name") { +val invalidCharset = "UTF-128" +val exception = intercept[java.io.UnsupportedEncodingException] { + spark.read +.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the encoding option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified encoding is not matched to actual encoding") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") +.option("multiline", "true") +.options(Map("encoding" -> "UTF-16BE")) +.json(testFile(fileName)) +.count() +} +val errMsg = exception.getMessage + +assert(errMsg.contains("Malformed records are detected in record parsing")) + } + + def checkCharset( +expectedCharset: String, +pathToJsonFiles: String, +expectedContent: String + ): Unit = { +val jsonFiles = new File(pathToJsonFiles) + .listFiles() + .filter(_.isFile) + .filter(_.getName.endsWith("json")) +val jsonContent = jsonFiles.map { file => + scala.io.Source.fromFile(file, expectedCharset).mkString +} +val cleanedContent = jsonContent + .mkString + .trim + .replaceAll(" ", "") + +assert(cleanedContent == expectedContent) + } + + test("SPARK-23723: save json in UTF-32BE") { +val encoding = "UTF-32BE" +withTempPath { path => + val df = 
spark.createDataset(Seq(("Dog", 42))) + df.write +.options(Map("encoding" -> encoding, "lineSep" -> "\n")) +.format("json").mode("overwrite") +.save(path.getCanonicalPath) + + checkCharset( +expectedCharset = encoding, +pathToJsonFiles = path.getCanonicalPath, +expectedContent = """{"_1":"Dog","_2":42}""" + ) +} + } + + test("SPARK-23723: save json in default encoding - UTF-8") { +withTempPath { path => + val df = spark.createDataset(Seq(("Dog", 42))) + df.write +.format("json").mode("o
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179740726 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2127,4 +2127,243 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(df.schema === expectedSchema) } } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("encoding", "UTF-16") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq( + Row("Chris", "Baird"), Row("Doug", "Rood") +)) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported encoding name") { +val invalidCharset = "UTF-128" +val exception = intercept[java.io.UnsupportedEncodingException] { + spark.read +.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the encoding option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified encoding is not matched to actual encoding") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") +.option("multiline", "true") +.options(Map("encoding" -> "UTF-16BE")) +.json(testFile(fileName)) +.count() +} +val errMsg = exception.getMessage + +assert(errMsg.contains("Malformed records are detected in record parsing")) + } + + def checkCharset( +expectedCharset: String, +pathToJsonFiles: String, +expectedContent: String + ): Unit = { +val jsonFiles = new File(pathToJsonFiles) + .listFiles() + .filter(_.isFile) + .filter(_.getName.endsWith("json")) +val jsonContent = jsonFiles.map { file => + scala.io.Source.fromFile(file, expectedCharset).mkString +} +val cleanedContent = jsonContent + .mkString + .trim + .replaceAll(" ", "") + +assert(cleanedContent == expectedContent) + } + + test("SPARK-23723: save json in UTF-32BE") { +val encoding = "UTF-32BE" +withTempPath { path => + val df = 
spark.createDataset(Seq(("Dog", 42))) + df.write +.options(Map("encoding" -> encoding, "lineSep" -> "\n")) +.format("json").mode("overwrite") +.save(path.getCanonicalPath) + + checkCharset( +expectedCharset = encoding, +pathToJsonFiles = path.getCanonicalPath, +expectedContent = """{"_1":"Dog","_2":42}""" + ) +} + } + + test("SPARK-23723: save json in default encoding - UTF-8") { +withTempPath { path => + val df = spark.createDataset(Seq(("Dog", 42))) + df.write +.format("json").mode("o
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179737478 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2065,29 +2065,238 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } } - def testLineSeparator(lineSep: String): Unit = { -test(s"SPARK-21289: Support line separator - lineSep: '$lineSep'") { - // Read - val data = -s""" - | {"f": - |"a", "f0": 1}$lineSep{"f": - | - |"c", "f0": 2}$lineSep{"f": "d", "f0": 3} -""".stripMargin - val dataWithTrailingLineSep = s"$data$lineSep" - - Seq(data, dataWithTrailingLineSep).foreach { lines => -withTempPath { path => - Files.write(path.toPath, lines.getBytes(StandardCharsets.UTF_8)) - val df = spark.read.option("lineSep", lineSep).json(path.getAbsolutePath) - val expectedSchema = -StructType(StructField("f", StringType) :: StructField("f0", LongType) :: Nil) - checkAnswer(df, Seq(("a", 1), ("c", 2), ("d", 3)).toDF()) - assert(df.schema === expectedSchema) + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + // This option will be replaced by .option("lineSep", "x00 0a") + // as soon as lineSep allows to specify sequence of bytes in hexadecimal format. + .option("mode", "DROPMALFORMED") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq( + Row("Chris", "Baird"), Row("Doug", "Rood") +)) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported charset name") { +val invalidCharset = "UTF-128" +val exception = intercept[java.io.UnsupportedEncodingException] { + spark.read +.options(Map("charset" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the charset option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("charset" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified charset is not matched to actual charset") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") 
+.option("multiline", "true") +.options(Map("charset" -> "UTF-16BE")) +.json(testFile(fileName)) +.count() +} +val errMsg = exception.getMessage + +assert(errMsg.contains("Malformed records are detected in record parsing")) + } + + def checkCharset( +expectedCharset: String, +pathToJsonFiles: String, +expectedContent: String + ): Unit = { +val jsonFiles = new File(pathToJsonFiles) + .listFiles() + .filter(_.isFile) + .filter(_.getName.endsWith("json")) +val jsonContent = jsonFiles.map { file => + scala.io.Source
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179737147 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2127,4 +2127,243 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(df.schema === expectedSchema) } } + + def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + test("SPARK-23723: json in UTF-16 with BOM") { +val fileName = "json-tests/utf16WithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .option("encoding", "UTF-16") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq( + Row("Chris", "Baird"), Row("Doug", "Rood") +)) + } + + test("SPARK-23723: multi-line json in UTF-32BE with BOM") { +val fileName = "json-tests/utf32BEWithBOM.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "UTF-16LE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + test("SPARK-23723: Unsupported encoding name") { +val invalidCharset = "UTF-128" +val exception = intercept[java.io.UnsupportedEncodingException] { + spark.read +.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n")) +.json(testFile("json-tests/utf16LE.json")) +.count() +} + +assert(exception.getMessage.contains(invalidCharset)) + } + + test("SPARK-23723: checking that the encoding option is case agnostic") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val jsonDF = spark.read.schema(schema) + .option("multiline", "true") + .options(Map("encoding" -> "uTf-16lE")) + .json(testFile(fileName)) + +checkAnswer(jsonDF, Seq(Row("Chris", "Baird"))) + } + + + test("SPARK-23723: specified encoding is not matched to actual encoding") { +val fileName = "json-tests/utf16LE.json" +val schema = new StructType().add("firstName", StringType).add("lastName", StringType) +val exception = intercept[SparkException] { + spark.read.schema(schema) +.option("mode", "FAILFAST") +.option("multiline", "true") +.options(Map("encoding" -> "UTF-16BE")) +.json(testFile(fileName)) +.count() +} +val errMsg = exception.getMessage + +assert(errMsg.contains("Malformed records are detected in record parsing")) + } + + def checkCharset( +expectedCharset: String, +pathToJsonFiles: String, +expectedContent: String + ): Unit = { +val jsonFiles = new File(pathToJsonFiles) + .listFiles() + .filter(_.isFile) + .filter(_.getName.endsWith("json")) +val jsonContent = jsonFiles.map { file => + scala.io.Source.fromFile(file, expectedCharset).mkString +} +val cleanedContent = jsonContent + .mkString + .trim + .replaceAll(" ", "") + +assert(cleanedContent == expectedContent) + } + + test("SPARK-23723: save json in UTF-32BE") { +val encoding = "UTF-32BE" +withTempPath { path => + val df = 
spark.createDataset(Seq(("Dog", 42))) + df.write +.options(Map("encoding" -> encoding, "lineSep" -> "\n")) +.format("json").mode("overwrite") +.save(path.getCanonicalPath) + + checkCharset( +expectedCharset = encoding, +pathToJsonFiles = path.getCanonicalPath, +expectedContent = """{"_1":"Dog","_2":42}""" + ) +} + } + + test("SPARK-23723: save json in default encoding - UTF-8") { +withTempPath { path => + val df = spark.createDataset(Seq(("Dog", 42))) + df.write +.format("json").mode("overw
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179622965 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala --- @@ -39,11 +40,28 @@ private[sql] object CreateJacksonParser extends Serializable { jsonFactory.createParser(new InputStreamReader(bain, "UTF-8")) } - def text(jsonFactory: JsonFactory, record: Text): JsonParser = { -jsonFactory.createParser(record.getBytes, 0, record.getLength) + def text(jsonFactory: JsonFactory, record: Text, encoding: Option[String]): JsonParser = { --- End diff -- Ah, sure. I thought you had missed them (because they are hidden). I will try to take another look soon too. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179521858 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala --- @@ -39,11 +40,28 @@ private[sql] object CreateJacksonParser extends Serializable { jsonFactory.createParser(new InputStreamReader(bain, "UTF-8")) } - def text(jsonFactory: JsonFactory, record: Text): JsonParser = { -jsonFactory.createParser(record.getBytes, 0, record.getLength) + def text(jsonFactory: JsonFactory, record: Text, encoding: Option[String]): JsonParser = { --- End diff -- @HyukjinKwon Sorry, I saw the comments but just haven't had time to address them. I will do that tomorrow or at the weekend. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179507104 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala --- @@ -39,11 +40,28 @@ private[sql] object CreateJacksonParser extends Serializable { jsonFactory.createParser(new InputStreamReader(bain, "UTF-8")) } - def text(jsonFactory: JsonFactory, record: Text): JsonParser = { -jsonFactory.createParser(record.getBytes, 0, record.getLength) + def text(jsonFactory: JsonFactory, record: Text, encoding: Option[String]): JsonParser = { --- End diff -- https://github.com/apache/spark/pull/20937#discussion_r178476075 seems to have been missed, along with the other hidden comments. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179506801 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala --- @@ -39,11 +40,36 @@ private[sql] object CreateJacksonParser extends Serializable { jsonFactory.createParser(new InputStreamReader(bain, "UTF-8")) } - def text(jsonFactory: JsonFactory, record: Text): JsonParser = { -jsonFactory.createParser(record.getBytes, 0, record.getLength) + def text(jsonFactory: JsonFactory, record: Text, encoding: Option[String] = None): JsonParser = { +encoding match { --- End diff -- https://github.com/apache/spark/pull/20937#discussion_r178476075 seems to have been missed, along with the other hidden comments. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
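A rough sketch of the kind of dispatch the hunk above hints at: create the Jackson parser directly from the record bytes when no encoding is given, and go through a charset-aware reader otherwise. This is an illustration of the pattern, with a hypothetical object name, not the pull request's exact implementation:

import java.io.{ByteArrayInputStream, InputStreamReader}

import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
import org.apache.hadoop.io.Text

object TextParserSketch {
  def text(jsonFactory: JsonFactory, record: Text, encoding: Option[String] = None): JsonParser = {
    encoding match {
      // No encoding given: let Jackson detect it from the raw bytes.
      case None =>
        jsonFactory.createParser(record.getBytes, 0, record.getLength)
      // Explicit encoding: decode the record bytes with the requested charset first.
      case Some(enc) =>
        val stream = new ByteArrayInputStream(record.getBytes, 0, record.getLength)
        jsonFactory.createParser(new InputStreamReader(stream, enc))
    }
  }
}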
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r179501032 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala --- @@ -86,14 +85,34 @@ private[sql] class JSONOptions( val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) - val lineSeparator: Option[String] = parameters.get("lineSep").map { sep => -require(sep.nonEmpty, "'lineSep' cannot be an empty string.") -sep + /** + * A sequence of bytes between two consecutive json records. --- End diff -- It seems odd to specify the line separator as a sequence of bytes; can we use a string instead? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
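One way to reconcile the two views is to keep "lineSep" as a user-facing string while handing the record splitter the bytes of that string in the data's charset. The sketch below only illustrates the idea under discussion; the helper name is hypothetical and the UTF-8 fallback is an assumption, not the pull request's final code:

import java.nio.charset.StandardCharsets

object LineSepSketch {
  def lineSeparatorInBytes(lineSep: Option[String], encoding: Option[String]): Option[Array[Byte]] = {
    lineSep.map { sep =>
      require(sep.nonEmpty, "'lineSep' cannot be an empty string.")
      // Assume UTF-8 when no encoding is specified.
      val charsetName = encoding.getOrElse(StandardCharsets.UTF_8.name())
      sep.getBytes(charsetName)
    }
  }
}

// Example: "\n" is the single byte 0x0A in UTF-8, but the two bytes 0x0A 0x00 in UTF-16LE,
// which is why the separator ultimately has to be resolved against the data's charset.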