[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-04-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178496032
  
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -237,6 +237,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
 :param allowUnquotedControlChars: allows JSON Strings to contain unquoted control
   characters (ASCII characters with value less than 32,
   including tab and line feed characters) or not.
+:param encoding: standard charset name, for example UTF-8, UTF-16 and UTF-32. If None is
+  set, the encoding of input json will be detected automatically.
--- End diff --

Yup, I admit it. In that case, we could optionally describe it in a bit more detail together with the multiline mode too.
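
Roughly, the entry could end up saying something like: ":param encoding: standard charset name, for example UTF-8, UTF-16 and UTF-32. If None is set, the encoding of the input JSON will be detected automatically when the multiLine option is set to true." That is only a sketch of the wording, not the final text.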


---




[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-04-01 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178495813
  
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -237,6 +237,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
 :param allowUnquotedControlChars: allows JSON Strings to contain unquoted control
   characters (ASCII characters with value less than 32,
   including tab and line feed characters) or not.
+:param encoding: standard charset name, for example UTF-8, UTF-16 and UTF-32. If None is
+  set, the encoding of input json will be detected automatically.
--- End diff --

I would agree with you about the per-line mode, but not about the multiline mode. For example, this test checks that:
https://github.com/MaxGekk/spark-1/blob/53834005ba22c0c3e7be883949ab01a5bf1a0b9a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala#L2148-L2156


---




[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-04-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178478826
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -2127,4 +2127,243 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
   assert(df.schema === expectedSchema)
 }
   }
+
+  def testFile(fileName: String): String = {
+
Thread.currentThread().getContextClassLoader.getResource(fileName).toString
+  }
+
+  test("SPARK-23723: json in UTF-16 with BOM") {
+val fileName = "json-tests/utf16WithBOM.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .option("encoding", "UTF-16")
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(
+  Row("Chris", "Baird"), Row("Doug", "Rood")
+))
+  }
+
+  test("SPARK-23723: multi-line json in UTF-32BE with BOM") {
+val fileName = "json-tests/utf32BEWithBOM.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: Use user's encoding in reading of multi-line json in 
UTF-16LE") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .options(Map("encoding" -> "UTF-16LE"))
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: Unsupported encoding name") {
+val invalidCharset = "UTF-128"
+val exception = intercept[java.io.UnsupportedEncodingException] {
+  spark.read
+.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n"))
+.json(testFile("json-tests/utf16LE.json"))
+.count()
+}
+
+assert(exception.getMessage.contains(invalidCharset))
+  }
+
+  test("SPARK-23723: checking that the encoding option is case agnostic") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .options(Map("encoding" -> "uTf-16lE"))
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+
+  test("SPARK-23723: specified encoding is not matched to actual 
encoding") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val exception = intercept[SparkException] {
+  spark.read.schema(schema)
+.option("mode", "FAILFAST")
+.option("multiline", "true")
+.options(Map("encoding" -> "UTF-16BE"))
+.json(testFile(fileName))
+.count()
+}
+val errMsg = exception.getMessage
+
+assert(errMsg.contains("Malformed records are detected in record 
parsing"))
+  }
+
+  def checkCharset(
+expectedCharset: String,
+pathToJsonFiles: String,
+expectedContent: String
+  ): Unit = {
+val jsonFiles = new File(pathToJsonFiles)
+  .listFiles()
+  .filter(_.isFile)
+  .filter(_.getName.endsWith("json"))
+val jsonContent = jsonFiles.map { file =>
+  scala.io.Source.fromFile(file, expectedCharset).mkString
+}
+val cleanedContent = jsonContent
+  .mkString
+  .trim
+  .replaceAll(" ", "")
+
+assert(cleanedContent == expectedContent)
+  }
+
+  test("SPARK-23723: save json in UTF-32BE") {
+val encoding = "UTF-32BE"
+withTempPath { path =>
+  val df = spark.createDataset(Seq(("Dog", 42)))
+  df.write
+.options(Map("encoding" -> encoding, "lineSep" -> "\n"))
+.format("json").mode("overwrite")
+.save(path.getCanonicalPath)
+
+  checkCharset(
+expectedCharset = encoding,
+pathToJsonFiles = path.getCanonicalPath,
+expectedContent = """{"_1":"Dog","_2":42}"""
+  )
+}
+  }
+
+  test("SPARK-23723: save json in default encoding - UTF-8") {
+withTempPath { path =>
+  val df = spark.createDataset(Seq(("Dog", 42)))
+  df.write
+.format("json").mode("o

[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-04-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178478996
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -2127,4 +2127,243 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
   assert(df.schema === expectedSchema)
 }
   }
+
+  def testFile(fileName: String): String = {
+
Thread.currentThread().getContextClassLoader.getResource(fileName).toString
+  }
+
+  test("SPARK-23723: json in UTF-16 with BOM") {
+val fileName = "json-tests/utf16WithBOM.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .option("encoding", "UTF-16")
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(
+  Row("Chris", "Baird"), Row("Doug", "Rood")
+))
+  }
+
+  test("SPARK-23723: multi-line json in UTF-32BE with BOM") {
+val fileName = "json-tests/utf32BEWithBOM.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: Use user's encoding in reading of multi-line json in 
UTF-16LE") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .options(Map("encoding" -> "UTF-16LE"))
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: Unsupported encoding name") {
+val invalidCharset = "UTF-128"
+val exception = intercept[java.io.UnsupportedEncodingException] {
+  spark.read
+.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n"))
+.json(testFile("json-tests/utf16LE.json"))
+.count()
+}
+
+assert(exception.getMessage.contains(invalidCharset))
+  }
+
+  test("SPARK-23723: checking that the encoding option is case agnostic") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .options(Map("encoding" -> "uTf-16lE"))
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+
+  test("SPARK-23723: specified encoding is not matched to actual 
encoding") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val exception = intercept[SparkException] {
+  spark.read.schema(schema)
+.option("mode", "FAILFAST")
+.option("multiline", "true")
+.options(Map("encoding" -> "UTF-16BE"))
+.json(testFile(fileName))
+.count()
+}
+val errMsg = exception.getMessage
+
+assert(errMsg.contains("Malformed records are detected in record 
parsing"))
+  }
+
+  def checkCharset(
+expectedCharset: String,
+pathToJsonFiles: String,
+expectedContent: String
+  ): Unit = {
+val jsonFiles = new File(pathToJsonFiles)
+  .listFiles()
+  .filter(_.isFile)
+  .filter(_.getName.endsWith("json"))
+val jsonContent = jsonFiles.map { file =>
+  scala.io.Source.fromFile(file, expectedCharset).mkString
--- End diff --

Can we use `new String(Files.readAllBytes(partFile.toPath), StandardCharsets.UTF_8)` here too, just for consistency with the code above?
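
For example, roughly (a sketch only, reading each part file back with the charset the test expects rather than hard-coded UTF-8):

    import java.nio.charset.Charset
    import java.nio.file.Files

    // Sketch: decode each written part file with `expectedCharset` instead of
    // going through scala.io.Source.
    val jsonContent = jsonFiles.map { file =>
      new String(Files.readAllBytes(file.toPath), Charset.forName(expectedCharset))
    }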


---




[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-04-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178478728
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -2127,4 +2127,243 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
   assert(df.schema === expectedSchema)
 }
   }
+
+  def testFile(fileName: String): String = {
+
Thread.currentThread().getContextClassLoader.getResource(fileName).toString
+  }
+
+  test("SPARK-23723: json in UTF-16 with BOM") {
+val fileName = "json-tests/utf16WithBOM.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .option("encoding", "UTF-16")
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(
+  Row("Chris", "Baird"), Row("Doug", "Rood")
+))
+  }
+
+  test("SPARK-23723: multi-line json in UTF-32BE with BOM") {
+val fileName = "json-tests/utf32BEWithBOM.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: Use user's encoding in reading of multi-line json in 
UTF-16LE") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .options(Map("encoding" -> "UTF-16LE"))
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: Unsupported encoding name") {
+val invalidCharset = "UTF-128"
+val exception = intercept[java.io.UnsupportedEncodingException] {
+  spark.read
+.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n"))
+.json(testFile("json-tests/utf16LE.json"))
+.count()
+}
+
+assert(exception.getMessage.contains(invalidCharset))
+  }
+
+  test("SPARK-23723: checking that the encoding option is case agnostic") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .options(Map("encoding" -> "uTf-16lE"))
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+
+  test("SPARK-23723: specified encoding is not matched to actual 
encoding") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val exception = intercept[SparkException] {
+  spark.read.schema(schema)
+.option("mode", "FAILFAST")
+.option("multiline", "true")
+.options(Map("encoding" -> "UTF-16BE"))
+.json(testFile(fileName))
+.count()
+}
+val errMsg = exception.getMessage
+
+assert(errMsg.contains("Malformed records are detected in record 
parsing"))
+  }
+
+  def checkCharset(
+expectedCharset: String,
+pathToJsonFiles: String,
+expectedContent: String
+  ): Unit = {
+val jsonFiles = new File(pathToJsonFiles)
+  .listFiles()
+  .filter(_.isFile)
+  .filter(_.getName.endsWith("json"))
+val jsonContent = jsonFiles.map { file =>
+  scala.io.Source.fromFile(file, expectedCharset).mkString
+}
+val cleanedContent = jsonContent
+  .mkString
+  .trim
+  .replaceAll(" ", "")
+
+assert(cleanedContent == expectedContent)
+  }
+
+  test("SPARK-23723: save json in UTF-32BE") {
+val encoding = "UTF-32BE"
+withTempPath { path =>
+  val df = spark.createDataset(Seq(("Dog", 42)))
+  df.write
+.options(Map("encoding" -> encoding, "lineSep" -> "\n"))
+.format("json").mode("overwrite")
+.save(path.getCanonicalPath)
+
+  checkCharset(
+expectedCharset = encoding,
+pathToJsonFiles = path.getCanonicalPath,
+expectedContent = """{"_1":"Dog","_2":42}"""
+  )
+}
+  }
+
+  test("SPARK-23723: save json in default encoding - UTF-8") {
+withTempPath { path =>
+  val df = spark.createDataset(Seq(("Dog", 42)))
+  df.write
+.format("json").mode("o

[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-04-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178475553
  
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -237,6 +237,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
 :param allowUnquotedControlChars: allows JSON Strings to contain unquoted control
   characters (ASCII characters with value less than 32,
   including tab and line feed characters) or not.
+:param encoding: standard charset name, for example UTF-8, UTF-16 and UTF-32. If None is
--- End diff --

Just to be sure, is this intended? I thought `standard charset` -> `encoding (charset)` to be consistent with the write path.


---




[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-04-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178476439
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala ---
@@ -361,6 +361,15 @@ class JacksonParser(
 // For such records, all fields other than the field configured by
 // `columnNameOfCorruptRecord` are set to `null`.
 throw BadRecordException(() => recordLiteral(record), () => None, e)
+  case e: CharConversionException if options.encoding.isEmpty =>
+val msg =
+  """Failed to parse a character. Charset was detected automatically.
+|You might want to set it explicitly via the charset option like:
+|  .option("charset", "UTF-8")
--- End diff --

`charset` -> `encoding`.
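
i.e. roughly (a sketch of the renamed hint; exact wording is up to you):

    // Sketch: same hint, but using the `encoding` option name consistently.
    val msg =
      """Failed to parse a character. Encoding was detected automatically.
        |You might want to set it explicitly via the encoding option like:
        |  .option("encoding", "UTF-8")
        |""".stripMargin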


---




[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-04-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178479007
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -2127,4 +2127,243 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
   assert(df.schema === expectedSchema)
 }
   }
+
+  def testFile(fileName: String): String = {
+
Thread.currentThread().getContextClassLoader.getResource(fileName).toString
+  }
+
+  test("SPARK-23723: json in UTF-16 with BOM") {
+val fileName = "json-tests/utf16WithBOM.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .option("encoding", "UTF-16")
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(
+  Row("Chris", "Baird"), Row("Doug", "Rood")
+))
+  }
+
+  test("SPARK-23723: multi-line json in UTF-32BE with BOM") {
+val fileName = "json-tests/utf32BEWithBOM.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: Use user's encoding in reading of multi-line json in 
UTF-16LE") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .options(Map("encoding" -> "UTF-16LE"))
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: Unsupported encoding name") {
+val invalidCharset = "UTF-128"
+val exception = intercept[java.io.UnsupportedEncodingException] {
+  spark.read
+.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n"))
+.json(testFile("json-tests/utf16LE.json"))
+.count()
+}
+
+assert(exception.getMessage.contains(invalidCharset))
+  }
+
+  test("SPARK-23723: checking that the encoding option is case agnostic") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .options(Map("encoding" -> "uTf-16lE"))
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+
+  test("SPARK-23723: specified encoding is not matched to actual 
encoding") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val exception = intercept[SparkException] {
+  spark.read.schema(schema)
+.option("mode", "FAILFAST")
+.option("multiline", "true")
+.options(Map("encoding" -> "UTF-16BE"))
+.json(testFile(fileName))
+.count()
+}
+val errMsg = exception.getMessage
+
+assert(errMsg.contains("Malformed records are detected in record 
parsing"))
+  }
+
+  def checkCharset(
+expectedCharset: String,
+pathToJsonFiles: String,
+expectedContent: String
+  ): Unit = {
+val jsonFiles = new File(pathToJsonFiles)
+  .listFiles()
+  .filter(_.isFile)
+  .filter(_.getName.endsWith("json"))
+val jsonContent = jsonFiles.map { file =>
+  scala.io.Source.fromFile(file, expectedCharset).mkString
+}
+val cleanedContent = jsonContent
--- End diff --

We could inline it.
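
For example (sketch):

    // Sketch: fold the intermediate val into the assertion.
    assert(jsonContent.mkString.trim.replaceAll(" ", "") == expectedContent)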


---




[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-04-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178476163
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala ---
@@ -39,11 +40,36 @@ private[sql] object CreateJacksonParser extends Serializable {
 jsonFactory.createParser(new InputStreamReader(bain, "UTF-8"))
   }
 
-  def text(jsonFactory: JsonFactory, record: Text): JsonParser = {
-jsonFactory.createParser(record.getBytes, 0, record.getLength)
+  def text(jsonFactory: JsonFactory, record: Text, encoding: Option[String] = None): JsonParser = {
+encoding match {
+  case Some(enc) =>
+val bain = new ByteArrayInputStream(record.getBytes, 0, record.getLength)
+jsonFactory.createParser(new InputStreamReader(bain, enc))
+  case _ =>
+jsonFactory.createParser(record.getBytes, 0, record.getLength)
+}
   }
 
-  def inputStream(jsonFactory: JsonFactory, record: InputStream): JsonParser = {
-jsonFactory.createParser(record)
+  def inputStream(
--- End diff --

ditto for avoiding type dispatch on `encoding`.


---




[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-04-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178476229
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala ---
@@ -39,11 +40,36 @@ private[sql] object CreateJacksonParser extends Serializable {
 jsonFactory.createParser(new InputStreamReader(bain, "UTF-8"))
   }
 
-  def text(jsonFactory: JsonFactory, record: Text): JsonParser = {
-jsonFactory.createParser(record.getBytes, 0, record.getLength)
+  def text(jsonFactory: JsonFactory, record: Text, encoding: Option[String] = None): JsonParser = {
+encoding match {
+  case Some(enc) =>
+val bain = new ByteArrayInputStream(record.getBytes, 0, record.getLength)
+jsonFactory.createParser(new InputStreamReader(bain, enc))
+  case _ =>
+jsonFactory.createParser(record.getBytes, 0, record.getLength)
+}
   }
 
-  def inputStream(jsonFactory: JsonFactory, record: InputStream): JsonParser = {
-jsonFactory.createParser(record)
+  def inputStream(
+  jsonFactory: JsonFactory,
+  is: InputStream,
+  encoding: Option[String] = None): JsonParser = {
+encoding match {
+  case Some(enc) =>
+jsonFactory.createParser(new InputStreamReader(is, enc))
+  case _ =>
+jsonFactory.createParser(is)
+}
+  }
+
+  def internalRow(
+jsonFactory: JsonFactory,
--- End diff --

nit: indentation


---




[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-04-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178475983
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala ---
@@ -39,11 +40,36 @@ private[sql] object CreateJacksonParser extends Serializable {
 jsonFactory.createParser(new InputStreamReader(bain, "UTF-8"))
   }
 
-  def text(jsonFactory: JsonFactory, record: Text): JsonParser = {
-jsonFactory.createParser(record.getBytes, 0, record.getLength)
+  def text(jsonFactory: JsonFactory, record: Text, encoding: Option[String] = None): JsonParser = {
--- End diff --

`encoding: Option[String] = None` -> `encoding: Option[String]`. You don't have to worry about the signature for private ones. Theoretically, MiMa will detect signature changes for public APIs, which makes the Jenkins tests fail.


---




[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-04-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178476459
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala ---
@@ -361,6 +361,15 @@ class JacksonParser(
 // For such records, all fields other than the field configured by
 // `columnNameOfCorruptRecord` are set to `null`.
 throw BadRecordException(() => recordLiteral(record), () => None, e)
+  case e: CharConversionException if options.encoding.isEmpty =>
+val msg =
+  """Failed to parse a character. Charset was detected automatically.
+|You might want to set it explicitly via the charset option like:
+|  .option("charset", "UTF-8")
+|Example of supported charsets:
+|  UTF-8, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE
--- End diff --

Let's take out blacklisted ones.


---




[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-04-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178475654
  
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -237,6 +237,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
 :param allowUnquotedControlChars: allows JSON Strings to contain unquoted control
   characters (ASCII characters with value less than 32,
   including tab and line feed characters) or not.
+:param encoding: standard charset name, for example UTF-8, UTF-16 and UTF-32. If None is
+  set, the encoding of input json will be detected automatically.
--- End diff --

I think we shouldn't say it's automatically detected yet. I still think it only accidentally works: the newline's encoding is not automatically detected, and the encoding is detected per line, not for the whole file, if I understood correctly.


---




[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-04-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178478788
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -2127,4 +2127,243 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
   assert(df.schema === expectedSchema)
 }
   }
+
+  def testFile(fileName: String): String = {
+
Thread.currentThread().getContextClassLoader.getResource(fileName).toString
+  }
+
+  test("SPARK-23723: json in UTF-16 with BOM") {
+val fileName = "json-tests/utf16WithBOM.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .option("encoding", "UTF-16")
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(
+  Row("Chris", "Baird"), Row("Doug", "Rood")
+))
+  }
+
+  test("SPARK-23723: multi-line json in UTF-32BE with BOM") {
+val fileName = "json-tests/utf32BEWithBOM.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: Use user's encoding in reading of multi-line json in 
UTF-16LE") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .options(Map("encoding" -> "UTF-16LE"))
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: Unsupported encoding name") {
+val invalidCharset = "UTF-128"
+val exception = intercept[java.io.UnsupportedEncodingException] {
+  spark.read
+.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n"))
+.json(testFile("json-tests/utf16LE.json"))
+.count()
+}
+
+assert(exception.getMessage.contains(invalidCharset))
+  }
+
+  test("SPARK-23723: checking that the encoding option is case agnostic") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .options(Map("encoding" -> "uTf-16lE"))
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+
+  test("SPARK-23723: specified encoding is not matched to actual 
encoding") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val exception = intercept[SparkException] {
+  spark.read.schema(schema)
+.option("mode", "FAILFAST")
+.option("multiline", "true")
+.options(Map("encoding" -> "UTF-16BE"))
+.json(testFile(fileName))
+.count()
+}
+val errMsg = exception.getMessage
+
+assert(errMsg.contains("Malformed records are detected in record 
parsing"))
+  }
+
+  def checkCharset(
+expectedCharset: String,
+pathToJsonFiles: String,
+expectedContent: String
+  ): Unit = {
+val jsonFiles = new File(pathToJsonFiles)
+  .listFiles()
+  .filter(_.isFile)
+  .filter(_.getName.endsWith("json"))
+val jsonContent = jsonFiles.map { file =>
+  scala.io.Source.fromFile(file, expectedCharset).mkString
+}
+val cleanedContent = jsonContent
+  .mkString
+  .trim
+  .replaceAll(" ", "")
+
+assert(cleanedContent == expectedContent)
+  }
+
+  test("SPARK-23723: save json in UTF-32BE") {
+val encoding = "UTF-32BE"
+withTempPath { path =>
+  val df = spark.createDataset(Seq(("Dog", 42)))
+  df.write
+.options(Map("encoding" -> encoding, "lineSep" -> "\n"))
+.format("json").mode("overwrite")
+.save(path.getCanonicalPath)
+
+  checkCharset(
+expectedCharset = encoding,
+pathToJsonFiles = path.getCanonicalPath,
+expectedContent = """{"_1":"Dog","_2":42}"""
+  )
+}
+  }
+
+  test("SPARK-23723: save json in default encoding - UTF-8") {
+withTempPath { path =>
+  val df = spark.createDataset(Seq(("Dog", 42)))
+  df.write
+.format("json").mode("o

[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-04-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178478498
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala ---
@@ -41,19 +41,23 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti
 */
   val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean
 
-  private val lineSeparator: Option[String] = parameters.get(LINE_SEPARATOR).map { sep =>
-require(sep.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.")
-sep
+  val encoding: Option[String] = parameters.get(ENCODING)
+
+  val lineSeparator: Option[Array[Byte]] = parameters.get(LINE_SEPARATOR).map { lineSep =>
--- End diff --

Can we have the same signature with JSON one too? `String`?
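
Something along these lines, as a sketch (the name of the derived byte value is only illustrative):

    // Sketch: keep the user-facing option as a String, like JSONOptions does,
    // and derive the byte form from the configured encoding (UTF-8 by default).
    val lineSeparator: Option[String] = parameters.get(LINE_SEPARATOR).map { sep =>
      require(sep.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.")
      sep
    }

    val lineSeparatorInBytes: Option[Array[Byte]] =
      lineSeparator.map(_.getBytes(encoding.getOrElse("UTF-8")))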


---




[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-04-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178479126
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -2127,4 +2127,243 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
   assert(df.schema === expectedSchema)
 }
   }
+
+  def testFile(fileName: String): String = {
+
Thread.currentThread().getContextClassLoader.getResource(fileName).toString
+  }
+
+  test("SPARK-23723: json in UTF-16 with BOM") {
+val fileName = "json-tests/utf16WithBOM.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .option("encoding", "UTF-16")
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(
+  Row("Chris", "Baird"), Row("Doug", "Rood")
+))
+  }
+
+  test("SPARK-23723: multi-line json in UTF-32BE with BOM") {
+val fileName = "json-tests/utf32BEWithBOM.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: Use user's encoding in reading of multi-line json in 
UTF-16LE") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .options(Map("encoding" -> "UTF-16LE"))
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: Unsupported encoding name") {
+val invalidCharset = "UTF-128"
+val exception = intercept[java.io.UnsupportedEncodingException] {
+  spark.read
+.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n"))
+.json(testFile("json-tests/utf16LE.json"))
+.count()
+}
+
+assert(exception.getMessage.contains(invalidCharset))
+  }
+
+  test("SPARK-23723: checking that the encoding option is case agnostic") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .options(Map("encoding" -> "uTf-16lE"))
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+
+  test("SPARK-23723: specified encoding is not matched to actual 
encoding") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val exception = intercept[SparkException] {
+  spark.read.schema(schema)
+.option("mode", "FAILFAST")
+.option("multiline", "true")
+.options(Map("encoding" -> "UTF-16BE"))
+.json(testFile(fileName))
+.count()
+}
+val errMsg = exception.getMessage
+
+assert(errMsg.contains("Malformed records are detected in record 
parsing"))
+  }
+
+  def checkCharset(
+expectedCharset: String,
+pathToJsonFiles: String,
+expectedContent: String
+  ): Unit = {
+val jsonFiles = new File(pathToJsonFiles)
+  .listFiles()
+  .filter(_.isFile)
+  .filter(_.getName.endsWith("json"))
+val jsonContent = jsonFiles.map { file =>
+  scala.io.Source.fromFile(file, expectedCharset).mkString
+}
+val cleanedContent = jsonContent
+  .mkString
+  .trim
+  .replaceAll(" ", "")
+
+assert(cleanedContent == expectedContent)
+  }
+
+  test("SPARK-23723: save json in UTF-32BE") {
+val encoding = "UTF-32BE"
+withTempPath { path =>
+  val df = spark.createDataset(Seq(("Dog", 42)))
+  df.write
+.options(Map("encoding" -> encoding, "lineSep" -> "\n"))
+.format("json").mode("overwrite")
+.save(path.getCanonicalPath)
+
+  checkCharset(
+expectedCharset = encoding,
+pathToJsonFiles = path.getCanonicalPath,
+expectedContent = """{"_1":"Dog","_2":42}"""
+  )
+}
+  }
+
+  test("SPARK-23723: save json in default encoding - UTF-8") {
+withTempPath { path =>
+  val df = spark.createDataset(Seq(("Dog", 42)))
+  df.write
+.format("json").mode("o

[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-04-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178478616
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
@@ -2127,4 +2127,243 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   assert(df.schema === expectedSchema)
 }
   }
+
+  def testFile(fileName: String): String = {
+Thread.currentThread().getContextClassLoader.getResource(fileName).toString
--- End diff --

Just a question: is it hard to do this with `Files.write(path.toPath, data.getBytes(StandardCharsets.UTF_8))`? I thought it would make other tests easier to add in the future. If it needs many changes, I am fine with it as is too.
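
For example, something like this inside a test (a sketch; the JSON literal and the charset here are only illustrative):

    import java.nio.charset.StandardCharsets
    import java.nio.file.Files

    withTempPath { path =>
      // Sketch: generate the UTF-16LE fixture on the fly instead of shipping a resource file.
      val data = """{"firstName": "Chris", "lastName": "Baird"}"""
      Files.write(path.toPath, data.getBytes(StandardCharsets.UTF_16LE))
      // ... then read it back with spark.read.option("encoding", "UTF-16LE").json(path.getCanonicalPath)
    }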


---




[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-04-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178478647
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -2127,4 +2127,243 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
   assert(df.schema === expectedSchema)
 }
   }
+
+  def testFile(fileName: String): String = {
+
Thread.currentThread().getContextClassLoader.getResource(fileName).toString
+  }
+
+  test("SPARK-23723: json in UTF-16 with BOM") {
+val fileName = "json-tests/utf16WithBOM.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .option("encoding", "UTF-16")
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(
+  Row("Chris", "Baird"), Row("Doug", "Rood")
+))
+  }
+
+  test("SPARK-23723: multi-line json in UTF-32BE with BOM") {
+val fileName = "json-tests/utf32BEWithBOM.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: Use user's encoding in reading of multi-line json in 
UTF-16LE") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .options(Map("encoding" -> "UTF-16LE"))
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: Unsupported encoding name") {
+val invalidCharset = "UTF-128"
+val exception = intercept[java.io.UnsupportedEncodingException] {
+  spark.read
+.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n"))
+.json(testFile("json-tests/utf16LE.json"))
+.count()
+}
+
+assert(exception.getMessage.contains(invalidCharset))
+  }
+
+  test("SPARK-23723: checking that the encoding option is case agnostic") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .options(Map("encoding" -> "uTf-16lE"))
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+
+  test("SPARK-23723: specified encoding is not matched to actual 
encoding") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val exception = intercept[SparkException] {
+  spark.read.schema(schema)
+.option("mode", "FAILFAST")
+.option("multiline", "true")
+.options(Map("encoding" -> "UTF-16BE"))
+.json(testFile(fileName))
+.count()
+}
+val errMsg = exception.getMessage
+
+assert(errMsg.contains("Malformed records are detected in record 
parsing"))
+  }
+
+  def checkCharset(
+expectedCharset: String,
+pathToJsonFiles: String,
+expectedContent: String
+  ): Unit = {
--- End diff --

ditto for moving it up


---




[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-04-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178477441
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala ---
@@ -153,7 +158,12 @@ object MultiLineJsonDataSource extends JsonDataSource {
   parsedOptions: JSONOptions): StructType = {
 val json: RDD[PortableDataStream] = createBaseRdd(sparkSession, inputPaths)
 val sampled: RDD[PortableDataStream] = JsonUtils.sample(json, parsedOptions)
-JsonInferSchema.infer(sampled, parsedOptions, createParser)
+
+JsonInferSchema.infer[PortableDataStream](
+  sampled,
+  parsedOptions,
+  createParser(_, _, parsedOptions.encoding)
+)
--- End diff --

Let's move it up.


---




[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-04-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178478745
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -2127,4 +2127,243 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
   assert(df.schema === expectedSchema)
 }
   }
+
+  def testFile(fileName: String): String = {
+
Thread.currentThread().getContextClassLoader.getResource(fileName).toString
+  }
+
+  test("SPARK-23723: json in UTF-16 with BOM") {
+val fileName = "json-tests/utf16WithBOM.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .option("encoding", "UTF-16")
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(
+  Row("Chris", "Baird"), Row("Doug", "Rood")
+))
+  }
+
+  test("SPARK-23723: multi-line json in UTF-32BE with BOM") {
+val fileName = "json-tests/utf32BEWithBOM.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: Use user's encoding in reading of multi-line json in 
UTF-16LE") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .options(Map("encoding" -> "UTF-16LE"))
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: Unsupported encoding name") {
+val invalidCharset = "UTF-128"
+val exception = intercept[java.io.UnsupportedEncodingException] {
+  spark.read
+.options(Map("encoding" -> invalidCharset, "lineSep" -> "\n"))
+.json(testFile("json-tests/utf16LE.json"))
+.count()
+}
+
+assert(exception.getMessage.contains(invalidCharset))
+  }
+
+  test("SPARK-23723: checking that the encoding option is case agnostic") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .options(Map("encoding" -> "uTf-16lE"))
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+
+  test("SPARK-23723: specified encoding is not matched to actual 
encoding") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val exception = intercept[SparkException] {
+  spark.read.schema(schema)
+.option("mode", "FAILFAST")
+.option("multiline", "true")
+.options(Map("encoding" -> "UTF-16BE"))
+.json(testFile(fileName))
+.count()
+}
+val errMsg = exception.getMessage
+
+assert(errMsg.contains("Malformed records are detected in record 
parsing"))
+  }
+
+  def checkCharset(
+expectedCharset: String,
+pathToJsonFiles: String,
+expectedContent: String
+  ): Unit = {
+val jsonFiles = new File(pathToJsonFiles)
+  .listFiles()
+  .filter(_.isFile)
+  .filter(_.getName.endsWith("json"))
+val jsonContent = jsonFiles.map { file =>
+  scala.io.Source.fromFile(file, expectedCharset).mkString
+}
+val cleanedContent = jsonContent
+  .mkString
+  .trim
+  .replaceAll(" ", "")
+
+assert(cleanedContent == expectedContent)
+  }
+
+  test("SPARK-23723: save json in UTF-32BE") {
+val encoding = "UTF-32BE"
+withTempPath { path =>
+  val df = spark.createDataset(Seq(("Dog", 42)))
+  df.write
+.options(Map("encoding" -> encoding, "lineSep" -> "\n"))
+.format("json").mode("overwrite")
+.save(path.getCanonicalPath)
+
+  checkCharset(
+expectedCharset = encoding,
+pathToJsonFiles = path.getCanonicalPath,
+expectedContent = """{"_1":"Dog","_2":42}"""
+  )
+}
+  }
+
+  test("SPARK-23723: save json in default encoding - UTF-8") {
+withTempPath { path =>
+  val df = spark.createDataset(Seq(("Dog", 42)))
+  df.write
+.format("json").mode("o

[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-04-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178478627
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -2127,4 +2127,243 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
   assert(df.schema === expectedSchema)
 }
   }
+
+  def testFile(fileName: String): String = {
+
Thread.currentThread().getContextClassLoader.getResource(fileName).toString
+  }
+
+  test("SPARK-23723: json in UTF-16 with BOM") {
+val fileName = "json-tests/utf16WithBOM.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .option("encoding", "UTF-16")
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(
+  Row("Chris", "Baird"), Row("Doug", "Rood")
+))
--- End diff --

inlined


---




[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-04-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178478271
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala ---
@@ -175,33 +185,43 @@ object MultiLineJsonDataSource extends JsonDataSource {
   .values
   }
 
-  private def createParser(jsonFactory: JsonFactory, record: PortableDataStream): JsonParser = {
+  private def createParser(
+  jsonFactory: JsonFactory,
+  record: PortableDataStream,
+  charset: Option[String] = None): JsonParser = {
 val path = new Path(record.getPath())
 CreateJacksonParser.inputStream(
   jsonFactory,
-  CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path))
+  CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path),
+  charset
+)
   }
 
   override def readFile(
   conf: Configuration,
   file: PartitionedFile,
   parser: JacksonParser,
   schema: StructType): Iterator[InternalRow] = {
+def createInputStream() = {
--- End diff --

I admit it, but a nested function is also a thing to avoid unless it really cleans up the code or is required. Let's just leave it as it was if neither version looks quite right .. :). At least that's usually what I do.


---




[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-04-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178478363
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala ---
@@ -151,7 +153,13 @@ private[json] class JsonOutputWriter(
 context: TaskAttemptContext)
   extends OutputWriter with Logging {
 
-  private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path))
+  private val encoding = options.encoding match {
+case Some(charsetName) => Charset.forName(charsetName)
+case _ => StandardCharsets.UTF_8
--- End diff --

`_` -> `None`


---




[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-04-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178475577
  
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -237,6 +237,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
 :param allowUnquotedControlChars: allows JSON Strings to contain unquoted control
   characters (ASCII characters with value less than 32,
   including tab and line feed characters) or not.
+:param encoding: standard charset name, for example UTF-8, UTF-16 and UTF-32. If None is
+  set, the encoding of input json will be detected automatically.
--- End diff --

`json` -> `JSON` to be consistent.


---




[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-04-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178475690
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala ---
@@ -39,11 +40,36 @@ private[sql] object CreateJacksonParser extends Serializable {
 jsonFactory.createParser(new InputStreamReader(bain, "UTF-8"))
   }
 
-  def text(jsonFactory: JsonFactory, record: Text): JsonParser = {
-jsonFactory.createParser(record.getBytes, 0, record.getLength)
+  def text(jsonFactory: JsonFactory, record: Text, encoding: Option[String] = None): JsonParser = {
+encoding match {
+  case Some(enc) =>
+val bain = new ByteArrayInputStream(record.getBytes, 0, record.getLength)
+jsonFactory.createParser(new InputStreamReader(bain, enc))
+  case _ =>
--- End diff --

`_` => `None`


---




[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-04-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178478336
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala ---
@@ -175,33 +185,43 @@ object MultiLineJsonDataSource extends JsonDataSource {
   .values
   }
 
-  private def createParser(jsonFactory: JsonFactory, record: PortableDataStream): JsonParser = {
+  private def createParser(
+  jsonFactory: JsonFactory,
+  record: PortableDataStream,
+  charset: Option[String] = None): JsonParser = {
 val path = new Path(record.getPath())
 CreateJacksonParser.inputStream(
   jsonFactory,
-  CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path))
+  CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path),
+  charset
+)
   }
 
   override def readFile(
   conf: Configuration,
   file: PartitionedFile,
   parser: JacksonParser,
   schema: StructType): Iterator[InternalRow] = {
+def createInputStream() = {
+  CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath)))
+}
 def partitionedFileString(ignored: Any): UTF8String = {
-  Utils.tryWithResource {
-CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath)))
-  } { inputStream =>
-UTF8String.fromBytes(ByteStreams.toByteArray(inputStream))
+  Utils.tryWithResource(createInputStream()) { is =>
+UTF8String.fromBytes(ByteStreams.toByteArray(is))
   }
 }
+val charset = parser.options.encoding
 
 val safeParser = new FailureSafeParser[InputStream](
-  input => parser.parse(input, CreateJacksonParser.inputStream, partitionedFileString),
+  input => parser.parse[InputStream](
+input,
+CreateJacksonParser.inputStream(_, _, charset),
+partitionedFileString
+  ),
--- End diff --

ditto for moving up the last line.


---




[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-04-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178477325
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala ---
@@ -107,4 +126,9 @@ private[sql] class JSONOptions(
   allowBackslashEscapingAnyCharacter)
 factory.configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, allowUnquotedControlChars)
   }
+
+  def getTextOptions: Map[String, String] = {
--- End diff --

`Map[String, String]() ` -> `Map.empty[String, String]`.


---




[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-04-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178477311
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala ---
@@ -107,4 +126,9 @@ private[sql] class JSONOptions(
   allowBackslashEscapingAnyCharacter)
 factory.configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, allowUnquotedControlChars)
   }
+
+  def getTextOptions: Map[String, String] = {
--- End diff --

This is currently used only once. Can we just put it where it's used?



---




[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-04-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178476391
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala ---
@@ -86,14 +85,34 @@ private[sql] class JSONOptions(
 
   val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
 
-  val lineSeparator: Option[String] = parameters.get("lineSep").map { sep =>
-require(sep.nonEmpty, "'lineSep' cannot be an empty string.")
-sep
+  /**
+   * A sequence of bytes between two consecutive json records.
--- End diff --

`A sequence of bytes` -> `A string` and `json` -> `JSON`.


---




[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-04-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178478519
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
@@ -2070,9 +2070,9 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   // Read
   val data =
 s"""
-  |  {"f":
-  |"a", "f0": 1}$lineSep{"f":
-  |
+   |  {"f":
+   |"a", "f0": 1}$lineSep{"f":
+   |
--- End diff --

Seems like a mistake?


---




[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-04-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178476321
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala
 ---
@@ -39,11 +40,36 @@ private[sql] object CreateJacksonParser extends 
Serializable {
 jsonFactory.createParser(new InputStreamReader(bain, "UTF-8"))
   }
 
-  def text(jsonFactory: JsonFactory, record: Text): JsonParser = {
-jsonFactory.createParser(record.getBytes, 0, record.getLength)
+  def text(jsonFactory: JsonFactory, record: Text, encoding: 
Option[String] = None): JsonParser = {
+encoding match {
+  case Some(enc) =>
+val bain = new ByteArrayInputStream(record.getBytes, 0, 
record.getLength)
+jsonFactory.createParser(new InputStreamReader(bain, enc))
+  case _ =>
+jsonFactory.createParser(record.getBytes, 0, record.getLength)
+}
   }
 
-  def inputStream(jsonFactory: JsonFactory, record: InputStream): 
JsonParser = {
-jsonFactory.createParser(record)
+  def inputStream(
+  jsonFactory: JsonFactory,
+  is: InputStream,
+  encoding: Option[String] = None): JsonParser = {
+encoding match {
+  case Some(enc) =>
+jsonFactory.createParser(new InputStreamReader(is, enc))
+  case _ =>
+jsonFactory.createParser(is)
+}
+  }
+
+  def internalRow(
+jsonFactory: JsonFactory,
+row: InternalRow,
+field: Int,
+encoding: Option[String] = None
+  ): JsonParser = {
--- End diff --

Let's move this line up.
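
i.e. something like this (just a formatting sketch):

```
  def internalRow(
      jsonFactory: JsonFactory,
      row: InternalRow,
      field: Int,
      encoding: Option[String] = None): JsonParser = {
    // body as before
  }
```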


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-04-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178476075
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala
 ---
@@ -39,11 +40,36 @@ private[sql] object CreateJacksonParser extends 
Serializable {
 jsonFactory.createParser(new InputStreamReader(bain, "UTF-8"))
   }
 
-  def text(jsonFactory: JsonFactory, record: Text): JsonParser = {
-jsonFactory.createParser(record.getBytes, 0, record.getLength)
+  def text(jsonFactory: JsonFactory, record: Text, encoding: 
Option[String] = None): JsonParser = {
+encoding match {
--- End diff --

Looks like we create a partial function and then use it, and therefore it's going 
to do the type dispatch for every record. Can we avoid this?
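
For example, something along these lines might avoid it (just a rough sketch, not the PR's code): resolve the encoding once, outside the per-record path, and return a function that is reused for every record.

```
  // Dispatch on the encoding once; the returned function is applied per record.
  def text(jsonFactory: JsonFactory, encoding: Option[String]): Text => JsonParser =
    encoding match {
      case Some(enc) => record => {
        val bais = new ByteArrayInputStream(record.getBytes, 0, record.getLength)
        jsonFactory.createParser(new InputStreamReader(bais, enc))
      }
      case None =>
        record => jsonFactory.createParser(record.getBytes, 0, record.getLength)
    }
```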


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-04-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178476931
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
 ---
@@ -92,32 +93,34 @@ object TextInputJsonDataSource extends JsonDataSource {
   sparkSession: SparkSession,
   inputPaths: Seq[FileStatus],
   parsedOptions: JSONOptions): StructType = {
-val json: Dataset[String] = createBaseDataset(
-  sparkSession, inputPaths, parsedOptions.lineSeparator)
+val json: Dataset[String] = createBaseDataset(sparkSession, 
inputPaths, parsedOptions)
+
 inferFromDataset(json, parsedOptions)
   }
 
   def inferFromDataset(json: Dataset[String], parsedOptions: JSONOptions): 
StructType = {
 val sampled: Dataset[String] = JsonUtils.sample(json, parsedOptions)
-val rdd: RDD[UTF8String] = 
sampled.queryExecution.toRdd.map(_.getUTF8String(0))
-JsonInferSchema.infer(rdd, parsedOptions, 
CreateJacksonParser.utf8String)
+val rdd: RDD[InternalRow] = sampled.queryExecution.toRdd
+
+JsonInferSchema.infer[InternalRow](
+  rdd,
+  parsedOptions,
+  CreateJacksonParser.internalRow(_, _, 0, parsedOptions.encoding)
+)
   }
 
   private def createBaseDataset(
   sparkSession: SparkSession,
   inputPaths: Seq[FileStatus],
-  lineSeparator: Option[String]): Dataset[String] = {
-val textOptions = lineSeparator.map { lineSep =>
-  Map(TextOptions.LINE_SEPARATOR -> lineSep)
-}.getOrElse(Map.empty[String, String])
-
+  parsedOptions: JSONOptions
+  ): Dataset[String] = {
--- End diff --

Let's move this line up too. I think it should be either


```
...
parsedOptions: JSONOptions)
  : Dataset[String] = {
```

or

```
parsedOptions: JSONOptions) : Dataset[String] = {
```
https://github.com/databricks/scala-style-guide#spacing-and-indentation

but somehow I believe we usually do the latter style.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-04-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178477458
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
 ---
@@ -175,33 +185,43 @@ object MultiLineJsonDataSource extends JsonDataSource 
{
   .values
   }
 
-  private def createParser(jsonFactory: JsonFactory, record: 
PortableDataStream): JsonParser = {
+  private def createParser(
+  jsonFactory: JsonFactory,
+  record: PortableDataStream,
+  charset: Option[String] = None): JsonParser = {
 val path = new Path(record.getPath())
 CreateJacksonParser.inputStream(
   jsonFactory,
-  
CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path))
+  
CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path),
+  charset
+)
--- End diff --

ditto for moving the last parenthesis up.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-03-31 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178434446
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
 ---
@@ -175,33 +185,43 @@ object MultiLineJsonDataSource extends JsonDataSource 
{
   .values
   }
 
-  private def createParser(jsonFactory: JsonFactory, record: 
PortableDataStream): JsonParser = {
+  private def createParser(
+  jsonFactory: JsonFactory,
+  record: PortableDataStream,
+  charset: Option[String] = None): JsonParser = {
 val path = new Path(record.getPath())
 CreateJacksonParser.inputStream(
   jsonFactory,
-  
CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path))
+  
CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path),
+  charset
+)
   }
 
   override def readFile(
   conf: Configuration,
   file: PartitionedFile,
   parser: JacksonParser,
   schema: StructType): Iterator[InternalRow] = {
+def createInputStream() = {
--- End diff --

I don't know but for me it is easier to read if I see the same name instead 
of comparing 2 expressions and recognizing them as the same. I believe the 
below is more complicated:
```
Utils.tryWithResource(CodecStreams.createInputStreamWithCloseResource(conf,
    new Path(new URI(file.filePath)))) { is =>
  UTF8String.fromBytes(ByteStreams.toByteArray(is))
}
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-03-31 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178430652
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -2065,29 +2065,238 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 }
   }
 
-  def testLineSeparator(lineSep: String): Unit = {
-test(s"SPARK-21289: Support line separator - lineSep: '$lineSep'") {
-  // Read
-  val data =
-s"""
-  |  {"f":
-  |"a", "f0": 1}$lineSep{"f":
-  |
-  |"c",  "f0": 2}$lineSep{"f": "d",  "f0": 3}
-""".stripMargin
-  val dataWithTrailingLineSep = s"$data$lineSep"
-
-  Seq(data, dataWithTrailingLineSep).foreach { lines =>
-withTempPath { path =>
-  Files.write(path.toPath, lines.getBytes(StandardCharsets.UTF_8))
-  val df = spark.read.option("lineSep", 
lineSep).json(path.getAbsolutePath)
-  val expectedSchema =
-StructType(StructField("f", StringType) :: StructField("f0", 
LongType) :: Nil)
-  checkAnswer(df, Seq(("a", 1), ("c", 2), ("d", 3)).toDF())
-  assert(df.schema === expectedSchema)
+  def testFile(fileName: String): String = {
+
Thread.currentThread().getContextClassLoader.getResource(fileName).toString
+  }
+
+  test("SPARK-23723: json in UTF-16 with BOM") {
+val fileName = "json-tests/utf16WithBOM.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  // This option will be replaced by .option("lineSep", "x00 0a")
+  // as soon as lineSep allows to specify sequence of bytes in 
hexadecimal format.
+  .option("mode", "DROPMALFORMED")
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(
+  Row("Chris", "Baird"), Row("Doug", "Rood")
+))
+  }
+
+  test("SPARK-23723: multi-line json in UTF-32BE with BOM") {
+val fileName = "json-tests/utf32BEWithBOM.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: Use user's encoding in reading of multi-line json in 
UTF-16LE") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .options(Map("encoding" -> "UTF-16LE"))
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: Unsupported charset name") {
+val invalidCharset = "UTF-128"
+val exception = intercept[java.io.UnsupportedEncodingException] {
+  spark.read
+.options(Map("charset" -> invalidCharset, "lineSep" -> "\n"))
+.json(testFile("json-tests/utf16LE.json"))
+.count()
+}
+
+assert(exception.getMessage.contains(invalidCharset))
+  }
+
+  test("SPARK-23723: checking that the charset option is case agnostic") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .options(Map("charset" -> "uTf-16lE"))
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+
+  test("SPARK-23723: specified charset is not matched to actual charset") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val exception = intercept[SparkException] {
+  spark.read.schema(schema)
+.option("mode", "FAILFAST")
+.option("multiline", "true")
+.options(Map("charset" -> "UTF-16BE"))
--- End diff --

yup that'd work too.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-03-31 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178428640
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -2065,29 +2065,238 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 }
   }
 
-  def testLineSeparator(lineSep: String): Unit = {
-test(s"SPARK-21289: Support line separator - lineSep: '$lineSep'") {
-  // Read
-  val data =
-s"""
-  |  {"f":
-  |"a", "f0": 1}$lineSep{"f":
-  |
-  |"c",  "f0": 2}$lineSep{"f": "d",  "f0": 3}
-""".stripMargin
-  val dataWithTrailingLineSep = s"$data$lineSep"
-
-  Seq(data, dataWithTrailingLineSep).foreach { lines =>
-withTempPath { path =>
-  Files.write(path.toPath, lines.getBytes(StandardCharsets.UTF_8))
-  val df = spark.read.option("lineSep", 
lineSep).json(path.getAbsolutePath)
-  val expectedSchema =
-StructType(StructField("f", StringType) :: StructField("f0", 
LongType) :: Nil)
-  checkAnswer(df, Seq(("a", 1), ("c", 2), ("d", 3)).toDF())
-  assert(df.schema === expectedSchema)
+  def testFile(fileName: String): String = {
+
Thread.currentThread().getContextClassLoader.getResource(fileName).toString
+  }
+
+  test("SPARK-23723: json in UTF-16 with BOM") {
+val fileName = "json-tests/utf16WithBOM.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  // This option will be replaced by .option("lineSep", "x00 0a")
+  // as soon as lineSep allows to specify sequence of bytes in 
hexadecimal format.
+  .option("mode", "DROPMALFORMED")
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(
+  Row("Chris", "Baird"), Row("Doug", "Rood")
+))
+  }
+
+  test("SPARK-23723: multi-line json in UTF-32BE with BOM") {
+val fileName = "json-tests/utf32BEWithBOM.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: Use user's encoding in reading of multi-line json in 
UTF-16LE") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .options(Map("encoding" -> "UTF-16LE"))
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: Unsupported charset name") {
+val invalidCharset = "UTF-128"
+val exception = intercept[java.io.UnsupportedEncodingException] {
+  spark.read
+.options(Map("charset" -> invalidCharset, "lineSep" -> "\n"))
+.json(testFile("json-tests/utf16LE.json"))
+.count()
+}
+
+assert(exception.getMessage.contains(invalidCharset))
+  }
+
+  test("SPARK-23723: checking that the charset option is case agnostic") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .options(Map("charset" -> "uTf-16lE"))
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+
+  test("SPARK-23723: specified charset is not matched to actual charset") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val exception = intercept[SparkException] {
+  spark.read.schema(schema)
+.option("mode", "FAILFAST")
+.option("multiline", "true")
+.options(Map("charset" -> "UTF-16BE"))
--- End diff --

I would blacklist 2 encodings for now - UTF-16 and UTF-32 - but allow UTF-8, 
UTF-16LE/BE, UTF-32LE/BE and others like ISO 8859-1. 
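
Something like this could express it (an illustrative sketch only, not the PR's code):

```
// Reject the BOM-dependent charsets for now; everything else is allowed.
val blacklist = Seq("UTF-16", "UTF-32")
require(!blacklist.contains(cs.toUpperCase(java.util.Locale.ROOT)),
  s"The charset $cs is not supported for now, use UTF-16LE/BE or UTF-32LE/BE instead")
```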



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-03-31 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178427458
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
 ---
@@ -151,7 +153,16 @@ private[json] class JsonOutputWriter(
 context: TaskAttemptContext)
   extends OutputWriter with Logging {
 
-  private val writer = CodecStreams.createOutputStreamWriter(context, new 
Path(path))
+  private val charset = options.charset match {
+case Some(charsetName) => Charset.forName(charsetName)
+case _ => StandardCharsets.UTF_8
+  }
+
+  private val writer = CodecStreams.createOutputStreamWriter(
+context,
+new Path(path),
+charset
+  )
--- End diff --

Seems it's going to fit in two lines.
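
e.g. (just a sketch):

```
  private val writer = CodecStreams.createOutputStreamWriter(
    context, new Path(path), charset)
```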


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-03-31 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178427418
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
 ---
@@ -86,14 +86,30 @@ private[sql] class JSONOptions(
 
   val multiLine = 
parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
 
-  val lineSeparator: Option[String] = parameters.get("lineSep").map { sep 
=>
-require(sep.nonEmpty, "'lineSep' cannot be an empty string.")
-sep
+  /**
+   * A sequence of bytes between two consecutive json records.
+   */
+  val lineSeparator: Option[String] = parameters.get("lineSep")
+
+  /**
+   * Standard charset name. For example UTF-8, UTF-16 and UTF-32.
+   * If charset is not specified (None), it will be detected automatically.
+   */
+  val charset: Option[String] = parameters.get("charset")
+.orElse(parameters.get("encoding")).map { cs =>
+  if (multiLine == false && cs != "UTF-8" && lineSeparator.isEmpty) {
--- End diff --

Shall we do this with `require`?
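
i.e. something like this (a sketch that keeps the same message):

```
  require(multiLine || cs == "UTF-8" || lineSeparator.nonEmpty,
    s"""Please, set the 'lineSep' option for the given charset $cs.
       |Example: .option("lineSep", "|^|")
       |Note: lineSep can be detected automatically for UTF-8 only.""".stripMargin)
```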


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-03-31 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178427349
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
 ---
@@ -175,33 +185,43 @@ object MultiLineJsonDataSource extends JsonDataSource 
{
   .values
   }
 
-  private def createParser(jsonFactory: JsonFactory, record: 
PortableDataStream): JsonParser = {
+  private def createParser(
+  jsonFactory: JsonFactory,
+  record: PortableDataStream,
+  charset: Option[String] = None): JsonParser = {
 val path = new Path(record.getPath())
 CreateJacksonParser.inputStream(
   jsonFactory,
-  
CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path))
+  
CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path),
+  charset
+)
   }
 
   override def readFile(
   conf: Configuration,
   file: PartitionedFile,
   parser: JacksonParser,
   schema: StructType): Iterator[InternalRow] = {
+def createInputStream() = {
--- End diff --

To be honest, I think I wouldn't have a nested function to deduplicate two 
lines ..


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-03-31 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178427279
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -2065,29 +2065,238 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 }
   }
 
-  def testLineSeparator(lineSep: String): Unit = {
-test(s"SPARK-21289: Support line separator - lineSep: '$lineSep'") {
-  // Read
-  val data =
-s"""
-  |  {"f":
-  |"a", "f0": 1}$lineSep{"f":
-  |
-  |"c",  "f0": 2}$lineSep{"f": "d",  "f0": 3}
-""".stripMargin
-  val dataWithTrailingLineSep = s"$data$lineSep"
-
-  Seq(data, dataWithTrailingLineSep).foreach { lines =>
-withTempPath { path =>
-  Files.write(path.toPath, lines.getBytes(StandardCharsets.UTF_8))
-  val df = spark.read.option("lineSep", 
lineSep).json(path.getAbsolutePath)
-  val expectedSchema =
-StructType(StructField("f", StringType) :: StructField("f0", 
LongType) :: Nil)
-  checkAnswer(df, Seq(("a", 1), ("c", 2), ("d", 3)).toDF())
-  assert(df.schema === expectedSchema)
+  def testFile(fileName: String): String = {
+
Thread.currentThread().getContextClassLoader.getResource(fileName).toString
+  }
+
+  test("SPARK-23723: json in UTF-16 with BOM") {
+val fileName = "json-tests/utf16WithBOM.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  // This option will be replaced by .option("lineSep", "x00 0a")
--- End diff --

No, we don't know this yet. Let's remove this comment and the test.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-03-31 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178427175
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
 ---
@@ -41,13 +41,16 @@ private[text] class TextOptions(@transient private val 
parameters: CaseInsensiti
*/
   val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean
 
-  private val lineSeparator: Option[String] = 
parameters.get(LINE_SEPARATOR).map { sep =>
-require(sep.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.")
-sep
+  val charset: Option[String] = Some("UTF-8")
+
+  val lineSeparator: Option[Array[Byte]] = parameters.get("lineSep").map { 
lineSep =>
+require(lineSep.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty 
string.")
+lineSep.getBytes(charset.getOrElse(
--- End diff --

Shall we just `require` here too?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-03-31 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178427113
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -2065,29 +2065,238 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 }
   }
 
-  def testLineSeparator(lineSep: String): Unit = {
-test(s"SPARK-21289: Support line separator - lineSep: '$lineSep'") {
-  // Read
-  val data =
-s"""
-  |  {"f":
-  |"a", "f0": 1}$lineSep{"f":
-  |
-  |"c",  "f0": 2}$lineSep{"f": "d",  "f0": 3}
-""".stripMargin
-  val dataWithTrailingLineSep = s"$data$lineSep"
-
-  Seq(data, dataWithTrailingLineSep).foreach { lines =>
-withTempPath { path =>
-  Files.write(path.toPath, lines.getBytes(StandardCharsets.UTF_8))
-  val df = spark.read.option("lineSep", 
lineSep).json(path.getAbsolutePath)
-  val expectedSchema =
-StructType(StructField("f", StringType) :: StructField("f0", 
LongType) :: Nil)
-  checkAnswer(df, Seq(("a", 1), ("c", 2), ("d", 3)).toDF())
-  assert(df.schema === expectedSchema)
+  def testFile(fileName: String): String = {
+
Thread.currentThread().getContextClassLoader.getResource(fileName).toString
+  }
+
+  test("SPARK-23723: json in UTF-16 with BOM") {
+val fileName = "json-tests/utf16WithBOM.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  // This option will be replaced by .option("lineSep", "x00 0a")
+  // as soon as lineSep allows to specify sequence of bytes in 
hexadecimal format.
+  .option("mode", "DROPMALFORMED")
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(
+  Row("Chris", "Baird"), Row("Doug", "Rood")
+))
+  }
+
+  test("SPARK-23723: multi-line json in UTF-32BE with BOM") {
+val fileName = "json-tests/utf32BEWithBOM.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: Use user's encoding in reading of multi-line json in 
UTF-16LE") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .options(Map("encoding" -> "UTF-16LE"))
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: Unsupported charset name") {
+val invalidCharset = "UTF-128"
+val exception = intercept[java.io.UnsupportedEncodingException] {
+  spark.read
+.options(Map("charset" -> invalidCharset, "lineSep" -> "\n"))
+.json(testFile("json-tests/utf16LE.json"))
+.count()
+}
+
+assert(exception.getMessage.contains(invalidCharset))
+  }
+
+  test("SPARK-23723: checking that the charset option is case agnostic") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .options(Map("charset" -> "uTf-16lE"))
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+
+  test("SPARK-23723: specified charset is not matched to actual charset") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val exception = intercept[SparkException] {
+  spark.read.schema(schema)
+.option("mode", "FAILFAST")
+.option("multiline", "true")
+.options(Map("charset" -> "UTF-16BE"))
--- End diff --

You don't have to document a 100% complete list, just a best-effort one. We can fix 
it and add more encodings later if anyone finds them, if it's hard to find out 
every supported encoding.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-03-31 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178427139
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
 ---
@@ -86,14 +86,30 @@ private[sql] class JSONOptions(
 
   val multiLine = 
parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
 
-  val lineSeparator: Option[String] = parameters.get("lineSep").map { sep 
=>
-require(sep.nonEmpty, "'lineSep' cannot be an empty string.")
-sep
+  /**
+   * A sequence of bytes between two consecutive json records.
+   */
+  val lineSeparator: Option[String] = parameters.get("lineSep")
+
+  /**
+   * Standard charset name. For example UTF-8, UTF-16 and UTF-32.
+   * If charset is not specified (None), it will be detected automatically.
+   */
+  val charset: Option[String] = parameters.get("charset")
+.orElse(parameters.get("encoding")).map { cs =>
+  if (multiLine == false && cs != "UTF-8" && lineSeparator.isEmpty) {
+throw new IllegalArgumentException(
+  s"""Please, set the 'lineSep' option for the given charset $cs.
+ |Example: .option("lineSep", "|^|")
+ |Note: lineSep can be detected automatically for UTF-8 
only.""".stripMargin
+)
--- End diff --

Let's move this parenthesis up and inline it if you are also fine with 
that (just because that's what I have seen more frequently).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-03-31 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178427033
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala
 ---
@@ -39,11 +40,36 @@ private[sql] object CreateJacksonParser extends 
Serializable {
 jsonFactory.createParser(new InputStreamReader(bain, "UTF-8"))
   }
 
-  def text(jsonFactory: JsonFactory, record: Text): JsonParser = {
-jsonFactory.createParser(record.getBytes, 0, record.getLength)
+  def text(jsonFactory: JsonFactory, record: Text, charset: Option[String] 
= None): JsonParser = {
+charset match {
--- End diff --

`:%s/charset/encoding` .. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-03-31 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178427011
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -2065,29 +2065,238 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 }
   }
 
-  def testLineSeparator(lineSep: String): Unit = {
-test(s"SPARK-21289: Support line separator - lineSep: '$lineSep'") {
-  // Read
-  val data =
-s"""
-  |  {"f":
-  |"a", "f0": 1}$lineSep{"f":
-  |
-  |"c",  "f0": 2}$lineSep{"f": "d",  "f0": 3}
-""".stripMargin
-  val dataWithTrailingLineSep = s"$data$lineSep"
-
-  Seq(data, dataWithTrailingLineSep).foreach { lines =>
-withTempPath { path =>
-  Files.write(path.toPath, lines.getBytes(StandardCharsets.UTF_8))
-  val df = spark.read.option("lineSep", 
lineSep).json(path.getAbsolutePath)
-  val expectedSchema =
-StructType(StructField("f", StringType) :: StructField("f0", 
LongType) :: Nil)
-  checkAnswer(df, Seq(("a", 1), ("c", 2), ("d", 3)).toDF())
-  assert(df.schema === expectedSchema)
+  def testFile(fileName: String): String = {
+
Thread.currentThread().getContextClassLoader.getResource(fileName).toString
+  }
+
+  test("SPARK-23723: json in UTF-16 with BOM") {
+val fileName = "json-tests/utf16WithBOM.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  // This option will be replaced by .option("lineSep", "x00 0a")
+  // as soon as lineSep allows to specify sequence of bytes in 
hexadecimal format.
+  .option("mode", "DROPMALFORMED")
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(
+  Row("Chris", "Baird"), Row("Doug", "Rood")
+))
+  }
+
+  test("SPARK-23723: multi-line json in UTF-32BE with BOM") {
+val fileName = "json-tests/utf32BEWithBOM.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: Use user's encoding in reading of multi-line json in 
UTF-16LE") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .options(Map("encoding" -> "UTF-16LE"))
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: Unsupported charset name") {
+val invalidCharset = "UTF-128"
+val exception = intercept[java.io.UnsupportedEncodingException] {
+  spark.read
+.options(Map("charset" -> invalidCharset, "lineSep" -> "\n"))
+.json(testFile("json-tests/utf16LE.json"))
+.count()
+}
+
+assert(exception.getMessage.contains(invalidCharset))
+  }
+
+  test("SPARK-23723: checking that the charset option is case agnostic") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .options(Map("charset" -> "uTf-16lE"))
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+
+  test("SPARK-23723: specified charset is not matched to actual charset") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val exception = intercept[SparkException] {
+  spark.read.schema(schema)
+.option("mode", "FAILFAST")
+.option("multiline", "true")
+.options(Map("charset" -> "UTF-16BE"))
+.json(testFile(fileName))
+.count()
+}
+val errMsg = exception.getMessage
+
+assert(errMsg.contains("Malformed records are detected in record 
parsing"))
+  }
+
+  def checkCharset(
+expectedCharset: String,
+pathToJsonFiles: String,
+expectedContent: String
+  ): Unit = {
+val jsonFiles = new File(pathToJsonFiles)
+  .listFiles()
+  .filter(_.isFile)
+  .filter(_.getName.endsWith("json"))
+val jsonContent = jsonFiles.map { file =>
+  scala.io.So

[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-03-31 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178426994
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -2065,29 +2065,238 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 }
   }
 
-  def testLineSeparator(lineSep: String): Unit = {
-test(s"SPARK-21289: Support line separator - lineSep: '$lineSep'") {
-  // Read
-  val data =
-s"""
-  |  {"f":
-  |"a", "f0": 1}$lineSep{"f":
-  |
-  |"c",  "f0": 2}$lineSep{"f": "d",  "f0": 3}
-""".stripMargin
-  val dataWithTrailingLineSep = s"$data$lineSep"
-
-  Seq(data, dataWithTrailingLineSep).foreach { lines =>
-withTempPath { path =>
-  Files.write(path.toPath, lines.getBytes(StandardCharsets.UTF_8))
-  val df = spark.read.option("lineSep", 
lineSep).json(path.getAbsolutePath)
-  val expectedSchema =
-StructType(StructField("f", StringType) :: StructField("f0", 
LongType) :: Nil)
-  checkAnswer(df, Seq(("a", 1), ("c", 2), ("d", 3)).toDF())
-  assert(df.schema === expectedSchema)
+  def testFile(fileName: String): String = {
+
Thread.currentThread().getContextClassLoader.getResource(fileName).toString
+  }
+
+  test("SPARK-23723: json in UTF-16 with BOM") {
+val fileName = "json-tests/utf16WithBOM.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  // This option will be replaced by .option("lineSep", "x00 0a")
+  // as soon as lineSep allows to specify sequence of bytes in 
hexadecimal format.
+  .option("mode", "DROPMALFORMED")
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(
+  Row("Chris", "Baird"), Row("Doug", "Rood")
+))
+  }
+
+  test("SPARK-23723: multi-line json in UTF-32BE with BOM") {
+val fileName = "json-tests/utf32BEWithBOM.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: Use user's encoding in reading of multi-line json in 
UTF-16LE") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .options(Map("encoding" -> "UTF-16LE"))
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: Unsupported charset name") {
+val invalidCharset = "UTF-128"
+val exception = intercept[java.io.UnsupportedEncodingException] {
+  spark.read
+.options(Map("charset" -> invalidCharset, "lineSep" -> "\n"))
+.json(testFile("json-tests/utf16LE.json"))
+.count()
+}
+
+assert(exception.getMessage.contains(invalidCharset))
+  }
+
+  test("SPARK-23723: checking that the charset option is case agnostic") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .options(Map("charset" -> "uTf-16lE"))
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+
+  test("SPARK-23723: specified charset is not matched to actual charset") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val exception = intercept[SparkException] {
+  spark.read.schema(schema)
+.option("mode", "FAILFAST")
+.option("multiline", "true")
+.options(Map("charset" -> "UTF-16BE"))
--- End diff --

If that's difficult (or weird or hacky), let's just make a whitelist and 
document the supported encodings explicitly.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-03-31 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178426903
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -2065,29 +2065,238 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 }
   }
 
-  def testLineSeparator(lineSep: String): Unit = {
--- End diff --

You don't have to completely remove this .. just take out the ones that are not 
working.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-03-31 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178426879
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
 ---
@@ -86,14 +86,30 @@ private[sql] class JSONOptions(
 
   val multiLine = 
parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
 
-  val lineSeparator: Option[String] = parameters.get("lineSep").map { sep 
=>
-require(sep.nonEmpty, "'lineSep' cannot be an empty string.")
-sep
+  /**
+   * A sequence of bytes between two consecutive json records.
+   */
+  val lineSeparator: Option[String] = parameters.get("lineSep")
+
+  /**
+   * Standard charset name. For example UTF-8, UTF-16 and UTF-32.
+   * If charset is not specified (None), it will be detected automatically.
+   */
+  val charset: Option[String] = parameters.get("charset")
--- End diff --

I thought we talked `encoding` would have higher priority like CSV.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-03-31 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20937#discussion_r178426866
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -2065,29 +2065,238 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 }
   }
 
-  def testLineSeparator(lineSep: String): Unit = {
-test(s"SPARK-21289: Support line separator - lineSep: '$lineSep'") {
-  // Read
-  val data =
-s"""
-  |  {"f":
-  |"a", "f0": 1}$lineSep{"f":
-  |
-  |"c",  "f0": 2}$lineSep{"f": "d",  "f0": 3}
-""".stripMargin
-  val dataWithTrailingLineSep = s"$data$lineSep"
-
-  Seq(data, dataWithTrailingLineSep).foreach { lines =>
-withTempPath { path =>
-  Files.write(path.toPath, lines.getBytes(StandardCharsets.UTF_8))
-  val df = spark.read.option("lineSep", 
lineSep).json(path.getAbsolutePath)
-  val expectedSchema =
-StructType(StructField("f", StringType) :: StructField("f0", 
LongType) :: Nil)
-  checkAnswer(df, Seq(("a", 1), ("c", 2), ("d", 3)).toDF())
-  assert(df.schema === expectedSchema)
+  def testFile(fileName: String): String = {
+
Thread.currentThread().getContextClassLoader.getResource(fileName).toString
+  }
+
+  test("SPARK-23723: json in UTF-16 with BOM") {
+val fileName = "json-tests/utf16WithBOM.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  // This option will be replaced by .option("lineSep", "x00 0a")
+  // as soon as lineSep allows to specify sequence of bytes in 
hexadecimal format.
+  .option("mode", "DROPMALFORMED")
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(
+  Row("Chris", "Baird"), Row("Doug", "Rood")
+))
+  }
+
+  test("SPARK-23723: multi-line json in UTF-32BE with BOM") {
+val fileName = "json-tests/utf32BEWithBOM.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: Use user's encoding in reading of multi-line json in 
UTF-16LE") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .options(Map("encoding" -> "UTF-16LE"))
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: Unsupported charset name") {
+val invalidCharset = "UTF-128"
+val exception = intercept[java.io.UnsupportedEncodingException] {
+  spark.read
+.options(Map("charset" -> invalidCharset, "lineSep" -> "\n"))
+.json(testFile("json-tests/utf16LE.json"))
+.count()
+}
+
+assert(exception.getMessage.contains(invalidCharset))
+  }
+
+  test("SPARK-23723: checking that the charset option is case agnostic") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val jsonDF = spark.read.schema(schema)
+  .option("multiline", "true")
+  .options(Map("charset" -> "uTf-16lE"))
+  .json(testFile(fileName))
+
+checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+
+  test("SPARK-23723: specified charset is not matched to actual charset") {
+val fileName = "json-tests/utf16LE.json"
+val schema = new StructType().add("firstName", 
StringType).add("lastName", StringType)
+val exception = intercept[SparkException] {
+  spark.read.schema(schema)
+.option("mode", "FAILFAST")
+.option("multiline", "true")
+.options(Map("charset" -> "UTF-16BE"))
--- End diff --

Can we have a test case with `UTF-16`? I assume the delimiter itself has 
the BOM and won't work correctly?
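
(For reference, encoding the separator with the plain `UTF-16` charset does prepend a BOM to the delimiter bytes; a quick standalone check, not from the PR:)

```
val sepBytes = "\n".getBytes("UTF-16")
println(sepBytes.map(b => f"$b%02x").mkString(" "))  // prints: fe ff 00 0a
```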


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20937: [SPARK-23723][SPARK-23724][SQL] Support custom en...

2018-03-29 Thread MaxGekk
GitHub user MaxGekk opened a pull request:

https://github.com/apache/spark/pull/20937

[SPARK-23723][SPARK-23724][SQL] Support custom encoding for json files

## What changes were proposed in this pull request?

I propose a new option for the JSON datasource which allows specifying the encoding 
(charset) of input and output files. Here is an example of using the option:

```
spark.read.schema(schema)
  .option("multiline", "true")
  .option("encoding", "UTF-16LE")
  .json(fileName)
```

If the option is not specified, the charset auto-detection mechanism is used by 
default.

The option can also be used for saving datasets to JSON files. Currently Spark is 
able to save datasets into JSON files in the UTF-8 charset only. The changes allow 
saving data in any supported charset. Here is the approximate list of 
charsets supported by Oracle Java SE: 
https://docs.oracle.com/javase/8/docs/technotes/guides/intl/encoding.doc.html . 
A user can specify the charset of output JSON via the charset option like 
`.option("charset", "UTF-16")`. By default the output charset is still UTF-8 to 
keep backward compatibility.
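
For example (the DataFrame `df` and the output path here are just for illustration):

```
df.write
  .option("charset", "UTF-16")
  .json("/tmp/json-in-utf16")
```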

The solution has the following restrictions for per-line mode (`multiline = 
false`):

- If the charset is different from UTF-8, the lineSep option must be specified 
(see the read sketch after this list). The option is required because Hadoop 
LineReader cannot detect the line separator correctly. Here is the ticket for 
solving the issue: https://issues.apache.org/jira/browse/SPARK-23725

- JSON files starting with a 
[BOM](https://en.wikipedia.org/wiki/Byte_order_mark) cannot be read properly. A 
possible solution is a flexible format for `lineSep` which allows specifying the 
line separator as a sequence of bytes independently of the encoding. A pull request 
for that will be prepared soon.
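
A per-line read with a non-UTF-8 charset therefore looks roughly like this (a sketch; the file name is hypothetical):

```
spark.read.schema(schema)
  .option("encoding", "UTF-16LE")
  .option("lineSep", "\n")
  .json("/path/to/file-utf16le.json")
```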

## How was this patch tested?

I added the following tests:
- reading a JSON file in the UTF-16 charset with BOM
- reading a JSON file using charset auto-detection (UTF-32BE with BOM)
- reading a JSON file using the user's charset (UTF-16LE)
- saving in UTF-32BE and reading the result with the standard library (not with Spark)
- checking that the default charset is UTF-8
- handling a wrong (unsupported) charset


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/MaxGekk/spark-1 json-encoding-line-sep

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20937.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20937


commit b2e92b4706c5ed3b141805933f29beb87e1b7371
Author: Maxim Gekk 
Date:   2018-02-11T20:06:53Z

Test for reading json in UTF-16 with BOM

commit cb2f27ba73cb5838e2910c31ca204100bb4eebca
Author: Maxim Gekk 
Date:   2018-02-11T20:48:35Z

Use user's charset or autodetect it if the charset is not specified

commit 0d45fd382bb90ebd7161d57a3da23820b4497f67
Author: Maxim Gekk 
Date:   2018-02-13T09:48:08Z

Added a type and a comment for charset

commit 1fb9b321a4fac0f41cfb9dd5f85b61feb6796227
Author: Maxim Gekk 
Date:   2018-02-13T10:00:27Z

Replacing the monadic chaining by matching because it is more readable

commit c3b04ee68338ad4f93a5361a41db28b37f020907
Author: Maxim Gekk 
Date:   2018-02-13T10:44:19Z

Keeping the old method for backward compatibility

commit 93d38794dd261ee1bbe2497470ee43de1186ef3c
Author: Maxim Gekk 
Date:   2018-02-13T10:54:52Z

testFile is moved into the test to make more local because it is used only 
in the test

commit 15798a1ce61df29e9a32f960e755495e3d63f4e3
Author: Maxim Gekk 
Date:   2018-02-13T11:15:25Z

Adding the charset as third parameter to the text method

commit cc05ce9af7c9f1d14bd10c1f46a60ce043c13fe1
Author: Maxim Gekk 
Date:   2018-02-13T11:29:57Z

Removing whitespaces at the end of the line

commit 74f2026e62389902ab7a4c418aa96a492fa14f6f
Author: Maxim Gekk 
Date:   2018-02-13T12:29:28Z

Fix the comment in javadoc style

commit 4856b8e0b287b3ba3331865298f0603dde18459c
Author: Maxim Gekk 
Date:   2018-02-13T12:32:48Z

Simplifying of the UTF-16 test

commit 084f41fb6edd7c86aeb8643973119cb4b38a34fa
Author: Maxim Gekk 
Date:   2018-02-15T17:33:25Z

A hint to the exception how to set the charset explicitly

commit 31cd793a86e6a0e48e0150ffb8c36da2872c65ca
Author: Maxim Gekk 
Date:   2018-02-15T18:00:55Z

Fix for scala style checks

commit 6eacd186a954a3f724ee607826b17f432ead77e1
Author: Maxim Gekk 
Date:   2018-02-15T18:44:04Z

Run tests again

commit 3b4a509d0260cfab720a5471ccd937de55c56093
Author: Maxim Gekk 
Date:   2018-02-15T19:06:06Z

Improving of the exception message

commit cd1124ef7e6329f4dcd6926064271cd24b5a150d
Author: Maxim Gekk 
Date:   2018-02-15T19:41:35Z

Appended the original message to the exception

commit ebf53904151582eef6d95780ca30b773404ae141
Aut