[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/16386





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r101683820
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1802,4 +1806,142 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
--- End diff --

shall we call `.coalesce(1)` to make sure we only write to a single file?
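A sketch of what the suggested change would look like in the test (assumption: the surrounding setup from the diff above stays unchanged; this is the reviewer's suggestion, not the committed code):

```
primitiveFieldAndType
  .toDF("value")
  .coalesce(1)          // collapse to one partition so only a single file is written
  .write
  .option("compression", "GzIp")
  .text(path)
```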





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r101683991
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1802,4 +1806,142 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  assert(new File(path).listFiles().exists(_.getName.endsWith(".gz")))
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.option("compression", "gZiP")
+.json(jsonDir)
+
+  assert(new 
File(jsonDir).listFiles().exists(_.getName.endsWith(".json.gz")))
+
+  val originalData = spark.read.json(primitiveFieldAndType)
+  checkAnswer(jsonDF, originalData)
+  checkAnswer(spark.read.schema(originalData.schema).json(jsonDir), 
originalData)
+}
+  }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write.json(jsonDir)
+
+  val compressedFiles = new File(jsonDir).listFiles()
+  assert(compressedFiles.exists(_.getName.endsWith(".json")))
+
+  val originalData = spark.read.json(primitiveFieldAndType)
+  checkAnswer(jsonDF, originalData)
+  checkAnswer(spark.read.schema(originalData.schema).json(jsonDir), 
originalData)
+}
+  }
+
+  test("SPARK-18352: Expect one JSON document per file") {
+// the json parser terminates as soon as it sees a matching END_OBJECT 
or END_ARRAY token.
+// this might not be the optimal behavior but this test verifies that 
only the first value
+// is parsed and the rest are discarded.
+
+// alternatively the parser could continue parsing following objects, 
which may further reduce
+// allocations by skipping the line reader entirely
+
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  spark
+.createDataFrame(Seq(Tuple1("{}{invalid}")))
--- End diff --

nit: `Seq(Tuple1("{}{invalid}")).toDF("value")`
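For illustration, the two forms side by side (a sketch; assumes the suite's SQL implicits are in scope, which is what makes `.toDF` available on a `Seq`):

```
// form currently in the diff
spark.createDataFrame(Seq(Tuple1("{}{invalid}")))

// suggested, more concise form
Seq(Tuple1("{}{invalid}")).toDF("value")
```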





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r101684712
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
 ---
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.json
+
+import java.io.InputStream
+
+import scala.reflect.ClassTag
+
+import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
+import com.google.common.io.ByteStreams
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, 
TextInputFormat}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
+import org.apache.spark.rdd.{BinaryFileRDD, RDD}
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, 
JacksonParser, JSONOptions}
+import org.apache.spark.sql.execution.datasources.{CodecStreams, 
HadoopFileLinesReader, PartitionedFile}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.Utils
+
+/**
+ * Common functions for parsing JSON files
+ * @tparam T A datatype containing the unparsed JSON, such as [[Text]] or 
[[String]]
+ */
+abstract class JsonDataSource[T] extends Serializable {
+  def isSplitable: Boolean
+
+  /**
+   * Parse a [[PartitionedFile]] into 0 or more [[InternalRow]] instances
+   */
+  def readFile(
+conf: Configuration,
+file: PartitionedFile,
+parser: JacksonParser): Iterator[InternalRow]
+
+  /**
+   * Create an [[RDD]] that handles the preliminary parsing of [[T]] 
records
+   */
+  protected def createBaseRdd(
+sparkSession: SparkSession,
+inputPaths: Seq[FileStatus]): RDD[T]
+
+  /**
+   * A generic wrapper to invoke the correct [[JsonFactory]] method to 
allocate a [[JsonParser]]
+   * for an instance of [[T]]
+   */
+  def createParser(jsonFactory: JsonFactory, value: T): JsonParser
+
+  final def infer(
+  sparkSession: SparkSession,
+  inputPaths: Seq[FileStatus],
+  parsedOptions: JSONOptions): Option[StructType] = {
+if (inputPaths.nonEmpty) {
+  val jsonSchema = JsonInferSchema.infer(
+createBaseRdd(sparkSession, inputPaths),
+parsedOptions,
+createParser)
+  checkConstraints(jsonSchema)
+  Some(jsonSchema)
+} else {
+  None
+}
+  }
+
+  /** Constraints to be imposed on schema to be stored. */
+  private def checkConstraints(schema: StructType): Unit = {
+if (schema.fieldNames.length != schema.fieldNames.distinct.length) {
+  val duplicateColumns = schema.fieldNames.groupBy(identity).collect {
+case (x, ys) if ys.length > 1 => "\"" + x + "\""
+  }.mkString(", ")
+  throw new AnalysisException(s"Duplicate column(s) : 
$duplicateColumns found, " +
+s"cannot save to JSON format")
+}
+  }
+}
+
+object JsonDataSource {
+  def apply(options: JSONOptions): JsonDataSource[_] = {
+if (options.wholeFile) {
+  WholeFileJsonDataSource
+} else {
+  TextInputJsonDataSource
+}
+  }
+
+  /**
+   * Create a new [[RDD]] via the supplied callback if there is at least 
one file to process,
+   * otherwise an [[org.apache.spark.rdd.EmptyRDD]] will be returned.
+   */
+  def createBaseRdd[T : ClassTag](
+  sparkSession: SparkSession,
+  inputPaths: Seq[FileStatus])(
+  fn: (Configuration, String) => RDD[T]): RDD[T] = {
+val paths 

[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r101683719
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -394,36 +447,30 @@ class JacksonParser(
   }
 
   /**
-   * Parse the string JSON input to the set of [[InternalRow]]s.
+   * Parse the JSON input to the set of [[InternalRow]]s.
+   *
+   * @param recordLiteral an optional function that will be used to 
generate
+   *   the corrupt record text instead of record.toString
*/
-  def parse(input: String): Seq[InternalRow] = {
-if (input.trim.isEmpty) {
-  Nil
-} else {
-  try {
-Utils.tryWithResource(factory.createParser(input)) { parser =>
-  parser.nextToken()
-  rootConverter.apply(parser) match {
-case null => failedRecord(input)
-case row: InternalRow => row :: Nil
-case array: ArrayData =>
-  // Here, as we support reading top level JSON arrays and 
take every element
-  // in such an array as a row, this case is possible.
-  if (array.numElements() == 0) {
-Nil
-  } else {
-array.toArray[InternalRow](schema)
-  }
-case _ =>
-  failedRecord(input)
+  def parse[T](
+  record: T,
+  createParser: (JsonFactory, T) => JsonParser,
+  recordLiteral: T => UTF8String): Seq[InternalRow] = {
--- End diff --

nit: `recordToString`?
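With the suggested rename, the signature would read (sketch; only the parameter name changes, the body stays as in the diff):

```
def parse[T](
    record: T,
    createParser: (JsonFactory, T) => JsonParser,
    recordToString: T => UTF8String): Seq[InternalRow] = {
  // body unchanged
}
```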





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r101685228
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1802,4 +1806,142 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  assert(new File(path).listFiles().exists(_.getName.endsWith(".gz")))
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.option("compression", "gZiP")
+.json(jsonDir)
+
+  assert(new 
File(jsonDir).listFiles().exists(_.getName.endsWith(".json.gz")))
+
+  val originalData = spark.read.json(primitiveFieldAndType)
+  checkAnswer(jsonDF, originalData)
+  checkAnswer(spark.read.schema(originalData.schema).json(jsonDir), 
originalData)
+}
+  }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write.json(jsonDir)
+
+  val compressedFiles = new File(jsonDir).listFiles()
+  assert(compressedFiles.exists(_.getName.endsWith(".json")))
+
+  val originalData = spark.read.json(primitiveFieldAndType)
+  checkAnswer(jsonDF, originalData)
+  checkAnswer(spark.read.schema(originalData.schema).json(jsonDir), 
originalData)
+}
+  }
+
+  test("SPARK-18352: Expect one JSON document per file") {
+// the json parser terminates as soon as it sees a matching END_OBJECT 
or END_ARRAY token.
+// this might not be the optimal behavior but this test verifies that 
only the first value
+// is parsed and the rest are discarded.
+
+// alternatively the parser could continue parsing following objects, 
which may further reduce
+// allocations by skipping the line reader entirely
+
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  spark
+.createDataFrame(Seq(Tuple1("{}{invalid}")))
+.coalesce(1)
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  // no corrupt record column should be created
+  assert(jsonDF.schema === StructType(Seq()))
+  // only the first object should be read
+  assert(jsonDF.count() === 1)
+}
+  }
+
+  test("SPARK-18352: Handle multi-line corrupt documents (PERMISSIVE)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  val corruptRecordCount = additionalCorruptRecords.count().toInt
+  assert(corruptRecordCount === 5)
+
+  additionalCorruptRecords
+.toDF("value")
+// this is the minimum partition count that avoids hash collisions
+.repartition(corruptRecordCount * 4, F.hash($"value"))
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).option("mode", 
"PERMISSIVE").json(path)
+  assert(jsonDF.count() === corruptRecordCount)
+  assert(jsonDF.schema === new StructType()
+.add("_corrupt_record", StringType)
+.add("dummy", StringType))
+  val counts = jsonDF
+.join(
+  additionalCorruptRecords.toDF("value"),
+  F.regexp_replace($"_corrupt_record", "(^\\s+|\\s+$)", "") === 
F.trim($"value"),
+  "outer")
+.agg(
+  F.count($"dummy").as("valid"),
+  F.count($"_corrupt_record").as("corrupt"),
+  F.count("*").as("count"))
+  checkAnswer(counts, Row(1, 4, 6))
--- End diff --

why is `count(*)` 6?




[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r101683953
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1802,4 +1806,142 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  assert(new File(path).listFiles().exists(_.getName.endsWith(".gz")))
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.option("compression", "gZiP")
+.json(jsonDir)
+
+  assert(new 
File(jsonDir).listFiles().exists(_.getName.endsWith(".json.gz")))
+
+  val originalData = spark.read.json(primitiveFieldAndType)
+  checkAnswer(jsonDF, originalData)
+  checkAnswer(spark.read.schema(originalData.schema).json(jsonDir), 
originalData)
+}
+  }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write.json(jsonDir)
+
+  val compressedFiles = new File(jsonDir).listFiles()
--- End diff --

it's not compressed, so let's just call it `jsonFiles`
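i.e., a sketch of the rename in place:

```
val jsonFiles = new File(jsonDir).listFiles()
assert(jsonFiles.exists(_.getName.endsWith(".json")))
```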





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r101683631
  
--- Diff: 
core/src/main/scala/org/apache/spark/input/PortableDataStream.scala ---
@@ -193,6 +196,10 @@ class PortableDataStream(
 }
   }
 
+  @Since("1.2.0")
   def getPath(): String = path
+
+  @Since("2.2.0")
+  def getConfiguration: Configuration = conf
--- End diff --

nit: we should rename it to `getConf`; `getConfiguration` is too verbose.
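A sketch of the suggested rename (annotation and return type as in the diff):

```
@Since("2.2.0")
def getConf: Configuration = conf
```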





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r101683916
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1802,4 +1806,142 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  assert(new File(path).listFiles().exists(_.getName.endsWith(".gz")))
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.option("compression", "gZiP")
+.json(jsonDir)
+
+  assert(new 
File(jsonDir).listFiles().exists(_.getName.endsWith(".json.gz")))
+
+  val originalData = spark.read.json(primitiveFieldAndType)
+  checkAnswer(jsonDF, originalData)
+  checkAnswer(spark.read.schema(originalData.schema).json(jsonDir), 
originalData)
+}
+  }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
--- End diff --

same here





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r101684619
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
 ---
@@ -37,29 +32,30 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
 class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
+  override val shortName: String = "json"
 
-  override def shortName(): String = "json"
+  override def isSplitable(
+  sparkSession: SparkSession,
+  options: Map[String, String],
+  path: Path): Boolean = {
+val parsedOptions = new JSONOptions(
+  options,
+  sparkSession.sessionState.conf.sessionLocalTimeZone,
+  sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+val jsonDataSource = JsonDataSource(parsedOptions)
+jsonDataSource.isSplitable && super.isSplitable(sparkSession, options, 
path)
+  }
 
   override def inferSchema(
   sparkSession: SparkSession,
   options: Map[String, String],
   files: Seq[FileStatus]): Option[StructType] = {
-if (files.isEmpty) {
-  None
-} else {
-  val parsedOptions: JSONOptions =
-new JSONOptions(options, 
sparkSession.sessionState.conf.sessionLocalTimeZone)
-  val columnNameOfCorruptRecord =
-parsedOptions.columnNameOfCorruptRecord
-  
.getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
-  val jsonSchema = JsonInferSchema.infer(
-createBaseRdd(sparkSession, files),
-columnNameOfCorruptRecord,
-parsedOptions)
-  checkConstraints(jsonSchema)
-
-  Some(jsonSchema)
-}
+val parsedOptions = new JSONOptions(
+  options,
+  sparkSession.sessionState.conf.sessionLocalTimeZone,
+  sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+JsonDataSource(parsedOptions).infer(
+  sparkSession, files, parsedOptions)
--- End diff --

we can merge it into the previous line
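i.e., roughly (sketch of the suggested formatting; behavior unchanged):

```
JsonDataSource(parsedOptions).infer(sparkSession, files, parsedOptions)
```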





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r101684365
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1802,4 +1806,142 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  assert(new File(path).listFiles().exists(_.getName.endsWith(".gz")))
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.option("compression", "gZiP")
+.json(jsonDir)
+
+  assert(new 
File(jsonDir).listFiles().exists(_.getName.endsWith(".json.gz")))
+
+  val originalData = spark.read.json(primitiveFieldAndType)
+  checkAnswer(jsonDF, originalData)
+  checkAnswer(spark.read.schema(originalData.schema).json(jsonDir), 
originalData)
+}
+  }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write.json(jsonDir)
+
+  val compressedFiles = new File(jsonDir).listFiles()
+  assert(compressedFiles.exists(_.getName.endsWith(".json")))
+
+  val originalData = spark.read.json(primitiveFieldAndType)
+  checkAnswer(jsonDF, originalData)
+  checkAnswer(spark.read.schema(originalData.schema).json(jsonDir), 
originalData)
+}
+  }
+
+  test("SPARK-18352: Expect one JSON document per file") {
+// the json parser terminates as soon as it sees a matching END_OBJECT 
or END_ARRAY token.
+// this might not be the optimal behavior but this test verifies that 
only the first value
+// is parsed and the rest are discarded.
+
+// alternatively the parser could continue parsing following objects, 
which may further reduce
+// allocations by skipping the line reader entirely
+
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  spark
+.createDataFrame(Seq(Tuple1("{}{invalid}")))
+.coalesce(1)
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  // no corrupt record column should be created
+  assert(jsonDF.schema === StructType(Seq()))
+  // only the first object should be read
+  assert(jsonDF.count() === 1)
+}
+  }
+
+  test("SPARK-18352: Handle multi-line corrupt documents (PERMISSIVE)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  val corruptRecordCount = additionalCorruptRecords.count().toInt
+  assert(corruptRecordCount === 5)
+
+  additionalCorruptRecords
--- End diff --

I have a more robust solution:
```
additionalCorruptRecords.collect().zipWithIndex.foreach {
  case (str, index) =>
    Seq(str).toDF("value").write.text(new File(dir, index + ".json").getCanonicalPath)
}
... // dir will have 5 files: 0.json, 1.json, ... 4.json
```
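For context, the test would then read the five single-document files back with the `wholeFile` reader already used in the diff (sketch, assuming `dir` and `corruptRecordCount` from the quoted test):

```
// Each record now lives in its own file, so no repartition/hash trick is needed.
val jsonDF = spark.read
  .option("wholeFile", true)
  .option("mode", "PERMISSIVE")
  .json(dir.getCanonicalPath)
assert(jsonDF.count() === corruptRecordCount)
```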





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r101684508
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+  }
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.option("compression", "gZiP")
+.save(jsonDir)
+
+  new File(jsonDir).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".json.gz")))
+  }
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.save(jsonDir)
+
+  val compressedFiles = new File(jsonDir).listFiles()
+  assert(compressedFiles.exists(_.getName.endsWith(".json")))
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Expect one JSON document per file") {
+// the json parser terminates as soon as it sees a matching END_OBJECT 
or END_ARRAY token.
+// this might not be the optimal behavior but this test verifies that 
only the first value
+// is parsed and the rest are discarded.
+
+// alternatively the parser could continue parsing following objects, 
which may further reduce
+// allocations by skipping the line reader entirely
+
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  spark
+.createDataFrame(Seq(Tuple1("{}{invalid}")))
+.coalesce(1)
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  // no corrupt record column should be created
+  assert(jsonDF.schema === StructType(Seq()))
+  // only the first object should be read
+  assert(jsonDF.count() === 1)
+}
+  }
+
+  test("SPARK-18352: Handle corrupt documents") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  val corruptRecordCount = additionalCorruptRecords.count().toInt
+  assert(corruptRecordCount === 5)
+
+  additionalCorruptRecords
+.toDF("value")
+.repartition(corruptRecordCount * 4)
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  assert(jsonDF.count() === corruptRecordCount)
+  assert(jsonDF.schema === new StructType()
+.add("_corrupt_record", StringType)
+.add("dummy", StringType))
+  val counts = jsonDF
+.join(
+  additionalCorruptRecords.toDF("value"),
+  F.regexp_replace($"_corrupt_record", "(^\\s+|\\s+$)", "") === 
F.trim($"value"),
--- End diff --

why do we have extra line endings in `_corrupt_record`?



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r101683666
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -48,69 +47,110 @@ class JacksonParser(
 
   // A `ValueConverter` is responsible for converting a value from 
`JsonParser`
   // to a value in a field for `InternalRow`.
-  private type ValueConverter = (JsonParser) => Any
+  private type ValueConverter = JsonParser => AnyRef
 
   // `ValueConverter`s for the root schema for all fields in the schema
-  private val rootConverter: ValueConverter = makeRootConverter(schema)
+  private val rootConverter = makeRootConverter(schema)
 
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
   private val emptyRow: Seq[InternalRow] = Seq(new 
GenericInternalRow(schema.length))
 
+  private val corruptFieldIndex = 
schema.getFieldIndex(options.columnNameOfCorruptRecord)
+  corruptFieldIndex.foreach(idx => require(schema(idx).dataType == 
StringType))
+
+  @transient
+  private[this] var isWarningPrinted: Boolean = false
+
   @transient
-  private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+  private def printWarningForMalformedRecord(record: () => UTF8String): 
Unit = {
+def sampleRecord: String = {
+  if (options.wholeFile) {
+""
+  } else {
+s"Sample record: ${record()}\n"
+  }
+}
+
+def footer: String = {
+  s"""Code example to print all malformed records (scala):
+ |===
+ |// The corrupted record exists in column 
${options.columnNameOfCorruptRecord}.
+ |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+ |
+   """.stripMargin
+}
+
+if (options.permissive) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will 
replace
+   |all malformed records with placeholder null in current 
$PERMISSIVE_MODE parser mode.
+   |To find out which corrupted records have been replaced with 
null, please use the
+   |default inferred schema instead of providing a custom schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+} else if (options.dropMalformed) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will drop
+   |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
+   |corrupted records have been dropped, please switch the parser 
mode to $PERMISSIVE_MODE
+   |mode and use the default inferred schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+}
+  }
+
+  @transient
+  private def printWarningIfWholeFile(): Unit = {
+if (options.wholeFile && corruptFieldIndex.isDefined) {
+  logWarning(
+s"""Enabling wholeFile mode and defining columnNameOfCorruptRecord 
may result
+   |in very large allocations or OutOfMemoryExceptions being 
raised.
+   |
--- End diff --

nit: unnecessary line





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r101684686
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
 ---
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.json
+
+import java.io.InputStream
+
+import scala.reflect.ClassTag
+
+import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
+import com.google.common.io.ByteStreams
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, 
TextInputFormat}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
+import org.apache.spark.rdd.{BinaryFileRDD, RDD}
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, 
JacksonParser, JSONOptions}
+import org.apache.spark.sql.execution.datasources.{CodecStreams, 
HadoopFileLinesReader, PartitionedFile}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.Utils
+
+/**
+ * Common functions for parsing JSON files
+ * @tparam T A datatype containing the unparsed JSON, such as [[Text]] or 
[[String]]
+ */
+abstract class JsonDataSource[T] extends Serializable {
+  def isSplitable: Boolean
+
+  /**
+   * Parse a [[PartitionedFile]] into 0 or more [[InternalRow]] instances
+   */
+  def readFile(
+conf: Configuration,
+file: PartitionedFile,
+parser: JacksonParser): Iterator[InternalRow]
+
+  /**
+   * Create an [[RDD]] that handles the preliminary parsing of [[T]] 
records
+   */
+  protected def createBaseRdd(
+sparkSession: SparkSession,
+inputPaths: Seq[FileStatus]): RDD[T]
+
+  /**
+   * A generic wrapper to invoke the correct [[JsonFactory]] method to 
allocate a [[JsonParser]]
+   * for an instance of [[T]]
+   */
+  def createParser(jsonFactory: JsonFactory, value: T): JsonParser
+
+  final def infer(
+  sparkSession: SparkSession,
+  inputPaths: Seq[FileStatus],
+  parsedOptions: JSONOptions): Option[StructType] = {
+if (inputPaths.nonEmpty) {
+  val jsonSchema = JsonInferSchema.infer(
+createBaseRdd(sparkSession, inputPaths),
+parsedOptions,
+createParser)
+  checkConstraints(jsonSchema)
+  Some(jsonSchema)
+} else {
+  None
+}
+  }
+
+  /** Constraints to be imposed on schema to be stored. */
+  private def checkConstraints(schema: StructType): Unit = {
+if (schema.fieldNames.length != schema.fieldNames.distinct.length) {
+  val duplicateColumns = schema.fieldNames.groupBy(identity).collect {
+case (x, ys) if ys.length > 1 => "\"" + x + "\""
+  }.mkString(", ")
+  throw new AnalysisException(s"Duplicate column(s) : 
$duplicateColumns found, " +
+s"cannot save to JSON format")
+}
+  }
+}
+
+object JsonDataSource {
+  def apply(options: JSONOptions): JsonDataSource[_] = {
+if (options.wholeFile) {
+  WholeFileJsonDataSource
+} else {
+  TextInputJsonDataSource
+}
+  }
+
+  /**
+   * Create a new [[RDD]] via the supplied callback if there is at least 
one file to process,
+   * otherwise an [[org.apache.spark.rdd.EmptyRDD]] will be returned.
+   */
+  def createBaseRdd[T : ClassTag](
+  sparkSession: SparkSession,
+  inputPaths: Seq[FileStatus])(
+  fn: (Configuration, String) => RDD[T]): RDD[T] = {
+val paths 

[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r101685374
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1802,4 +1806,142 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  assert(new File(path).listFiles().exists(_.getName.endsWith(".gz")))
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.option("compression", "gZiP")
+.json(jsonDir)
+
+  assert(new 
File(jsonDir).listFiles().exists(_.getName.endsWith(".json.gz")))
+
+  val originalData = spark.read.json(primitiveFieldAndType)
+  checkAnswer(jsonDF, originalData)
+  checkAnswer(spark.read.schema(originalData.schema).json(jsonDir), 
originalData)
+}
+  }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write.json(jsonDir)
+
+  val compressedFiles = new File(jsonDir).listFiles()
+  assert(compressedFiles.exists(_.getName.endsWith(".json")))
+
+  val originalData = spark.read.json(primitiveFieldAndType)
+  checkAnswer(jsonDF, originalData)
+  checkAnswer(spark.read.schema(originalData.schema).json(jsonDir), 
originalData)
+}
+  }
+
+  test("SPARK-18352: Expect one JSON document per file") {
+// the json parser terminates as soon as it sees a matching END_OBJECT 
or END_ARRAY token.
+// this might not be the optimal behavior but this test verifies that 
only the first value
+// is parsed and the rest are discarded.
+
+// alternatively the parser could continue parsing following objects, 
which may further reduce
+// allocations by skipping the line reader entirely
+
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  spark
+.createDataFrame(Seq(Tuple1("{}{invalid}")))
+.coalesce(1)
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  // no corrupt record column should be created
+  assert(jsonDF.schema === StructType(Seq()))
+  // only the first object should be read
+  assert(jsonDF.count() === 1)
+}
+  }
+
+  test("SPARK-18352: Handle multi-line corrupt documents (PERMISSIVE)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  val corruptRecordCount = additionalCorruptRecords.count().toInt
--- End diff --

The name is misleading; we do have a good record in this dataset, don't we?





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-16 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r101671453
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,117 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  assert(new File(path).listFiles().exists(_.getName.endsWith(".gz")))
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
--- End diff --

I rewrote these tests. Please take a look @gatorsmile and @cloud-fan.





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-15 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r101460558
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1802,4 +1806,118 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  assert(new File(path).listFiles().exists(_.getName.endsWith(".gz")))
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.option("compression", "gZiP")
+.save(jsonDir)
+
+  assert(new 
File(jsonDir).listFiles().exists(_.getName.endsWith(".json.gz")))
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
--- End diff --

Actually, it only covers three columns.
```
root
 |-- bigInteger: decimal(20,0) (nullable = true)
 |-- boolean: boolean (nullable = true)
 |-- double: double (nullable = true)
 |-- integer: long (nullable = true)
 |-- long: long (nullable = true)
 |-- null: string (nullable = true)
 |-- string: string (nullable = true)

root
 |-- bigInteger: decimal(20,0) (nullable = true)
 |-- boolean: boolean (nullable = true)
 |-- double: double (nullable = true)
 |-- integer: long (nullable = true)
 |-- long: long (nullable = true)
 |-- string: string (nullable = true)
```
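One way to cover every column is the comparison the rewritten tests in this thread ended up using (sketch, assuming `jsonDF` and `jsonDir` from the quoted test):

```
// Compare the round-tripped data against the records read directly,
// so all columns (bigInteger, boolean, double, integer, long, null, string) are checked.
val originalData = spark.read.json(primitiveFieldAndType)
checkAnswer(jsonDF, originalData)
checkAnswer(spark.read.schema(originalData.schema).json(jsonDir), originalData)
```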





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-15 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r101460391
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,117 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  assert(new File(path).listFiles().exists(_.getName.endsWith(".gz")))
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
--- End diff --

I have the same concern. We need to check that the data read back matches what is expected.





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r101453945
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala 
---
@@ -151,6 +153,8 @@ final class DataStreamReader private[sql](sparkSession: 
SparkSession) extends Lo
* 
* `maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
* considered in every trigger.
+   * `wholeFile` (default `false`): parse one record, which may span multiple lines,
--- End diff --

same here





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r101453441
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -332,20 +336,21 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
* @since 1.4.0
*/
   def json(jsonRDD: RDD[String]): DataFrame = {
-val parsedOptions: JSONOptions =
-  new JSONOptions(extraOptions.toMap, 
sparkSession.sessionState.conf.sessionLocalTimeZone)
-val columnNameOfCorruptRecord =
-  parsedOptions.columnNameOfCorruptRecord
-
.getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+val parsedOptions = new JSONOptions(extraOptions.toMap,
--- End diff --

nit: the style should be
```
new XXX(
  para1,
  para2,
  para3)
```
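Applied to the call under review, that style would look roughly like this (sketch; the three-argument `JSONOptions` constructor matches the one used elsewhere in this PR):

```
val parsedOptions = new JSONOptions(
  extraOptions.toMap,
  sparkSession.sessionState.conf.sessionLocalTimeZone,
  sparkSession.sessionState.conf.columnNameOfCorruptRecord)
```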





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r101453350
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -261,14 +261,18 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
   }
 
   /**
-   * Loads a JSON file (<a href="http://jsonlines.org/">JSON Lines text format or
-   * newline-delimited JSON</a>) and returns the result as a `DataFrame`.
+   * Loads a JSON file and returns the results as a `DataFrame`.
+   *
+   * Both JSON (one record per file) and <a href="http://jsonlines.org/">JSON Lines</a>
+   * (newline-delimited JSON) are supported and can be selected with the `wholeFile` option.
*
* This function goes through the input once to determine the input schema. If you know the
* schema in advance, use the version that specifies the schema to avoid the extra scan.
*
* You can set the following JSON-specific options to deal with non-standard JSON files:
* 
+   * `wholeFile` (default `false`): parse one record, which may span multiple lines,
--- End diff --

please move it to the end





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r101451711
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -442,6 +444,8 @@ def json(self, path, schema=None, 
primitivesAsString=None, prefersDecimal=None,
 :param path: string represents path to the JSON dataset,
  or RDD of Strings storing JSON objects.
 :param schema: an optional :class:`pyspark.sql.types.StructType` 
for the input schema.
+:param wholeFile: parse one record, which may span multiple lines, 
per file. If None is
--- End diff --

same here





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r101451677
  
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -159,18 +159,21 @@ def json(self, path, schema=None, 
primitivesAsString=None, prefersDecimal=None,
  allowComments=None, allowUnquotedFieldNames=None, 
allowSingleQuotes=None,
  allowNumericLeadingZero=None, 
allowBackslashEscapingAnyCharacter=None,
  mode=None, columnNameOfCorruptRecord=None, dateFormat=None, 
timestampFormat=None,
- timeZone=None):
+ timeZone=None, wholeFile=None):
 """
-Loads a JSON file (`JSON Lines text format or newline-delimited JSON
-<http://jsonlines.org/>`_) or an RDD of Strings storing JSON objects (one object per
-record) and returns the result as a :class`DataFrame`.
+Loads a JSON file and returns the results as a :class:`DataFrame`.
+
+Both JSON (one record per file) and `JSON Lines <http://jsonlines.org/>`_
+(newline-delimited JSON) are supported and can be selected with the `wholeFile` parameter.
 
 If the ``schema`` parameter is not specified, this function goes
 through the input once to determine the input schema.
 
 :param path: string represents path to the JSON dataset,
  or RDD of Strings storing JSON objects.
 :param schema: an optional :class:`pyspark.sql.types.StructType` 
for the input schema.
+:param wholeFile: parse one record, which may span multiple lines, 
per file. If None is
--- End diff --

the parameter docs follow the same order as the parameter list, so let's move the `wholeFile` doc to the end





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-12 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100687458
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -48,69 +47,110 @@ class JacksonParser(
 
   // A `ValueConverter` is responsible for converting a value from 
`JsonParser`
   // to a value in a field for `InternalRow`.
-  private type ValueConverter = (JsonParser) => Any
+  private type ValueConverter = JsonParser => AnyRef
 
   // `ValueConverter`s for the root schema for all fields in the schema
-  private val rootConverter: ValueConverter = makeRootConverter(schema)
+  private val rootConverter = makeRootConverter(schema)
 
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
   private val emptyRow: Seq[InternalRow] = Seq(new 
GenericInternalRow(schema.length))
 
+  private val corruptFieldIndex = 
schema.getFieldIndex(options.columnNameOfCorruptRecord)
+  corruptFieldIndex.foreach(idx => require(schema(idx).dataType == 
StringType))
+
+  @transient
+  private[this] var isWarningPrinted: Boolean = false
+
   @transient
-  private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+  private def printWarningForMalformedRecord(record: () => UTF8String): 
Unit = {
+def sampleRecord: String = {
+  if (options.wholeFile) {
+""
+  } else {
+s"Sample record: ${record()}\n"
+  }
+}
+
+def footer: String = {
+  s"""Code example to print all malformed records (scala):
+ |===
+ |// The corrupted record exists in column 
${options.columnNameOfCorruptRecord}.
+ |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+ |
+   """.stripMargin
+}
+
+if (options.permissive) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will 
replace
+   |all malformed records with placeholder null in current 
$PERMISSIVE_MODE parser mode.
+   |To find out which corrupted records have been replaced with 
null, please use the
+   |default inferred schema instead of providing a custom schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+} else if (options.dropMalformed) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will drop
+   |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
+   |corrupted records have been dropped, please switch the parser 
mode to $PERMISSIVE_MODE
+   |mode and use the default inferred schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+}
+  }
+
+  @transient
+  private def printWarningIfWholeFile(): Unit = {
+if (options.wholeFile && corruptFieldIndex.isDefined) {
+  logWarning(
+s"""Enabling wholeFile mode and defining columnNameOfCorruptRecord 
may result
+   |in very large allocations or OutOfMemoryExceptions being 
raised.
+   |
+ """.stripMargin)
+}
+  }
 
   /**
* This function deals with the cases it fails to parse. This function 
will be called
* when exceptions are caught during converting. This functions also 
deals with `mode` option.
*/
-  private def failedRecord(record: String): Seq[InternalRow] = {
-// create a row even if no corrupt record column is present
-if (options.failFast) {
-  throw new SparkSQLJsonProcessingException(s"Malformed line in 
FAILFAST mode: $record")
-}
-if (options.dropMalformed) {
-  if (!isWarningPrintedForMalformedRecord) {
-logWarning(
-  s"""Found at least one malformed records (sample: $record). The 
JSON reader will drop
- |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
- |corrupted records have been dropped, please switch the 
parser mode to $PERMISSIVE_MODE
- |mode and use the default inferred schema.
- |
- |Code example to print all malformed records (scala):
- |===
- |// The corrupted record exists in column 
${columnNameOfCorruptRecord}
- |val parsedJson = 
spark.read.json("/path/to/json/file/test.json")
- |
-   """.stripMargin)
-isWarningPrintedForMalformedRecord = true
-  }
-  Nil
-} else if 

[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100679183
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,117 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  assert(new File(path).listFiles().exists(_.getName.endsWith(".gz")))
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
--- End diff --

I mean we should check the answer with `jsonDF` directly, instead of writing 
`jsonDF` out, reading it back as `jsonCopy`, and testing that result.

The problem is that if `jsonDF` is already wrong, it's meaningless to write it 
out and read it back; the round trip could still pass the test.
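
A minimal sketch of the suggested check, assuming the suite's `primitiveFieldAndType` 
data, the `path` written above, and the `checkAnswer` helper are in scope:

```scala
// Compare the wholeFile result against the line-delimited baseline directly,
// rather than round-tripping it through a second write and read.
val jsonDF = spark.read.option("wholeFile", true).json(path)
val originalData = spark.read.json(primitiveFieldAndType)
checkAnswer(jsonDF, originalData)
```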





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100679101
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -394,36 +447,32 @@ class JacksonParser(
   }
 
   /**
-   * Parse the string JSON input to the set of [[InternalRow]]s.
+   * Parse the JSON input to the set of [[InternalRow]]s.
+   *
+   * @param recordLiteral an optional function that will be used to 
generate
+   *   the corrupt record text instead of record.toString
*/
-  def parse(input: String): Seq[InternalRow] = {
-if (input.trim.isEmpty) {
-  Nil
-} else {
-  try {
-Utils.tryWithResource(factory.createParser(input)) { parser =>
-  parser.nextToken()
-  rootConverter.apply(parser) match {
-case null => failedRecord(input)
-case row: InternalRow => row :: Nil
-case array: ArrayData =>
-  // Here, as we support reading top level JSON arrays and 
take every element
-  // in such an array as a row, this case is possible.
-  if (array.numElements() == 0) {
-Nil
-  } else {
-array.toArray[InternalRow](schema)
-  }
-case _ =>
-  failedRecord(input)
+  def parse[T](
+  record: T,
+  createParser: (JsonFactory, T) => JsonParser,
+  recordLiteral: PartialFunction[T, UTF8String] = 
PartialFunction.empty): Seq[InternalRow] = {
--- End diff --

looks like what we really need is a function to turn `record: T` into 
`UTF8String`, how about
```
def parse(record: T, ..., toString: T => UTF8String = r => UTF8String.fromString(r.toString)) = {
  ...
  failedRecord(toString(record))
}
```
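
For illustration, a self-contained rendering of that signature shape; the method name 
`describe` and the parameter name `toUTF8` are made up here, and only the 
default-argument pattern mirrors the suggestion above:

```scala
import org.apache.spark.unsafe.types.UTF8String

// A generic method whose converter parameter defaults to a toString-based
// conversion; callers that already hold raw bytes can pass a cheaper one.
def describe[T](
    record: T,
    toUTF8: T => UTF8String = (r: T) => UTF8String.fromString(r.toString)): UTF8String = {
  toUTF8(record)
}
```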





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100678935
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -48,69 +47,110 @@ class JacksonParser(
 
   // A `ValueConverter` is responsible for converting a value from 
`JsonParser`
   // to a value in a field for `InternalRow`.
-  private type ValueConverter = (JsonParser) => Any
+  private type ValueConverter = JsonParser => AnyRef
 
   // `ValueConverter`s for the root schema for all fields in the schema
-  private val rootConverter: ValueConverter = makeRootConverter(schema)
+  private val rootConverter = makeRootConverter(schema)
 
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
   private val emptyRow: Seq[InternalRow] = Seq(new 
GenericInternalRow(schema.length))
 
+  private val corruptFieldIndex = 
schema.getFieldIndex(options.columnNameOfCorruptRecord)
+  corruptFieldIndex.foreach(idx => require(schema(idx).dataType == 
StringType))
+
+  @transient
+  private[this] var isWarningPrinted: Boolean = false
+
   @transient
-  private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+  private def printWarningForMalformedRecord(record: () => UTF8String): 
Unit = {
+def sampleRecord: String = {
+  if (options.wholeFile) {
+""
+  } else {
+s"Sample record: ${record()}\n"
+  }
+}
+
+def footer: String = {
+  s"""Code example to print all malformed records (scala):
+ |===
+ |// The corrupted record exists in column 
${options.columnNameOfCorruptRecord}.
+ |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+ |
+   """.stripMargin
+}
+
+if (options.permissive) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will 
replace
+   |all malformed records with placeholder null in current 
$PERMISSIVE_MODE parser mode.
+   |To find out which corrupted records have been replaced with 
null, please use the
+   |default inferred schema instead of providing a custom schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+} else if (options.dropMalformed) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will drop
+   |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
+   |corrupted records have been dropped, please switch the parser 
mode to $PERMISSIVE_MODE
+   |mode and use the default inferred schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+}
+  }
+
+  @transient
+  private def printWarningIfWholeFile(): Unit = {
+if (options.wholeFile && corruptFieldIndex.isDefined) {
+  logWarning(
+s"""Enabling wholeFile mode and defining columnNameOfCorruptRecord 
may result
+   |in very large allocations or OutOfMemoryExceptions being 
raised.
+   |
+ """.stripMargin)
+}
+  }
 
   /**
* This function deals with the cases it fails to parse. This function 
will be called
* when exceptions are caught during converting. This functions also 
deals with `mode` option.
*/
-  private def failedRecord(record: String): Seq[InternalRow] = {
-// create a row even if no corrupt record column is present
-if (options.failFast) {
-  throw new SparkSQLJsonProcessingException(s"Malformed line in 
FAILFAST mode: $record")
-}
-if (options.dropMalformed) {
-  if (!isWarningPrintedForMalformedRecord) {
-logWarning(
-  s"""Found at least one malformed records (sample: $record). The 
JSON reader will drop
- |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
- |corrupted records have been dropped, please switch the 
parser mode to $PERMISSIVE_MODE
- |mode and use the default inferred schema.
- |
- |Code example to print all malformed records (scala):
- |===
- |// The corrupted record exists in column 
${columnNameOfCorruptRecord}
- |val parsedJson = 
spark.read.json("/path/to/json/file/test.json")
- |
-   """.stripMargin)
-isWarningPrintedForMalformedRecord = true
-  }
-  Nil
-} else if 

[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100653879
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+  }
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.option("compression", "gZiP")
+.save(jsonDir)
+
+  new File(jsonDir).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".json.gz")))
+  }
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.save(jsonDir)
+
+  val compressedFiles = new File(jsonDir).listFiles()
+  assert(compressedFiles.exists(_.getName.endsWith(".json")))
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Expect one JSON document per file") {
+// the json parser terminates as soon as it sees a matching END_OBJECT 
or END_ARRAY token.
+// this might not be the optimal behavior but this test verifies that 
only the first value
+// is parsed and the rest are discarded.
+
+// alternatively the parser could continue parsing following objects, 
which may further reduce
+// allocations by skipping the line reader entirely
+
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  spark
+.createDataFrame(Seq(Tuple1("{}{invalid}")))
+.coalesce(1)
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  // no corrupt record column should be created
+  assert(jsonDF.schema === StructType(Seq()))
+  // only the first object should be read
+  assert(jsonDF.count() === 1)
+}
+  }
+
+  test("SPARK-18352: Handle corrupt documents") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  val corruptRecordCount = additionalCorruptRecords.count().toInt
+  assert(corruptRecordCount === 5)
+
+  additionalCorruptRecords
+.toDF("value")
+.repartition(corruptRecordCount * 4)
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
--- End diff --

Probably, but I'm out of time for today. I'll be out for a few days and can 
pick this back up on Thursday or Friday next week.



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100653835
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+  }
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.option("compression", "gZiP")
+.save(jsonDir)
+
+  new File(jsonDir).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".json.gz")))
+  }
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.save(jsonDir)
+
+  val compressedFiles = new File(jsonDir).listFiles()
+  assert(compressedFiles.exists(_.getName.endsWith(".json")))
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Expect one JSON document per file") {
+// the json parser terminates as soon as it sees a matching END_OBJECT 
or END_ARRAY token.
+// this might not be the optimal behavior but this test verifies that 
only the first value
+// is parsed and the rest are discarded.
+
+// alternatively the parser could continue parsing following objects, 
which may further reduce
+// allocations by skipping the line reader entirely
+
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  spark
+.createDataFrame(Seq(Tuple1("{}{invalid}")))
+.coalesce(1)
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  // no corrupt record column should be created
+  assert(jsonDF.schema === StructType(Seq()))
+  // only the first object should be read
+  assert(jsonDF.count() === 1)
+}
+  }
+
+  test("SPARK-18352: Handle corrupt documents") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  val corruptRecordCount = additionalCorruptRecords.count().toInt
+  assert(corruptRecordCount === 5)
+
+  additionalCorruptRecords
+.toDF("value")
+.repartition(corruptRecordCount * 4)
--- End diff --

Probably, but I'm out of time for today. I'll be out for a few days and can 
pick this back up on Thursday or Friday next week.





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100653757
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+  }
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.option("compression", "gZiP")
+.save(jsonDir)
+
+  new File(jsonDir).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".json.gz")))
+  }
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.save(jsonDir)
+
+  val compressedFiles = new File(jsonDir).listFiles()
+  assert(compressedFiles.exists(_.getName.endsWith(".json")))
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Expect one JSON document per file") {
+// the json parser terminates as soon as it sees a matching END_OBJECT 
or END_ARRAY token.
+// this might not be the optimal behavior but this test verifies that 
only the first value
+// is parsed and the rest are discarded.
+
+// alternatively the parser could continue parsing following objects, 
which may further reduce
+// allocations by skipping the line reader entirely
+
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  spark
+.createDataFrame(Seq(Tuple1("{}{invalid}")))
+.coalesce(1)
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  // no corrupt record column should be created
+  assert(jsonDF.schema === StructType(Seq()))
+  // only the first object should be read
+  assert(jsonDF.count() === 1)
+}
+  }
+
+  test("SPARK-18352: Handle corrupt documents") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  val corruptRecordCount = additionalCorruptRecords.count().toInt
+  assert(corruptRecordCount === 5)
+
+  additionalCorruptRecords
+.toDF("value")
+.repartition(corruptRecordCount * 4)
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  assert(jsonDF.count() === corruptRecordCount)
+  assert(jsonDF.schema === new StructType()
+.add("_corrupt_record", StringType)
+.add("dummy", StringType))
+  val counts = jsonDF
+.join(
+  additionalCorruptRecords.toDF("value"),
+  F.regexp_replace($"_corrupt_record", "(^\\s+|\\s+$)", "") === 
F.trim($"value"),
--- End diff --

This is the same as `F.trim`, but it works on all whitespace characters, not 
just 0x20 (ASCII space)... if `trim` removed line endings and not just spaces, 
it would have worked instead.
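
A toy illustration of the difference; the record with the trailing newline is made 
up here and is not the PR's test data:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{regexp_replace, trim}

val spark = SparkSession.builder().master("local[*]").appName("trim-demo").getOrCreate()
import spark.implicits._

val df = Seq(" {\"dummy\": 1}\n").toDF("value")
df.select(
  trim($"value").as("trimmed"),                                 // only 0x20 spaces are stripped, the "\n" survives
  regexp_replace($"value", "(^\\s+|\\s+$)", "").as("stripped")  // all leading/trailing whitespace is removed
).show(false)
```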



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100653580
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+  }
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.option("compression", "gZiP")
+.save(jsonDir)
+
+  new File(jsonDir).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".json.gz")))
+  }
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.save(jsonDir)
+
+  val compressedFiles = new File(jsonDir).listFiles()
+  assert(compressedFiles.exists(_.getName.endsWith(".json")))
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Expect one JSON document per file") {
+// the json parser terminates as soon as it sees a matching END_OBJECT 
or END_ARRAY token.
+// this might not be the optimal behavior but this test verifies that 
only the first value
+// is parsed and the rest are discarded.
+
+// alternatively the parser could continue parsing following objects, 
which may further reduce
+// allocations by skipping the line reader entirely
+
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  spark
+.createDataFrame(Seq(Tuple1("{}{invalid}")))
+.coalesce(1)
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  // no corrupt record column should be created
+  assert(jsonDF.schema === StructType(Seq()))
+  // only the first object should be read
+  assert(jsonDF.count() === 1)
+}
+  }
+
+  test("SPARK-18352: Handle corrupt documents") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  val corruptRecordCount = additionalCorruptRecords.count().toInt
+  assert(corruptRecordCount === 5)
+
+  additionalCorruptRecords
+.toDF("value")
+.repartition(corruptRecordCount * 4)
--- End diff --

Yep, I'll fix.





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100652445
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
 ---
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.json
+
+import java.io.InputStream
+
+import scala.reflect.ClassTag
+
+import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
+import com.google.common.io.ByteStreams
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, 
TextInputFormat}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
+import org.apache.spark.rdd.{BinaryFileRDD, RDD}
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, 
JacksonParser, JSONOptions}
+import org.apache.spark.sql.execution.datasources.{CodecStreams, 
HadoopFileLinesReader, PartitionedFile}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.Utils
+
+/**
+ * Common functions for parsing JSON files
+ * @tparam T A datatype containing the unparsed JSON, such as [[Text]] or 
[[String]]
+ */
+abstract class JsonDataSource[T] extends Serializable {
+  def isSplitable: Boolean
+
+  /**
+   * Parse a [[PartitionedFile]] into 0 or more [[InternalRow]] instances
+   */
+  def readFile(
+conf: Configuration,
+file: PartitionedFile,
+parser: JacksonParser): Iterator[InternalRow]
+
+  /**
+   * Create an [[RDD]] that handles the preliminary parsing of [[T]] 
records
+   */
+  protected def createBaseRdd(
+sparkSession: SparkSession,
+inputPaths: Seq[FileStatus]): RDD[T]
+
+  /**
+   * A generic wrapper to invoke the correct [[JsonFactory]] method to 
allocate a [[JsonParser]]
+   * for an instance of [[T]]
+   */
+  def createParser(jsonFactory: JsonFactory, value: T): JsonParser
+
+  final def infer(
+  sparkSession: SparkSession,
+  inputPaths: Seq[FileStatus],
+  parsedOptions: JSONOptions): Option[StructType] = {
+if (inputPaths.nonEmpty) {
+  val jsonSchema = JsonInferSchema.infer(
+createBaseRdd(sparkSession, inputPaths),
+parsedOptions,
+createParser)
+  checkConstraints(jsonSchema)
+  Some(jsonSchema)
+} else {
+  None
+}
+  }
+
+  /** Constraints to be imposed on schema to be stored. */
+  private def checkConstraints(schema: StructType): Unit = {
--- End diff --

IIRC this was a check added because some of the backends (maybe Parquet?) 
were writing corrupt files... if this is checked globally now, it should be fine 
to remove it.





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100652259
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
--- End diff --

Right, it's a single value that spans multiple lines. The Python test is 
reusing some Python-specific test data.





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100652192
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+  }
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.option("compression", "gZiP")
+.save(jsonDir)
+
+  new File(jsonDir).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".json.gz")))
+  }
+
+  val jsonCopy = spark.read
--- End diff --

I'm not sure what you mean?





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100651990
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+  }
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.option("compression", "gZiP")
--- End diff --

Right





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100651910
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
--- End diff --

It was there for a reason that is no longer relevant; I'll switch this.





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100651706
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
 ---
@@ -79,7 +80,7 @@ private[sql] object JsonInferSchema {
 
   private[this] val structFieldComparator = new Comparator[StructField] {
 override def compare(o1: StructField, o2: StructField): Int = {
-  o1.name.compare(o2.name)
+  o1.name.compareTo(o2.name)
--- End diff --

`.compare` is a very expensive way of comparing two strings.

`compare`:
```
  public int compare(org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField);
Code:
   0: new   #14 // class scala/collection/immutable/StringOps
   3: dup
   4: getstatic #20 // Field scala/Predef$.MODULE$:Lscala/Predef$;
   7: aload_1
   8: invokevirtual #26 // Method org/apache/spark/sql/types/StructField.name:()Ljava/lang/String;
  11: invokevirtual #30 // Method scala/Predef$.augmentString:(Ljava/lang/String;)Ljava/lang/String;
  14: invokespecial #34 // Method scala/collection/immutable/StringOps."<init>":(Ljava/lang/String;)V
  17: aload_2
  18: invokevirtual #26 // Method org/apache/spark/sql/types/StructField.name:()Ljava/lang/String;
  21: invokevirtual #37 // Method scala/collection/immutable/StringOps.compare:(Ljava/lang/String;)I
  24: ireturn
```

`compareTo`:
```
  public int compare(org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField);
Code:
   0: aload_1
   1: invokevirtual #18 // Method org/apache/spark/sql/types/StructField.name:()Ljava/lang/String;
   4: aload_2
   5: invokevirtual #18 // Method org/apache/spark/sql/types/StructField.name:()Ljava/lang/String;
   8: invokevirtual #24 // Method java/lang/String.compareTo:(Ljava/lang/String;)I
  11: ireturn
```
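
Put differently, the change keeps the comparator on plain `java.lang.String`; a 
standalone sketch of the same comparator outside `JsonInferSchema`:

```scala
import java.util.Comparator
import org.apache.spark.sql.types.StructField

// Compare fields by name via java.lang.String.compareTo, avoiding the
// implicit augmentString/StringOps allocation that `.compare` goes through.
val structFieldComparator: Comparator[StructField] = new Comparator[StructField] {
  override def compare(o1: StructField, o2: StructField): Int = o1.name.compareTo(o2.name)
}
```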





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100651385
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+  }
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.option("compression", "gZiP")
+.save(jsonDir)
+
+  new File(jsonDir).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".json.gz")))
+  }
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.save(jsonDir)
+
+  val compressedFiles = new File(jsonDir).listFiles()
+  assert(compressedFiles.exists(_.getName.endsWith(".json")))
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Expect one JSON document per file") {
+// the json parser terminates as soon as it sees a matching END_OBJECT 
or END_ARRAY token.
+// this might not be the optimal behavior but this test verifies that 
only the first value
+// is parsed and the rest are discarded.
+
+// alternatively the parser could continue parsing following objects, 
which may further reduce
+// allocations by skipping the line reader entirely
+
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  spark
+.createDataFrame(Seq(Tuple1("{}{invalid}")))
+.coalesce(1)
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  // no corrupt record column should be created
+  assert(jsonDF.schema === StructType(Seq()))
+  // only the first object should be read
+  assert(jsonDF.count() === 1)
+}
+  }
+
+  test("SPARK-18352: Handle corrupt documents") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  val corruptRecordCount = additionalCorruptRecords.count().toInt
+  assert(corruptRecordCount === 5)
+
+  additionalCorruptRecords
+.toDF("value")
+.repartition(corruptRecordCount * 4)
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  assert(jsonDF.count() === corruptRecordCount)
+  assert(jsonDF.schema === new StructType()
+.add("_corrupt_record", StringType)
+.add("dummy", StringType))
+  val counts = jsonDF
+.join(
+  additionalCorruptRecords.toDF("value"),
+  F.regexp_replace($"_corrupt_record", "(^\\s+|\\s+$)", "") === 
F.trim($"value"),
--- End diff --

why do we need to remove all the whitespace in `_corrupt_record`?



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100651178
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+  }
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.option("compression", "gZiP")
+.save(jsonDir)
+
+  new File(jsonDir).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".json.gz")))
+  }
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.save(jsonDir)
+
+  val compressedFiles = new File(jsonDir).listFiles()
+  assert(compressedFiles.exists(_.getName.endsWith(".json")))
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Expect one JSON document per file") {
+// the json parser terminates as soon as it sees a matching END_OBJECT 
or END_ARRAY token.
+// this might not be the optimal behavior but this test verifies that 
only the first value
+// is parsed and the rest are discarded.
+
+// alternatively the parser could continue parsing following objects, 
which may further reduce
+// allocations by skipping the line reader entirely
+
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  spark
+.createDataFrame(Seq(Tuple1("{}{invalid}")))
+.coalesce(1)
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  // no corrupt record column should be created
+  assert(jsonDF.schema === StructType(Seq()))
+  // only the first object should be read
+  assert(jsonDF.count() === 1)
+}
+  }
+
+  test("SPARK-18352: Handle corrupt documents") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  val corruptRecordCount = additionalCorruptRecords.count().toInt
+  assert(corruptRecordCount === 5)
+
+  additionalCorruptRecords
+.toDF("value")
+.repartition(corruptRecordCount * 4)
--- End diff --

do you want the 5 rows to be distributed across 5 files? how about 
`repartitionBy("value")`?





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100651103
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+  }
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.option("compression", "gZiP")
+.save(jsonDir)
+
+  new File(jsonDir).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".json.gz")))
+  }
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.save(jsonDir)
+
+  val compressedFiles = new File(jsonDir).listFiles()
+  assert(compressedFiles.exists(_.getName.endsWith(".json")))
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Expect one JSON document per file") {
+// the json parser terminates as soon as it sees a matching END_OBJECT 
or END_ARRAY token.
+// this might not be the optimal behavior but this test verifies that 
only the first value
+// is parsed and the rest are discarded.
+
+// alternatively the parser could continue parsing following objects, 
which may further reduce
+// allocations by skipping the line reader entirely
+
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  spark
+.createDataFrame(Seq(Tuple1("{}{invalid}")))
+.coalesce(1)
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  // no corrupt record column should be created
+  assert(jsonDF.schema === StructType(Seq()))
+  // only the first object should be read
+  assert(jsonDF.count() === 1)
+}
+  }
+
+  test("SPARK-18352: Handle corrupt documents") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  val corruptRecordCount = additionalCorruptRecords.count().toInt
+  assert(corruptRecordCount === 5)
+
+  additionalCorruptRecords
+.toDF("value")
+.repartition(corruptRecordCount * 4)
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
--- End diff --

shall we test all the parse modes?
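
For example, coverage could look roughly like this, assuming `path` holds the corrupt 
documents written above and that ScalaTest's `intercept` is available from the suite:

```scala
import org.apache.spark.SparkException

Seq("PERMISSIVE", "DROPMALFORMED").foreach { mode =>
  val df = spark.read.option("wholeFile", true).option("mode", mode).json(path)
  df.collect()  // PERMISSIVE keeps corrupt rows in _corrupt_record, DROPMALFORMED drops them
}
intercept[SparkException] {
  // FAILFAST is expected to abort once a corrupt document is hit
  spark.read.option("wholeFile", true).option("mode", "FAILFAST").json(path).collect()
}
```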





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100651055
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+  }
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.option("compression", "gZiP")
+.save(jsonDir)
+
+  new File(jsonDir).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".json.gz")))
+  }
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.save(jsonDir)
+
+  val compressedFiles = new File(jsonDir).listFiles()
+  assert(compressedFiles.exists(_.getName.endsWith(".json")))
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Expect one JSON document per file") {
+// the json parser terminates as soon as it sees a matching END_OBJECT 
or END_ARRAY token.
+// this might not be the optimal behavior but this test verifies that 
only the first value
+// is parsed and the rest are discarded.
+
+// alternatively the parser could continue parsing following objects, 
which may further reduce
+// allocations by skipping the line reader entirely
+
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  spark
+.createDataFrame(Seq(Tuple1("{}{invalid}")))
+.coalesce(1)
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
--- End diff --

shall we test all the parse modes?





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100650756
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+  }
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.option("compression", "gZiP")
+.save(jsonDir)
+
+  new File(jsonDir).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".json.gz")))
+  }
+
+  val jsonCopy = spark.read
--- End diff --

I'd like to write the whole-file json, then read it and check the answer, 
instead of writing and reading it again, which is too complicated. 





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100650577
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+  }
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.option("compression", "gZiP")
--- End diff --

we don't support `wholeFile` on the write side, right?





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100650500
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
--- End diff --

I checked `primitiveFieldAndType`: it's not a json array but a value wrapped with `{ }`. This is different from the [python test](https://github.com/apache/spark/pull/16386/files#diff-e8e190e27ba3aee32a59b787696b34c6R1). So if the whole file is not a json array, this file will only produce a single row, right?
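For reference, a small illustration of that distinction (the file paths and contents below are made up, not from the PR):

```scala
// /tmp/object.json contains: {"a": 1, "b": "str"}   -> one row when read with wholeFile
// /tmp/array.json  contains: [{"a": 1}, {"a": 2}]   -> one row per top-level array element
val oneRow   = spark.read.option("wholeFile", true).json("/tmp/object.json")
val manyRows = spark.read.option("wholeFile", true).json("/tmp/array.json")
assert(oneRow.count() == 1)
assert(manyRows.count() == 2)
```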





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100650450
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -394,36 +447,32 @@ class JacksonParser(
   }
 
   /**
-   * Parse the string JSON input to the set of [[InternalRow]]s.
+   * Parse the JSON input to the set of [[InternalRow]]s.
+   *
+   * @param recordLiteral an optional function that will be used to 
generate
+   *   the corrupt record text instead of record.toString
*/
-  def parse(input: String): Seq[InternalRow] = {
-if (input.trim.isEmpty) {
-  Nil
-} else {
-  try {
-Utils.tryWithResource(factory.createParser(input)) { parser =>
-  parser.nextToken()
-  rootConverter.apply(parser) match {
-case null => failedRecord(input)
-case row: InternalRow => row :: Nil
-case array: ArrayData =>
-  // Here, as we support reading top level JSON arrays and 
take every element
-  // in such an array as a row, this case is possible.
-  if (array.numElements() == 0) {
-Nil
-  } else {
-array.toArray[InternalRow](schema)
-  }
-case _ =>
-  failedRecord(input)
+  def parse[T](
+  record: T,
+  createParser: (JsonFactory, T) => JsonParser,
+  recordLiteral: PartialFunction[T, UTF8String] = 
PartialFunction.empty): Seq[InternalRow] = {
--- End diff --

Passing a function in instead of a closure saves an allocation that will be 
held for the duration of parsing, and is likely to be promoted to a later GC 
generation.

If we went the closure route the function signature should be this:

```scala
def parse(
  createParser: JsonFactory => JsonParser,
  recordLiteral: => UTF8String): Seq[InternalRow]
```
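A self-contained illustration of the allocation trade-off described above, with simplified stand-in types (nothing here is Spark code):

```scala
object ClosureVsFunction {
  final case class Parser(source: String)

  // Variant A: a (factory, record) function is passed in, so one shared function
  // value can be reused for every record and nothing is captured per record.
  def parseA[T](record: T, createParser: (String, T) => Parser): Parser =
    createParser("factory", record)

  // Variant B: a closure is passed in; every call site that captures `record`
  // allocates a fresh closure object that lives for the duration of the call.
  def parseB(createParser: String => Parser): Parser =
    createParser("factory")

  def main(args: Array[String]): Unit = {
    val mk: (String, String) => Parser = (f, r) => Parser(s"$f:$r")
    Seq("r1", "r2", "r3").foreach { r =>
      parseA(r, mk)                    // reuses the shared `mk` function value
      parseB(f => Parser(s"$f:$r"))    // allocates a new closure capturing `r`
    }
  }
}
```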





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100650024
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
--- End diff --

why add this pattern match? isn't it the same as `assert(new File(path).listFiles().exists(_.getName.endsWith(".gz")))`?





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100649836
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -48,69 +47,110 @@ class JacksonParser(
 
   // A `ValueConverter` is responsible for converting a value from 
`JsonParser`
   // to a value in a field for `InternalRow`.
-  private type ValueConverter = (JsonParser) => Any
+  private type ValueConverter = JsonParser => AnyRef
 
   // `ValueConverter`s for the root schema for all fields in the schema
-  private val rootConverter: ValueConverter = makeRootConverter(schema)
+  private val rootConverter = makeRootConverter(schema)
 
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
   private val emptyRow: Seq[InternalRow] = Seq(new 
GenericInternalRow(schema.length))
 
+  private val corruptFieldIndex = 
schema.getFieldIndex(options.columnNameOfCorruptRecord)
+
+  @transient
+  private[this] var isWarningPrinted: Boolean = false
+
   @transient
-  private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+  private def printWarningForMalformedRecord(record: () => UTF8String): 
Unit = {
+def sampleRecord: String = {
+  if (options.wholeFile) {
+""
+  } else {
+s"Sample record: ${record()}\n"
+  }
+}
+
+def footer: String = {
+  s"""Code example to print all malformed records (scala):
+ |===
+ |// The corrupted record exists in column 
${options.columnNameOfCorruptRecord}.
+ |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+ |
+   """.stripMargin
+}
+
+if (options.permissive) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will 
replace
+   |all malformed records with placeholder null in current 
$PERMISSIVE_MODE parser mode.
+   |To find out which corrupted records have been replaced with 
null, please use the
+   |default inferred schema instead of providing a custom schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+} else if (options.dropMalformed) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will drop
+   |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
+   |corrupted records have been dropped, please switch the parser 
mode to $PERMISSIVE_MODE
+   |mode and use the default inferred schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+}
+  }
+
+  @transient
+  private def printWarningIfWholeFile(): Unit = {
+if (options.wholeFile && corruptFieldIndex.isDefined) {
+  logWarning(
+s"""Enabling wholeFile mode and defining columnNameOfCorruptRecord 
may result
+   |in very large allocations or OutOfMemoryExceptions being 
raised.
+   |
+ """.stripMargin)
+}
+  }
 
   /**
* This function deals with the cases it fails to parse. This function 
will be called
* when exceptions are caught during converting. This functions also 
deals with `mode` option.
*/
-  private def failedRecord(record: String): Seq[InternalRow] = {
-// create a row even if no corrupt record column is present
-if (options.failFast) {
-  throw new SparkSQLJsonProcessingException(s"Malformed line in 
FAILFAST mode: $record")
-}
-if (options.dropMalformed) {
-  if (!isWarningPrintedForMalformedRecord) {
-logWarning(
-  s"""Found at least one malformed records (sample: $record). The 
JSON reader will drop
- |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
- |corrupted records have been dropped, please switch the 
parser mode to $PERMISSIVE_MODE
- |mode and use the default inferred schema.
- |
- |Code example to print all malformed records (scala):
- |===
- |// The corrupted record exists in column 
${columnNameOfCorruptRecord}
- |val parsedJson = 
spark.read.json("/path/to/json/file/test.json")
- |
-   """.stripMargin)
-isWarningPrintedForMalformedRecord = true
-  }
-  Nil
-} else if (schema.getFieldIndex(columnNameOfCorruptRecord).isEmpty) {
-  if 

[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100649847
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
 ---
@@ -79,7 +80,7 @@ private[sql] object JsonInferSchema {
 
   private[this] val structFieldComparator = new Comparator[StructField] {
 override def compare(o1: StructField, o2: StructField): Int = {
-  o1.name.compare(o2.name)
+  o1.name.compareTo(o2.name)
--- End diff --

why this change?
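Presumably to call `java.lang.String.compareTo` directly rather than going through the `StringOps` enrichment that `compare` requires; that motivation is an assumption here, not quoted from the PR. A tiny standalone illustration of the two spellings, which produce the same ordering:

```scala
object CompareExample {
  def main(args: Array[String]): Unit = {
    val a = "apple"
    val b = "banana"
    println(a.compareTo(b)) // java.lang.String's own method
    println(a.compare(b))   // same result, via the implicit conversion to StringOps
  }
}
```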





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100649748
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
 ---
@@ -97,46 +91,13 @@ class JsonFileFormat extends TextBasedFileFormat with 
DataSourceRegister {
 val broadcastedHadoopConf =
   sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
 
-val parsedOptions: JSONOptions = new JSONOptions(options)
-val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord
-  .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+val parsedOptions = new JSONOptions(options,
+  sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+val readFile = JsonDataSource(parsedOptions).readFile _
--- End diff --

this closure holds a reference to the outer object, so we will still end up broadcasting the `JsonDataSource`. how about
```
val columnNameOfCorruptRecord = sparkSession.sessionState.conf.columnNameOfCorruptRecord

(file: PartitionedFile) => {
  val parsedOptions = ...
  val parser = new JacksonParser(requiredSchema, parsedOptions)
  JsonDataSource(parsedOptions).readFile...
}





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100649635
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
 ---
@@ -31,10 +31,17 @@ import 
org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs
  * Most of these map directly to Jackson's internal options, specified in 
[[JsonParser.Feature]].
  */
 private[sql] class JSONOptions(
-@transient private val parameters: CaseInsensitiveMap)
+@transient private val parameters: CaseInsensitiveMap,
+defaultColumnNameOfCorruptRecord: String)
   extends Logging with Serializable  {
 
-  def this(parameters: Map[String, String]) = this(new 
CaseInsensitiveMap(parameters))
+  def this(
+  parameters: Map[String, String],
+  defaultColumnNameOfCorruptRecord: String = "") = {
--- End diff --

Yes, it's really not a good solution, but it doesn't make sense to have a 
corrupt column name in all use cases. Picking another sentinel could 
inadvertently conflict with a real column. It should be `Option[String] = None` 
but this winds up being a large change that deserves a separate pull request. 
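A simplified, self-contained sketch of what the `Option[String]` shape could look like (not Spark's actual `JSONOptions`):

```scala
class OptionsSketch(
    parameters: Map[String, String],
    defaultColumnNameOfCorruptRecord: Option[String] = None) {

  // Absence of a corrupt-record column is modelled explicitly, with no "" sentinel
  // that could collide with a real column name.
  val columnNameOfCorruptRecord: Option[String] =
    parameters.get("columnNameOfCorruptRecord").orElse(defaultColumnNameOfCorruptRecord)
}

object OptionsSketchExample {
  def main(args: Array[String]): Unit = {
    println(new OptionsSketch(Map.empty, Some("_corrupt_record")).columnNameOfCorruptRecord)
    println(new OptionsSketch(Map.empty).columnNameOfCorruptRecord) // None
  }
}
```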





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100649228
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -394,36 +447,32 @@ class JacksonParser(
   }
 
   /**
-   * Parse the string JSON input to the set of [[InternalRow]]s.
+   * Parse the JSON input to the set of [[InternalRow]]s.
+   *
+   * @param recordLiteral an optional function that will be used to 
generate
+   *   the corrupt record text instead of record.toString
*/
-  def parse(input: String): Seq[InternalRow] = {
-if (input.trim.isEmpty) {
-  Nil
-} else {
-  try {
-Utils.tryWithResource(factory.createParser(input)) { parser =>
-  parser.nextToken()
-  rootConverter.apply(parser) match {
-case null => failedRecord(input)
-case row: InternalRow => row :: Nil
-case array: ArrayData =>
-  // Here, as we support reading top level JSON arrays and 
take every element
-  // in such an array as a row, this case is possible.
-  if (array.numElements() == 0) {
-Nil
-  } else {
-array.toArray[InternalRow](schema)
-  }
-case _ =>
-  failedRecord(input)
+  def parse[T](
+  record: T,
+  createParser: (JsonFactory, T) => JsonParser,
+  recordLiteral: PartialFunction[T, UTF8String] = 
PartialFunction.empty): Seq[InternalRow] = {
--- End diff --

seems the only caller is 
https://github.com/apache/spark/pull/16386/files#diff-5ac20b8d75a20117deaa9ba4af814090R211
, which doesn't take a parameter, so `PartialFunction` is not a good choice.





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100649101
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -394,36 +447,32 @@ class JacksonParser(
   }
 
   /**
-   * Parse the string JSON input to the set of [[InternalRow]]s.
+   * Parse the JSON input to the set of [[InternalRow]]s.
+   *
+   * @param recordLiteral an optional function that will be used to 
generate
+   *   the corrupt record text instead of record.toString
*/
-  def parse(input: String): Seq[InternalRow] = {
-if (input.trim.isEmpty) {
-  Nil
-} else {
-  try {
-Utils.tryWithResource(factory.createParser(input)) { parser =>
-  parser.nextToken()
-  rootConverter.apply(parser) match {
-case null => failedRecord(input)
-case row: InternalRow => row :: Nil
-case array: ArrayData =>
-  // Here, as we support reading top level JSON arrays and 
take every element
-  // in such an array as a row, this case is possible.
-  if (array.numElements() == 0) {
-Nil
-  } else {
-array.toArray[InternalRow](schema)
-  }
-case _ =>
-  failedRecord(input)
+  def parse[T](
+  record: T,
+  createParser: (JsonFactory, T) => JsonParser,
+  recordLiteral: PartialFunction[T, UTF8String] = 
PartialFunction.empty): Seq[InternalRow] = {
--- End diff --

how about `wholeTextRecord: => Option[UTF8String] = None`?
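A self-contained sketch of that shape with simplified types (`String` standing in for `UTF8String`); the by-name argument is only forced on the failure path:

```scala
object ParseSignatureSketch {
  def parse[T](
      record: T,
      wholeTextRecord: => Option[String] = None): Seq[String] = {
    try {
      if (record.toString.trim.startsWith("{")) Seq(record.toString)
      else throw new IllegalArgumentException("malformed")
    } catch {
      case _: IllegalArgumentException =>
        // Only evaluated here, so callers that never hit the failure path pay nothing.
        Seq(wholeTextRecord.getOrElse(record.toString))
    }
  }

  def main(args: Array[String]): Unit = {
    println(parse("""{"a": 1}"""))                        // default None is never evaluated
    println(parse("oops", Some("whole file contents")))   // forced only on failure
  }
}
```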





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100648863
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -394,36 +447,32 @@ class JacksonParser(
   }
 
   /**
-   * Parse the string JSON input to the set of [[InternalRow]]s.
+   * Parse the JSON input to the set of [[InternalRow]]s.
+   *
+   * @param recordLiteral an optional function that will be used to 
generate
+   *   the corrupt record text instead of record.toString
*/
-  def parse(input: String): Seq[InternalRow] = {
-if (input.trim.isEmpty) {
-  Nil
-} else {
-  try {
-Utils.tryWithResource(factory.createParser(input)) { parser =>
-  parser.nextToken()
-  rootConverter.apply(parser) match {
-case null => failedRecord(input)
-case row: InternalRow => row :: Nil
-case array: ArrayData =>
-  // Here, as we support reading top level JSON arrays and 
take every element
-  // in such an array as a row, this case is possible.
-  if (array.numElements() == 0) {
-Nil
-  } else {
-array.toArray[InternalRow](schema)
-  }
-case _ =>
-  failedRecord(input)
+  def parse[T](
+  record: T,
+  createParser: (JsonFactory, T) => JsonParser,
+  recordLiteral: PartialFunction[T, UTF8String] = 
PartialFunction.empty): Seq[InternalRow] = {
+try {
+  Utils.tryWithResource(createParser(factory, record)) { parser =>
+// a null first token is equivalent to testing for 
input.trim.isEmpty
+// but it works on any token stream and not just strings
+parser.nextToken() match {
+  case null => Nil
+  case _ => rootConverter.apply(parser) match {
+case null => throw new SparkSQLJsonProcessingException("Root 
converter returned null")
+case rows => rows
   }
 }
-  } catch {
-case _: JsonProcessingException =>
-  failedRecord(input)
-case _: SparkSQLJsonProcessingException =>
-  failedRecord(input)
   }
+} catch {
+  case (_: JsonProcessingException) | (_: 
SparkSQLJsonProcessingException) =>
--- End diff --

nit: I think the brackets are not needed
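For reference, a minimal standalone example of the same `case` written without the parentheses around each type test:

```scala
object CatchAlternativesExample {
  def main(args: Array[String]): Unit = {
    try {
      throw new IllegalStateException("boom")
    } catch {
      case _: IllegalArgumentException | _: IllegalStateException =>
        println("handled without parentheses around each type test")
    }
  }
}
```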





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100648524
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -394,36 +447,32 @@ class JacksonParser(
   }
 
   /**
-   * Parse the string JSON input to the set of [[InternalRow]]s.
+   * Parse the JSON input to the set of [[InternalRow]]s.
+   *
+   * @param recordLiteral an optional function that will be used to 
generate
+   *   the corrupt record text instead of record.toString
*/
-  def parse(input: String): Seq[InternalRow] = {
-if (input.trim.isEmpty) {
-  Nil
-} else {
-  try {
-Utils.tryWithResource(factory.createParser(input)) { parser =>
-  parser.nextToken()
-  rootConverter.apply(parser) match {
-case null => failedRecord(input)
-case row: InternalRow => row :: Nil
-case array: ArrayData =>
-  // Here, as we support reading top level JSON arrays and 
take every element
-  // in such an array as a row, this case is possible.
-  if (array.numElements() == 0) {
-Nil
-  } else {
-array.toArray[InternalRow](schema)
-  }
-case _ =>
-  failedRecord(input)
+  def parse[T](
+  record: T,
+  createParser: (JsonFactory, T) => JsonParser,
+  recordLiteral: PartialFunction[T, UTF8String] = 
PartialFunction.empty): Seq[InternalRow] = {
+try {
+  Utils.tryWithResource(createParser(factory, record)) { parser =>
+// a null first token is equivalent to testing for 
input.trim.isEmpty
+// but it works on any token stream and not just strings
+parser.nextToken() match {
+  case null => Nil
+  case _ => rootConverter.apply(parser) match {
+case null => throw new SparkSQLJsonProcessingException("Root 
converter returned null")
+case rows => rows
   }
 }
-  } catch {
-case _: JsonProcessingException =>
-  failedRecord(input)
-case _: SparkSQLJsonProcessingException =>
-  failedRecord(input)
   }
+} catch {
+  case (_: JsonProcessingException) | (_: 
SparkSQLJsonProcessingException) =>
+failedRecord(() => recordLiteral.applyOrElse[T, UTF8String](
--- End diff --

When I do that I usually get review comments asking me to make call-by-name parameters explicit...





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100648519
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
 ---
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.json
+
+import java.io.InputStream
+
+import scala.reflect.ClassTag
+
+import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
+import com.google.common.io.ByteStreams
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, 
TextInputFormat}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
+import org.apache.spark.rdd.{BinaryFileRDD, RDD}
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, 
JacksonParser, JSONOptions}
+import org.apache.spark.sql.execution.datasources.{CodecStreams, 
HadoopFileLinesReader, PartitionedFile}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.Utils
+
+/**
+ * Common functions for parsing JSON files
+ * @tparam T A datatype containing the unparsed JSON, such as [[Text]] or 
[[String]]
+ */
+abstract class JsonDataSource[T] extends Serializable {
+  def isSplitable: Boolean
+
+  /**
+   * Parse a [[PartitionedFile]] into 0 or more [[InternalRow]] instances
+   */
+  def readFile(
+conf: Configuration,
+file: PartitionedFile,
+parser: JacksonParser): Iterator[InternalRow]
+
+  /**
+   * Create an [[RDD]] that handles the preliminary parsing of [[T]] 
records
+   */
+  protected def createBaseRdd(
+sparkSession: SparkSession,
+inputPaths: Seq[FileStatus]): RDD[T]
+
+  /**
+   * A generic wrapper to invoke the correct [[JsonFactory]] method to 
allocate a [[JsonParser]]
+   * for an instance of [[T]]
+   */
+  def createParser(jsonFactory: JsonFactory, value: T): JsonParser
+
+  final def infer(
+  sparkSession: SparkSession,
+  inputPaths: Seq[FileStatus],
+  parsedOptions: JSONOptions): Option[StructType] = {
+if (inputPaths.nonEmpty) {
+  val jsonSchema = JsonInferSchema.infer(
+createBaseRdd(sparkSession, inputPaths),
+parsedOptions,
+createParser)
+  checkConstraints(jsonSchema)
+  Some(jsonSchema)
+} else {
+  None
+}
+  }
+
+  /** Constraints to be imposed on schema to be stored. */
+  private def checkConstraints(schema: StructType): Unit = {
--- End diff --

not related to this PR, but I don't think this logic belongs in a specific file format; it should be handled at a higher level (I think we already do this check at the catalog level)





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100648210
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -394,36 +447,32 @@ class JacksonParser(
   }
 
   /**
-   * Parse the string JSON input to the set of [[InternalRow]]s.
+   * Parse the JSON input to the set of [[InternalRow]]s.
+   *
+   * @param recordLiteral an optional function that will be used to 
generate
+   *   the corrupt record text instead of record.toString
*/
-  def parse(input: String): Seq[InternalRow] = {
-if (input.trim.isEmpty) {
-  Nil
-} else {
-  try {
-Utils.tryWithResource(factory.createParser(input)) { parser =>
-  parser.nextToken()
-  rootConverter.apply(parser) match {
-case null => failedRecord(input)
-case row: InternalRow => row :: Nil
-case array: ArrayData =>
-  // Here, as we support reading top level JSON arrays and 
take every element
-  // in such an array as a row, this case is possible.
-  if (array.numElements() == 0) {
-Nil
-  } else {
-array.toArray[InternalRow](schema)
-  }
-case _ =>
-  failedRecord(input)
+  def parse[T](
+  record: T,
+  createParser: (JsonFactory, T) => JsonParser,
+  recordLiteral: PartialFunction[T, UTF8String] = 
PartialFunction.empty): Seq[InternalRow] = {
+try {
+  Utils.tryWithResource(createParser(factory, record)) { parser =>
+// a null first token is equivalent to testing for 
input.trim.isEmpty
+// but it works on any token stream and not just strings
+parser.nextToken() match {
+  case null => Nil
+  case _ => rootConverter.apply(parser) match {
+case null => throw new SparkSQLJsonProcessingException("Root 
converter returned null")
+case rows => rows
   }
 }
-  } catch {
-case _: JsonProcessingException =>
-  failedRecord(input)
-case _: SparkSQLJsonProcessingException =>
-  failedRecord(input)
   }
+} catch {
+  case (_: JsonProcessingException) | (_: 
SparkSQLJsonProcessingException) =>
+failedRecord(() => recordLiteral.applyOrElse[T, UTF8String](
--- End diff --

this looks weird, how about we define `def failedRecord(record: => UTF8String)`?





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100648139
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -48,69 +47,102 @@ class JacksonParser(
 
   // A `ValueConverter` is responsible for converting a value from 
`JsonParser`
   // to a value in a field for `InternalRow`.
-  private type ValueConverter = (JsonParser) => Any
+  private type ValueConverter = JsonParser => AnyRef
 
   // `ValueConverter`s for the root schema for all fields in the schema
-  private val rootConverter: ValueConverter = makeRootConverter(schema)
+  private val rootConverter = makeRootConverter(schema)
 
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
   private val emptyRow: Seq[InternalRow] = Seq(new 
GenericInternalRow(schema.length))
 
+  private val corruptFieldIndex = 
schema.getFieldIndex(options.columnNameOfCorruptRecord)
+
   @transient
-  private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+  private var printWarningForMalformedRecord: (() => UTF8String) => Unit = 
{ record =>
+def sampleRecord: String = {
+  if (options.wholeFile) {
+""
+  } else {
+s"Sample record: ${record()}\n"
+  }
+}
+
+def footer: String = {
+  s"""Code example to print all malformed records (scala):
+ |===
+ |// The corrupted record exists in column 
${options.columnNameOfCorruptRecord}.
+ |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+ |
+   """.stripMargin
+}
+
+if (options.permissive) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will 
replace
+   |all malformed records with placeholder null in current 
$PERMISSIVE_MODE parser mode.
+   |To find out which corrupted records have been replaced with 
null, please use the
+   |default inferred schema instead of providing a custom schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+} else if (options.dropMalformed) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will drop
+   |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
+   |corrupted records have been dropped, please switch the parser 
mode to $PERMISSIVE_MODE
+   |mode and use the default inferred schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+}
+
+printWarningForMalformedRecord = _ => ()
+  }
+
+  @transient
+  private var printWarningIfWholeFile: () => Unit = { () =>
+if (options.wholeFile && corruptFieldIndex.isDefined) {
+  logWarning(
+s"""Enabling wholeFile mode and defining columnNameOfCorruptRecord 
may result
+   |in very large allocations or OutOfMemoryExceptions being 
raised.
+   |
+ """.stripMargin)
+}
+
+printWarningIfWholeFile = () => ()
+  }
 
   /**
* This function deals with the cases it fails to parse. This function 
will be called
* when exceptions are caught during converting. This functions also 
deals with `mode` option.
*/
-  private def failedRecord(record: String): Seq[InternalRow] = {
-// create a row even if no corrupt record column is present
-if (options.failFast) {
-  throw new SparkSQLJsonProcessingException(s"Malformed line in 
FAILFAST mode: $record")
-}
-if (options.dropMalformed) {
-  if (!isWarningPrintedForMalformedRecord) {
-logWarning(
-  s"""Found at least one malformed records (sample: $record). The 
JSON reader will drop
- |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
- |corrupted records have been dropped, please switch the 
parser mode to $PERMISSIVE_MODE
- |mode and use the default inferred schema.
- |
- |Code example to print all malformed records (scala):
- |===
- |// The corrupted record exists in column 
${columnNameOfCorruptRecord}
- |val parsedJson = 
spark.read.json("/path/to/json/file/test.json")
- |
-   """.stripMargin)
-isWarningPrintedForMalformedRecord = true
-  }
-  Nil
-} else if (schema.getFieldIndex(columnNameOfCorruptRecord).isEmpty) {
 

[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100647494
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -48,69 +47,110 @@ class JacksonParser(
 
   // A `ValueConverter` is responsible for converting a value from 
`JsonParser`
   // to a value in a field for `InternalRow`.
-  private type ValueConverter = (JsonParser) => Any
+  private type ValueConverter = JsonParser => AnyRef
 
   // `ValueConverter`s for the root schema for all fields in the schema
-  private val rootConverter: ValueConverter = makeRootConverter(schema)
+  private val rootConverter = makeRootConverter(schema)
 
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
   private val emptyRow: Seq[InternalRow] = Seq(new 
GenericInternalRow(schema.length))
 
+  private val corruptFieldIndex = 
schema.getFieldIndex(options.columnNameOfCorruptRecord)
+
+  @transient
+  private[this] var isWarningPrinted: Boolean = false
+
   @transient
-  private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+  private def printWarningForMalformedRecord(record: () => UTF8String): 
Unit = {
+def sampleRecord: String = {
+  if (options.wholeFile) {
+""
+  } else {
+s"Sample record: ${record()}\n"
+  }
+}
+
+def footer: String = {
+  s"""Code example to print all malformed records (scala):
+ |===
+ |// The corrupted record exists in column 
${options.columnNameOfCorruptRecord}.
+ |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+ |
+   """.stripMargin
+}
+
+if (options.permissive) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will 
replace
+   |all malformed records with placeholder null in current 
$PERMISSIVE_MODE parser mode.
+   |To find out which corrupted records have been replaced with 
null, please use the
+   |default inferred schema instead of providing a custom schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+} else if (options.dropMalformed) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will drop
+   |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
+   |corrupted records have been dropped, please switch the parser 
mode to $PERMISSIVE_MODE
+   |mode and use the default inferred schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+}
+  }
+
+  @transient
+  private def printWarningIfWholeFile(): Unit = {
+if (options.wholeFile && corruptFieldIndex.isDefined) {
+  logWarning(
+s"""Enabling wholeFile mode and defining columnNameOfCorruptRecord 
may result
+   |in very large allocations or OutOfMemoryExceptions being 
raised.
+   |
+ """.stripMargin)
+}
+  }
 
   /**
* This function deals with the cases it fails to parse. This function 
will be called
* when exceptions are caught during converting. This functions also 
deals with `mode` option.
*/
-  private def failedRecord(record: String): Seq[InternalRow] = {
-// create a row even if no corrupt record column is present
-if (options.failFast) {
-  throw new SparkSQLJsonProcessingException(s"Malformed line in 
FAILFAST mode: $record")
-}
-if (options.dropMalformed) {
-  if (!isWarningPrintedForMalformedRecord) {
-logWarning(
-  s"""Found at least one malformed records (sample: $record). The 
JSON reader will drop
- |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
- |corrupted records have been dropped, please switch the 
parser mode to $PERMISSIVE_MODE
- |mode and use the default inferred schema.
- |
- |Code example to print all malformed records (scala):
- |===
- |// The corrupted record exists in column 
${columnNameOfCorruptRecord}
- |val parsedJson = 
spark.read.json("/path/to/json/file/test.json")
- |
-   """.stripMargin)
-isWarningPrintedForMalformedRecord = true
-  }
-  Nil
-} else if (schema.getFieldIndex(columnNameOfCorruptRecord).isEmpty) {
-  if 

[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100647047
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
 ---
@@ -31,10 +31,17 @@ import 
org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs
  * Most of these map directly to Jackson's internal options, specified in 
[[JsonParser.Feature]].
  */
 private[sql] class JSONOptions(
-@transient private val parameters: CaseInsensitiveMap)
+@transient private val parameters: CaseInsensitiveMap,
+defaultColumnNameOfCorruptRecord: String)
   extends Logging with Serializable  {
 
-  def this(parameters: Map[String, String]) = this(new 
CaseInsensitiveMap(parameters))
+  def this(
+  parameters: Map[String, String],
+  defaultColumnNameOfCorruptRecord: String = "") = {
--- End diff --

using an empty string as the default value looks weird, can we force callers to provide a value?
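A simplified sketch of forcing callers to supply the name (again just the shape of the suggestion, not Spark's actual class):

```scala
class RequiredNameOptions(
    parameters: Map[String, String],
    defaultColumnNameOfCorruptRecord: String) { // no "" default: callers must supply it

  val columnNameOfCorruptRecord: String =
    parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord)
}

object RequiredNameOptionsExample {
  def main(args: Array[String]): Unit = {
    // new RequiredNameOptions(Map.empty)  // would no longer compile
    println(new RequiredNameOptions(Map.empty, "_corrupt_record").columnNameOfCorruptRecord)
  }
}
```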





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100646497
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -48,69 +47,98 @@ class JacksonParser(
 
   // A `ValueConverter` is responsible for converting a value from 
`JsonParser`
   // to a value in a field for `InternalRow`.
-  private type ValueConverter = (JsonParser) => Any
+  private type ValueConverter = JsonParser => AnyRef
 
   // `ValueConverter`s for the root schema for all fields in the schema
-  private val rootConverter: ValueConverter = makeRootConverter(schema)
+  private val rootConverter = makeRootConverter(schema)
 
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
   private val emptyRow: Seq[InternalRow] = Seq(new 
GenericInternalRow(schema.length))
 
+  private val corruptFieldIndex = 
schema.getFieldIndex(options.columnNameOfCorruptRecord)
+
+  @transient
+  private val printWarningForMalformedRecord = ExecuteOnce[() => 
UTF8String] { record =>
+def sampleRecord: String = {
+  if (options.wholeFile) {
+""
+  } else {
+s"Sample record: ${record()}\n"
+  }
+}
+
+def footer: String = {
+  s"""Code example to print all malformed records (scala):
+ |===
+ |// The corrupted record exists in column 
${options.columnNameOfCorruptRecord}.
+ |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+ |
+   """.stripMargin
+}
+
+if (options.permissive) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will 
replace
+   |all malformed records with placeholder null in current 
$PERMISSIVE_MODE parser mode.
+   |To find out which corrupted records have been replaced with 
null, please use the
+   |default inferred schema instead of providing a custom schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+} else if (options.dropMalformed) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will drop
+   |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
+   |corrupted records have been dropped, please switch the parser 
mode to $PERMISSIVE_MODE
+   |mode and use the default inferred schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+}
+  }
+
   @transient
-  private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+  private val printWarningIfWholeFile = ExecuteOnce[Unit] { _ =>
+if (options.wholeFile && corruptFieldIndex.isDefined) {
+  logWarning(
+s"""Enabling wholeFile mode and defining columnNameOfCorruptRecord 
may result
+   |in very large allocations or OutOfMemoryExceptions being 
raised.
+   |
+ """.stripMargin)
+}
+  }
 
   /**
* This function deals with the cases it fails to parse. This function 
will be called
* when exceptions are caught during converting. This functions also 
deals with `mode` option.
*/
-  private def failedRecord(record: String): Seq[InternalRow] = {
-// create a row even if no corrupt record column is present
-if (options.failFast) {
-  throw new SparkSQLJsonProcessingException(s"Malformed line in 
FAILFAST mode: $record")
-}
-if (options.dropMalformed) {
-  if (!isWarningPrintedForMalformedRecord) {
-logWarning(
-  s"""Found at least one malformed records (sample: $record). The 
JSON reader will drop
- |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
- |corrupted records have been dropped, please switch the 
parser mode to $PERMISSIVE_MODE
- |mode and use the default inferred schema.
- |
- |Code example to print all malformed records (scala):
- |===
- |// The corrupted record exists in column 
${columnNameOfCorruptRecord}
- |val parsedJson = 
spark.read.json("/path/to/json/file/test.json")
- |
-   """.stripMargin)
-isWarningPrintedForMalformedRecord = true
-  }
-  Nil
-} else if (schema.getFieldIndex(columnNameOfCorruptRecord).isEmpty) {
-  if (!isWarningPrintedForMalformedRecord) {
-logWarning(
-  

[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100644275
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -48,69 +47,98 @@ class JacksonParser(
 
   // A `ValueConverter` is responsible for converting a value from 
`JsonParser`
   // to a value in a field for `InternalRow`.
-  private type ValueConverter = (JsonParser) => Any
+  private type ValueConverter = JsonParser => AnyRef
 
   // `ValueConverter`s for the root schema for all fields in the schema
-  private val rootConverter: ValueConverter = makeRootConverter(schema)
+  private val rootConverter = makeRootConverter(schema)
 
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
   private val emptyRow: Seq[InternalRow] = Seq(new 
GenericInternalRow(schema.length))
 
+  private val corruptFieldIndex = 
schema.getFieldIndex(options.columnNameOfCorruptRecord)
+
+  @transient
+  private val printWarningForMalformedRecord = ExecuteOnce[() => 
UTF8String] { record =>
+def sampleRecord: String = {
+  if (options.wholeFile) {
+""
+  } else {
+s"Sample record: ${record()}\n"
+  }
+}
+
+def footer: String = {
+  s"""Code example to print all malformed records (scala):
+ |===
+ |// The corrupted record exists in column 
${options.columnNameOfCorruptRecord}.
+ |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+ |
+   """.stripMargin
+}
+
+if (options.permissive) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will 
replace
+   |all malformed records with placeholder null in current 
$PERMISSIVE_MODE parser mode.
+   |To find out which corrupted records have been replaced with 
null, please use the
+   |default inferred schema instead of providing a custom schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+} else if (options.dropMalformed) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will drop
+   |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
+   |corrupted records have been dropped, please switch the parser 
mode to $PERMISSIVE_MODE
+   |mode and use the default inferred schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+}
+  }
+
   @transient
-  private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+  private val printWarningIfWholeFile = ExecuteOnce[Unit] { _ =>
+if (options.wholeFile && corruptFieldIndex.isDefined) {
+  logWarning(
+s"""Enabling wholeFile mode and defining columnNameOfCorruptRecord 
may result
+   |in very large allocations or OutOfMemoryExceptions being 
raised.
+   |
+ """.stripMargin)
+}
+  }
 
   /**
* This function deals with the cases it fails to parse. This function 
will be called
* when exceptions are caught during converting. This functions also 
deals with `mode` option.
*/
-  private def failedRecord(record: String): Seq[InternalRow] = {
-// create a row even if no corrupt record column is present
-if (options.failFast) {
-  throw new SparkSQLJsonProcessingException(s"Malformed line in 
FAILFAST mode: $record")
-}
-if (options.dropMalformed) {
-  if (!isWarningPrintedForMalformedRecord) {
-logWarning(
-  s"""Found at least one malformed records (sample: $record). The 
JSON reader will drop
- |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
- |corrupted records have been dropped, please switch the 
parser mode to $PERMISSIVE_MODE
- |mode and use the default inferred schema.
- |
- |Code example to print all malformed records (scala):
- |===
- |// The corrupted record exists in column 
${columnNameOfCorruptRecord}
- |val parsedJson = 
spark.read.json("/path/to/json/file/test.json")
- |
-   """.stripMargin)
-isWarningPrintedForMalformedRecord = true
-  }
-  Nil
-} else if (schema.getFieldIndex(columnNameOfCorruptRecord).isEmpty) {
-  if (!isWarningPrintedForMalformedRecord) {
-logWarning(
-  

[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100640620
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -48,69 +47,98 @@ class JacksonParser(
 
   // A `ValueConverter` is responsible for converting a value from 
`JsonParser`
   // to a value in a field for `InternalRow`.
-  private type ValueConverter = (JsonParser) => Any
+  private type ValueConverter = JsonParser => AnyRef
 
   // `ValueConverter`s for the root schema for all fields in the schema
-  private val rootConverter: ValueConverter = makeRootConverter(schema)
+  private val rootConverter = makeRootConverter(schema)
 
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
   private val emptyRow: Seq[InternalRow] = Seq(new 
GenericInternalRow(schema.length))
 
+  private val corruptFieldIndex = 
schema.getFieldIndex(options.columnNameOfCorruptRecord)
+
+  @transient
+  private val printWarningForMalformedRecord = ExecuteOnce[() => 
UTF8String] { record =>
+def sampleRecord: String = {
+  if (options.wholeFile) {
+""
+  } else {
+s"Sample record: ${record()}\n"
+  }
+}
+
+def footer: String = {
+  s"""Code example to print all malformed records (scala):
+ |===
+ |// The corrupted record exists in column 
${options.columnNameOfCorruptRecord}.
+ |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+ |
+   """.stripMargin
+}
+
+if (options.permissive) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will 
replace
+   |all malformed records with placeholder null in current 
$PERMISSIVE_MODE parser mode.
+   |To find out which corrupted records have been replaced with 
null, please use the
+   |default inferred schema instead of providing a custom schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+} else if (options.dropMalformed) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will drop
+   |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
+   |corrupted records have been dropped, please switch the parser 
mode to $PERMISSIVE_MODE
+   |mode and use the default inferred schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+}
+  }
+
   @transient
-  private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+  private val printWarningIfWholeFile = ExecuteOnce[Unit] { _ =>
+if (options.wholeFile && corruptFieldIndex.isDefined) {
+  logWarning(
+s"""Enabling wholeFile mode and defining columnNameOfCorruptRecord 
may result
+   |in very large allocations or OutOfMemoryExceptions being 
raised.
+   |
+ """.stripMargin)
+}
+  }
 
   /**
* This function deals with the cases it fails to parse. This function 
will be called
* when exceptions are caught during converting. This functions also 
deals with `mode` option.
*/
-  private def failedRecord(record: String): Seq[InternalRow] = {
-// create a row even if no corrupt record column is present
-if (options.failFast) {
-  throw new SparkSQLJsonProcessingException(s"Malformed line in 
FAILFAST mode: $record")
-}
-if (options.dropMalformed) {
-  if (!isWarningPrintedForMalformedRecord) {
-logWarning(
-  s"""Found at least one malformed records (sample: $record). The 
JSON reader will drop
- |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
- |corrupted records have been dropped, please switch the 
parser mode to $PERMISSIVE_MODE
- |mode and use the default inferred schema.
- |
- |Code example to print all malformed records (scala):
- |===
- |// The corrupted record exists in column 
${columnNameOfCorruptRecord}
- |val parsedJson = 
spark.read.json("/path/to/json/file/test.json")
- |
-   """.stripMargin)
-isWarningPrintedForMalformedRecord = true
-  }
-  Nil
-} else if (schema.getFieldIndex(columnNameOfCorruptRecord).isEmpty) {
-  if (!isWarningPrintedForMalformedRecord) {
-logWarning(
-  

[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100612936
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -48,69 +47,98 @@ class JacksonParser(
 
   // A `ValueConverter` is responsible for converting a value from 
`JsonParser`
   // to a value in a field for `InternalRow`.
-  private type ValueConverter = (JsonParser) => Any
+  private type ValueConverter = JsonParser => AnyRef
 
   // `ValueConverter`s for the root schema for all fields in the schema
-  private val rootConverter: ValueConverter = makeRootConverter(schema)
+  private val rootConverter = makeRootConverter(schema)
 
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
   private val emptyRow: Seq[InternalRow] = Seq(new 
GenericInternalRow(schema.length))
 
+  private val corruptFieldIndex = 
schema.getFieldIndex(options.columnNameOfCorruptRecord)
+
+  @transient
+  private val printWarningForMalformedRecord = ExecuteOnce[() => 
UTF8String] { record =>
+def sampleRecord: String = {
+  if (options.wholeFile) {
+""
+  } else {
+s"Sample record: ${record()}\n"
+  }
+}
+
+def footer: String = {
+  s"""Code example to print all malformed records (scala):
+ |===
+ |// The corrupted record exists in column 
${options.columnNameOfCorruptRecord}.
+ |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+ |
+   """.stripMargin
+}
+
+if (options.permissive) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will 
replace
+   |all malformed records with placeholder null in current 
$PERMISSIVE_MODE parser mode.
+   |To find out which corrupted records have been replaced with 
null, please use the
+   |default inferred schema instead of providing a custom schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+} else if (options.dropMalformed) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will drop
+   |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
+   |corrupted records have been dropped, please switch the parser 
mode to $PERMISSIVE_MODE
+   |mode and use the default inferred schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+}
+  }
+
   @transient
-  private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+  private val printWarningIfWholeFile = ExecuteOnce[Unit] { _ =>
+if (options.wholeFile && corruptFieldIndex.isDefined) {
+  logWarning(
+s"""Enabling wholeFile mode and defining columnNameOfCorruptRecord 
may result
+   |in very large allocations or OutOfMemoryExceptions being 
raised.
+   |
+ """.stripMargin)
+}
+  }
 
   /**
* This function deals with the cases it fails to parse. This function 
will be called
* when exceptions are caught during converting. This functions also 
deals with `mode` option.
*/
-  private def failedRecord(record: String): Seq[InternalRow] = {
-// create a row even if no corrupt record column is present
-if (options.failFast) {
-  throw new SparkSQLJsonProcessingException(s"Malformed line in 
FAILFAST mode: $record")
-}
-if (options.dropMalformed) {
-  if (!isWarningPrintedForMalformedRecord) {
-logWarning(
-  s"""Found at least one malformed records (sample: $record). The 
JSON reader will drop
- |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
- |corrupted records have been dropped, please switch the 
parser mode to $PERMISSIVE_MODE
- |mode and use the default inferred schema.
- |
- |Code example to print all malformed records (scala):
- |===
- |// The corrupted record exists in column 
${columnNameOfCorruptRecord}
- |val parsedJson = 
spark.read.json("/path/to/json/file/test.json")
- |
-   """.stripMargin)
-isWarningPrintedForMalformedRecord = true
-  }
-  Nil
-} else if (schema.getFieldIndex(columnNameOfCorruptRecord).isEmpty) {
-  if (!isWarningPrintedForMalformedRecord) {
-logWarning(
-  

[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100610662
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -48,69 +47,98 @@ class JacksonParser(
 
   // A `ValueConverter` is responsible for converting a value from 
`JsonParser`
   // to a value in a field for `InternalRow`.
-  private type ValueConverter = (JsonParser) => Any
+  private type ValueConverter = JsonParser => AnyRef
 
   // `ValueConverter`s for the root schema for all fields in the schema
-  private val rootConverter: ValueConverter = makeRootConverter(schema)
+  private val rootConverter = makeRootConverter(schema)
 
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
   private val emptyRow: Seq[InternalRow] = Seq(new 
GenericInternalRow(schema.length))
 
+  private val corruptFieldIndex = 
schema.getFieldIndex(options.columnNameOfCorruptRecord)
+
+  @transient
+  private val printWarningForMalformedRecord = ExecuteOnce[() => 
UTF8String] { record =>
+def sampleRecord: String = {
+  if (options.wholeFile) {
+""
+  } else {
+s"Sample record: ${record()}\n"
+  }
+}
+
+def footer: String = {
+  s"""Code example to print all malformed records (scala):
+ |===
+ |// The corrupted record exists in column 
${options.columnNameOfCorruptRecord}.
+ |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+ |
+   """.stripMargin
+}
+
+if (options.permissive) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will 
replace
+   |all malformed records with placeholder null in current 
$PERMISSIVE_MODE parser mode.
+   |To find out which corrupted records have been replaced with 
null, please use the
+   |default inferred schema instead of providing a custom schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+} else if (options.dropMalformed) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will drop
+   |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
+   |corrupted records have been dropped, please switch the parser 
mode to $PERMISSIVE_MODE
+   |mode and use the default inferred schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+}
+  }
+
   @transient
-  private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+  private val printWarningIfWholeFile = ExecuteOnce[Unit] { _ =>
+if (options.wholeFile && corruptFieldIndex.isDefined) {
+  logWarning(
+s"""Enabling wholeFile mode and defining columnNameOfCorruptRecord 
may result
+   |in very large allocations or OutOfMemoryExceptions being 
raised.
+   |
+ """.stripMargin)
+}
+  }
 
   /**
* This function deals with the cases it fails to parse. This function 
will be called
* when exceptions are caught during converting. This functions also 
deals with `mode` option.
*/
-  private def failedRecord(record: String): Seq[InternalRow] = {
-// create a row even if no corrupt record column is present
-if (options.failFast) {
-  throw new SparkSQLJsonProcessingException(s"Malformed line in 
FAILFAST mode: $record")
-}
-if (options.dropMalformed) {
-  if (!isWarningPrintedForMalformedRecord) {
-logWarning(
-  s"""Found at least one malformed records (sample: $record). The 
JSON reader will drop
- |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
- |corrupted records have been dropped, please switch the 
parser mode to $PERMISSIVE_MODE
- |mode and use the default inferred schema.
- |
- |Code example to print all malformed records (scala):
- |===
- |// The corrupted record exists in column 
${columnNameOfCorruptRecord}
- |val parsedJson = 
spark.read.json("/path/to/json/file/test.json")
- |
-   """.stripMargin)
-isWarningPrintedForMalformedRecord = true
-  }
-  Nil
-} else if (schema.getFieldIndex(columnNameOfCorruptRecord).isEmpty) {
-  if (!isWarningPrintedForMalformedRecord) {
-logWarning(
-  

[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100587425
  
--- Diff: core/src/main/scala/org/apache/spark/util/ExecuteOnce.scala ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util.concurrent.atomic.AtomicReference
+
+private[spark] object ExecuteOnce {
+  private val doNothing: Any => Unit = Function.const(())
+
+  def apply[T](fn: T => Unit): ExecuteOnce[T] = {
+new ExecuteOnce(fn)
+  }
+}
+
+/**
+ * Execute the provided function on the first application only.
+ * Subsequent applications will result in a no-op.
+ */
+private[spark] final class ExecuteOnce[T](fn: T => Unit)
+  extends (T => Unit)
+  with Serializable {
+
+  // a quick to test boolean
+  private var execute = true
--- End diff --

nit: `private[this]`
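(For context, a hedged illustration of what the nit is about, not code from the patch: `private[this]` makes a member object-private, and for a `var` it also lets the compiler access the field directly instead of generating accessor methods.)

    class Counter {
      // class-private: other Counter instances may still read it, and the
      // compiler generates (private) accessor methods for it
      private var a = 0

      // object-private: only this instance can touch it; the field is accessed
      // directly, with no generated accessors
      private[this] var b = 0

      def sum(other: Counter): Int = a + other.a      // compiles
      // def bad(other: Counter): Int = b + other.b   // does not compile: b is object-private
    }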





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100586852
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -48,69 +47,98 @@ class JacksonParser(
 
   // A `ValueConverter` is responsible for converting a value from 
`JsonParser`
   // to a value in a field for `InternalRow`.
-  private type ValueConverter = (JsonParser) => Any
+  private type ValueConverter = JsonParser => AnyRef
 
   // `ValueConverter`s for the root schema for all fields in the schema
-  private val rootConverter: ValueConverter = makeRootConverter(schema)
+  private val rootConverter = makeRootConverter(schema)
 
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
   private val emptyRow: Seq[InternalRow] = Seq(new 
GenericInternalRow(schema.length))
 
+  private val corruptFieldIndex = 
schema.getFieldIndex(options.columnNameOfCorruptRecord)
+
+  @transient
+  private val printWarningForMalformedRecord = ExecuteOnce[() => 
UTF8String] { record =>
+def sampleRecord: String = {
+  if (options.wholeFile) {
+""
+  } else {
+s"Sample record: ${record()}\n"
+  }
+}
+
+def footer: String = {
+  s"""Code example to print all malformed records (scala):
+ |===
+ |// The corrupted record exists in column 
${options.columnNameOfCorruptRecord}.
+ |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+ |
+   """.stripMargin
+}
+
+if (options.permissive) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will 
replace
+   |all malformed records with placeholder null in current 
$PERMISSIVE_MODE parser mode.
+   |To find out which corrupted records have been replaced with 
null, please use the
+   |default inferred schema instead of providing a custom schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+} else if (options.dropMalformed) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will drop
+   |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
+   |corrupted records have been dropped, please switch the parser 
mode to $PERMISSIVE_MODE
+   |mode and use the default inferred schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+}
+  }
+
   @transient
-  private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+  private val printWarningIfWholeFile = ExecuteOnce[Unit] { _ =>
+if (options.wholeFile && corruptFieldIndex.isDefined) {
+  logWarning(
+s"""Enabling wholeFile mode and defining columnNameOfCorruptRecord 
may result
+   |in very large allocations or OutOfMemoryExceptions being 
raised.
+   |
+ """.stripMargin)
+}
+  }
 
   /**
* This function deals with the cases it fails to parse. This function 
will be called
* when exceptions are caught during converting. This functions also 
deals with `mode` option.
*/
-  private def failedRecord(record: String): Seq[InternalRow] = {
-// create a row even if no corrupt record column is present
-if (options.failFast) {
-  throw new SparkSQLJsonProcessingException(s"Malformed line in 
FAILFAST mode: $record")
-}
-if (options.dropMalformed) {
-  if (!isWarningPrintedForMalformedRecord) {
-logWarning(
-  s"""Found at least one malformed records (sample: $record). The 
JSON reader will drop
- |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
- |corrupted records have been dropped, please switch the 
parser mode to $PERMISSIVE_MODE
- |mode and use the default inferred schema.
- |
- |Code example to print all malformed records (scala):
- |===
- |// The corrupted record exists in column 
${columnNameOfCorruptRecord}
- |val parsedJson = 
spark.read.json("/path/to/json/file/test.json")
- |
-   """.stripMargin)
-isWarningPrintedForMalformedRecord = true
-  }
-  Nil
-} else if (schema.getFieldIndex(columnNameOfCorruptRecord).isEmpty) {
-  if (!isWarningPrintedForMalformedRecord) {
-logWarning(
-  

[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-09 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100474809
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -48,69 +47,102 @@ class JacksonParser(
 
   // A `ValueConverter` is responsible for converting a value from 
`JsonParser`
   // to a value in a field for `InternalRow`.
-  private type ValueConverter = (JsonParser) => Any
+  private type ValueConverter = JsonParser => AnyRef
 
   // `ValueConverter`s for the root schema for all fields in the schema
-  private val rootConverter: ValueConverter = makeRootConverter(schema)
+  private val rootConverter = makeRootConverter(schema)
 
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
   private val emptyRow: Seq[InternalRow] = Seq(new 
GenericInternalRow(schema.length))
 
+  private val corruptFieldIndex = 
schema.getFieldIndex(options.columnNameOfCorruptRecord)
+
   @transient
-  private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+  private var printWarningForMalformedRecord: (() => UTF8String) => Unit = 
{ record =>
+def sampleRecord: String = {
+  if (options.wholeFile) {
+""
+  } else {
+s"Sample record: ${record()}\n"
+  }
+}
+
+def footer: String = {
+  s"""Code example to print all malformed records (scala):
+ |===
+ |// The corrupted record exists in column 
${options.columnNameOfCorruptRecord}.
+ |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+ |
+   """.stripMargin
+}
+
+if (options.permissive) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will 
replace
+   |all malformed records with placeholder null in current 
$PERMISSIVE_MODE parser mode.
+   |To find out which corrupted records have been replaced with 
null, please use the
+   |default inferred schema instead of providing a custom schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+} else if (options.dropMalformed) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will drop
+   |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
+   |corrupted records have been dropped, please switch the parser 
mode to $PERMISSIVE_MODE
+   |mode and use the default inferred schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+}
+
+printWarningForMalformedRecord = _ => ()
--- End diff --

@cloud-fan please take a look at aafe7bded6e614dddaed74d15cecfb8b1a78a639; I hope this is clearer.





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-09 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100344879
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -48,69 +47,102 @@ class JacksonParser(
 
   // A `ValueConverter` is responsible for converting a value from 
`JsonParser`
   // to a value in a field for `InternalRow`.
-  private type ValueConverter = (JsonParser) => Any
+  private type ValueConverter = JsonParser => AnyRef
 
   // `ValueConverter`s for the root schema for all fields in the schema
-  private val rootConverter: ValueConverter = makeRootConverter(schema)
+  private val rootConverter = makeRootConverter(schema)
 
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
   private val emptyRow: Seq[InternalRow] = Seq(new 
GenericInternalRow(schema.length))
 
+  private val corruptFieldIndex = 
schema.getFieldIndex(options.columnNameOfCorruptRecord)
+
   @transient
-  private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+  private var printWarningForMalformedRecord: (() => UTF8String) => Unit = 
{ record =>
+def sampleRecord: String = {
+  if (options.wholeFile) {
+""
+  } else {
+s"Sample record: ${record()}\n"
+  }
+}
+
+def footer: String = {
+  s"""Code example to print all malformed records (scala):
+ |===
+ |// The corrupted record exists in column 
${options.columnNameOfCorruptRecord}.
+ |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+ |
+   """.stripMargin
+}
+
+if (options.permissive) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will 
replace
+   |all malformed records with placeholder null in current 
$PERMISSIVE_MODE parser mode.
+   |To find out which corrupted records have been replaced with 
null, please use the
+   |default inferred schema instead of providing a custom schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+} else if (options.dropMalformed) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will drop
+   |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
+   |corrupted records have been dropped, please switch the parser 
mode to $PERMISSIVE_MODE
+   |mode and use the default inferred schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+}
+
+printWarningForMalformedRecord = _ => ()
--- End diff --

I can wrap it up into a helper class; this is to avoid having to keep multiple variables in sync. `wholeFile` adds another warning, and I'm sure additional warnings will be added in the future.
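A rough sketch of the kind of helper being described (simplified and renamed here; the actual class added in the patch is the `ExecuteOnce` shown elsewhere in this thread):

    // Minimal run-once wrapper: the wrapped function is applied on the first
    // call only, so callers don't have to track a separate boolean flag per warning.
    final class RunOnce[T](fn: T => Unit) extends (T => Unit) with Serializable {
      // flips to false after the first application, so later calls are no-ops
      private[this] var pending = true

      override def apply(value: T): Unit = if (pending) {
        pending = false
        fn(value)
      }
    }

    // usage: the warning closure runs only for the first malformed record
    val warnOnce = new RunOnce[String](msg => println(s"WARN: $msg"))
    warnOnce("malformed record found")   // prints once
    warnOnce("malformed record found")   // no-op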





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-09 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100344282
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -48,69 +47,102 @@ class JacksonParser(
 
   // A `ValueConverter` is responsible for converting a value from 
`JsonParser`
   // to a value in a field for `InternalRow`.
-  private type ValueConverter = (JsonParser) => Any
+  private type ValueConverter = JsonParser => AnyRef
 
   // `ValueConverter`s for the root schema for all fields in the schema
-  private val rootConverter: ValueConverter = makeRootConverter(schema)
+  private val rootConverter = makeRootConverter(schema)
 
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
   private val emptyRow: Seq[InternalRow] = Seq(new 
GenericInternalRow(schema.length))
 
+  private val corruptFieldIndex = 
schema.getFieldIndex(options.columnNameOfCorruptRecord)
+
   @transient
-  private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+  private var printWarningForMalformedRecord: (() => UTF8String) => Unit = 
{ record =>
+def sampleRecord: String = {
+  if (options.wholeFile) {
+""
+  } else {
+s"Sample record: ${record()}\n"
+  }
+}
+
+def footer: String = {
+  s"""Code example to print all malformed records (scala):
+ |===
+ |// The corrupted record exists in column 
${options.columnNameOfCorruptRecord}.
+ |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+ |
+   """.stripMargin
+}
+
+if (options.permissive) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will 
replace
+   |all malformed records with placeholder null in current 
$PERMISSIVE_MODE parser mode.
+   |To find out which corrupted records have been replaced with 
null, please use the
+   |default inferred schema instead of providing a custom schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+} else if (options.dropMalformed) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will drop
+   |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
+   |corrupted records have been dropped, please switch the parser 
mode to $PERMISSIVE_MODE
+   |mode and use the default inferred schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+}
+
+printWarningForMalformedRecord = _ => ()
+  }
+
+  @transient
+  private var printWarningIfWholeFile: () => Unit = { () =>
+if (options.wholeFile && corruptFieldIndex.isDefined) {
+  logWarning(
+s"""Enabling wholeFile mode and defining columnNameOfCorruptRecord 
may result
+   |in very large allocations or OutOfMemoryExceptions being 
raised.
+   |
+ """.stripMargin)
+}
+
+printWarningIfWholeFile = () => ()
+  }
 
   /**
* This function deals with the cases it fails to parse. This function 
will be called
* when exceptions are caught during converting. This functions also 
deals with `mode` option.
*/
-  private def failedRecord(record: String): Seq[InternalRow] = {
-// create a row even if no corrupt record column is present
-if (options.failFast) {
-  throw new SparkSQLJsonProcessingException(s"Malformed line in 
FAILFAST mode: $record")
-}
-if (options.dropMalformed) {
-  if (!isWarningPrintedForMalformedRecord) {
-logWarning(
-  s"""Found at least one malformed records (sample: $record). The 
JSON reader will drop
- |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
- |corrupted records have been dropped, please switch the 
parser mode to $PERMISSIVE_MODE
- |mode and use the default inferred schema.
- |
- |Code example to print all malformed records (scala):
- |===
- |// The corrupted record exists in column 
${columnNameOfCorruptRecord}
- |val parsedJson = 
spark.read.json("/path/to/json/file/test.json")
- |
-   """.stripMargin)
-isWarningPrintedForMalformedRecord = true
-  }
-  Nil
-} else if (schema.getFieldIndex(columnNameOfCorruptRecord).isEmpty) 

[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100332529
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -298,22 +312,22 @@ class JacksonParser(
 // Here, we pass empty `PartialFunction` so that this case can be
 // handled as a failed conversion. It will throw an exception as
 // long as the value is not null.
-parseJsonToken(parser, dataType)(PartialFunction.empty[JsonToken, 
Any])
+parseJsonToken[AnyRef](parser, 
dataType)(PartialFunction.empty[JsonToken, AnyRef])
   }
 
   /**
* This method skips `FIELD_NAME`s at the beginning, and handles nulls 
ahead before trying
* to parse the JSON token using given function `f`. If the `f` failed 
to parse and convert the
* token, call `failedConversion` to handle the token.
*/
-  private def parseJsonToken(
+  private def parseJsonToken[R >: Null](
--- End diff --

Yes, I said +1 because it explicitly expresses that the type should be nullable, and I _assumed_ (I did not check the bytecode myself, so I might be wrong) that it also gives the compiler a hint, since `Null` is nullable (I remember spending several days going through references on this while investigating another null-related PR).





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100294813
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -48,69 +47,102 @@ class JacksonParser(
 
   // A `ValueConverter` is responsible for converting a value from 
`JsonParser`
   // to a value in a field for `InternalRow`.
-  private type ValueConverter = (JsonParser) => Any
+  private type ValueConverter = JsonParser => AnyRef
 
   // `ValueConverter`s for the root schema for all fields in the schema
-  private val rootConverter: ValueConverter = makeRootConverter(schema)
+  private val rootConverter = makeRootConverter(schema)
 
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
   private val emptyRow: Seq[InternalRow] = Seq(new 
GenericInternalRow(schema.length))
 
+  private val corruptFieldIndex = 
schema.getFieldIndex(options.columnNameOfCorruptRecord)
+
   @transient
-  private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+  private var printWarningForMalformedRecord: (() => UTF8String) => Unit = 
{ record =>
+def sampleRecord: String = {
+  if (options.wholeFile) {
+""
+  } else {
+s"Sample record: ${record()}\n"
+  }
+}
+
+def footer: String = {
+  s"""Code example to print all malformed records (scala):
+ |===
+ |// The corrupted record exists in column 
${options.columnNameOfCorruptRecord}.
+ |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+ |
+   """.stripMargin
+}
+
+if (options.permissive) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will 
replace
+   |all malformed records with placeholder null in current 
$PERMISSIVE_MODE parser mode.
+   |To find out which corrupted records have been replaced with 
null, please use the
+   |default inferred schema instead of providing a custom schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+} else if (options.dropMalformed) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will drop
+   |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
+   |corrupted records have been dropped, please switch the parser 
mode to $PERMISSIVE_MODE
+   |mode and use the default inferred schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+}
+
+printWarningForMalformedRecord = _ => ()
+  }
+
+  @transient
+  private var printWarningIfWholeFile: () => Unit = { () =>
+if (options.wholeFile && corruptFieldIndex.isDefined) {
+  logWarning(
+s"""Enabling wholeFile mode and defining columnNameOfCorruptRecord 
may result
+   |in very large allocations or OutOfMemoryExceptions being 
raised.
+   |
+ """.stripMargin)
+}
+
+printWarningIfWholeFile = () => ()
+  }
 
   /**
* This function deals with the cases it fails to parse. This function 
will be called
* when exceptions are caught during converting. This functions also 
deals with `mode` option.
*/
-  private def failedRecord(record: String): Seq[InternalRow] = {
-// create a row even if no corrupt record column is present
-if (options.failFast) {
-  throw new SparkSQLJsonProcessingException(s"Malformed line in 
FAILFAST mode: $record")
-}
-if (options.dropMalformed) {
-  if (!isWarningPrintedForMalformedRecord) {
-logWarning(
-  s"""Found at least one malformed records (sample: $record). The 
JSON reader will drop
- |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
- |corrupted records have been dropped, please switch the 
parser mode to $PERMISSIVE_MODE
- |mode and use the default inferred schema.
- |
- |Code example to print all malformed records (scala):
- |===
- |// The corrupted record exists in column 
${columnNameOfCorruptRecord}
- |val parsedJson = 
spark.read.json("/path/to/json/file/test.json")
- |
-   """.stripMargin)
-isWarningPrintedForMalformedRecord = true
-  }
-  Nil
-} else if (schema.getFieldIndex(columnNameOfCorruptRecord).isEmpty) {
 

[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100294563
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -48,69 +47,102 @@ class JacksonParser(
 
   // A `ValueConverter` is responsible for converting a value from 
`JsonParser`
   // to a value in a field for `InternalRow`.
-  private type ValueConverter = (JsonParser) => Any
+  private type ValueConverter = JsonParser => AnyRef
 
   // `ValueConverter`s for the root schema for all fields in the schema
-  private val rootConverter: ValueConverter = makeRootConverter(schema)
+  private val rootConverter = makeRootConverter(schema)
 
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
   private val emptyRow: Seq[InternalRow] = Seq(new 
GenericInternalRow(schema.length))
 
+  private val corruptFieldIndex = 
schema.getFieldIndex(options.columnNameOfCorruptRecord)
+
   @transient
-  private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+  private var printWarningForMalformedRecord: (() => UTF8String) => Unit = 
{ record =>
+def sampleRecord: String = {
+  if (options.wholeFile) {
+""
+  } else {
+s"Sample record: ${record()}\n"
+  }
+}
+
+def footer: String = {
+  s"""Code example to print all malformed records (scala):
+ |===
+ |// The corrupted record exists in column 
${options.columnNameOfCorruptRecord}.
+ |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+ |
+   """.stripMargin
+}
+
+if (options.permissive) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will 
replace
+   |all malformed records with placeholder null in current 
$PERMISSIVE_MODE parser mode.
+   |To find out which corrupted records have been replaced with 
null, please use the
+   |default inferred schema instead of providing a custom schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+} else if (options.dropMalformed) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will drop
+   |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
+   |corrupted records have been dropped, please switch the parser 
mode to $PERMISSIVE_MODE
+   |mode and use the default inferred schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+}
+
+printWarningForMalformedRecord = _ => ()
--- End diff --

this looks tricky, can we still use the 
`isWarningPrintedForMalformedRecord` flag?





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100294186
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -48,69 +47,102 @@ class JacksonParser(
 
   // A `ValueConverter` is responsible for converting a value from 
`JsonParser`
   // to a value in a field for `InternalRow`.
-  private type ValueConverter = (JsonParser) => Any
+  private type ValueConverter = JsonParser => AnyRef
 
   // `ValueConverter`s for the root schema for all fields in the schema
-  private val rootConverter: ValueConverter = makeRootConverter(schema)
+  private val rootConverter = makeRootConverter(schema)
 
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
   private val emptyRow: Seq[InternalRow] = Seq(new 
GenericInternalRow(schema.length))
 
+  private val corruptFieldIndex = 
schema.getFieldIndex(options.columnNameOfCorruptRecord)
+
   @transient
-  private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+  private var printWarningForMalformedRecord: (() => UTF8String) => Unit = 
{ record =>
+def sampleRecord: String = {
+  if (options.wholeFile) {
+""
+  } else {
+s"Sample record: ${record()}\n"
+  }
+}
+
+def footer: String = {
+  s"""Code example to print all malformed records (scala):
+ |===
+ |// The corrupted record exists in column 
${options.columnNameOfCorruptRecord}.
+ |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+ |
+   """.stripMargin
+}
+
+if (options.permissive) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will 
replace
+   |all malformed records with placeholder null in current 
$PERMISSIVE_MODE parser mode.
+   |To find out which corrupted records have been replaced with 
null, please use the
+   |default inferred schema instead of providing a custom schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+} else if (options.dropMalformed) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will drop
+   |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
+   |corrupted records have been dropped, please switch the parser 
mode to $PERMISSIVE_MODE
+   |mode and use the default inferred schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+}
+
+printWarningForMalformedRecord = _ => ()
+  }
+
+  @transient
+  private var printWarningIfWholeFile: () => Unit = { () =>
+if (options.wholeFile && corruptFieldIndex.isDefined) {
+  logWarning(
+s"""Enabling wholeFile mode and defining columnNameOfCorruptRecord 
may result
+   |in very large allocations or OutOfMemoryExceptions being 
raised.
+   |
+ """.stripMargin)
+}
+
+printWarningIfWholeFile = () => ()
+  }
 
   /**
* This function deals with the cases it fails to parse. This function 
will be called
* when exceptions are caught during converting. This functions also 
deals with `mode` option.
*/
-  private def failedRecord(record: String): Seq[InternalRow] = {
-// create a row even if no corrupt record column is present
-if (options.failFast) {
-  throw new SparkSQLJsonProcessingException(s"Malformed line in 
FAILFAST mode: $record")
-}
-if (options.dropMalformed) {
-  if (!isWarningPrintedForMalformedRecord) {
-logWarning(
-  s"""Found at least one malformed records (sample: $record). The 
JSON reader will drop
- |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
- |corrupted records have been dropped, please switch the 
parser mode to $PERMISSIVE_MODE
- |mode and use the default inferred schema.
- |
- |Code example to print all malformed records (scala):
- |===
- |// The corrupted record exists in column 
${columnNameOfCorruptRecord}
- |val parsedJson = 
spark.read.json("/path/to/json/file/test.json")
- |
-   """.stripMargin)
-isWarningPrintedForMalformedRecord = true
-  }
-  Nil
-} else if (schema.getFieldIndex(columnNameOfCorruptRecord).isEmpty) {
 

[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-08 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100219153
  
--- Diff: 
core/src/main/scala/org/apache/spark/input/PortableDataStream.scala ---
@@ -194,5 +195,8 @@ class PortableDataStream(
   }
 
   def getPath(): String = path
+
+  @Since("2.2.0")
--- End diff --

Done, pushed in f71a465cf07fb9c043b2ccd86fa57e8e8ea9dc00





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100217653
  
--- Diff: 
core/src/main/scala/org/apache/spark/input/PortableDataStream.scala ---
@@ -194,5 +195,8 @@ class PortableDataStream(
   }
 
   def getPath(): String = path
+
+  @Since("2.2.0")
--- End diff --

SGTM. Can you take a look at the other public methods in this class and add a since tag for them? Otherwise it looks weird that only one method has a since tag...





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-08 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100104738
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -298,22 +312,22 @@ class JacksonParser(
 // Here, we pass empty `PartialFunction` so that this case can be
 // handled as a failed conversion. It will throw an exception as
 // long as the value is not null.
-parseJsonToken(parser, dataType)(PartialFunction.empty[JsonToken, 
Any])
+parseJsonToken[AnyRef](parser, 
dataType)(PartialFunction.empty[JsonToken, AnyRef])
   }
 
   /**
* This method skips `FIELD_NAME`s at the beginning, and handles nulls 
ahead before trying
* to parse the JSON token using given function `f`. If the `f` failed 
to parse and convert the
* token, call `failedConversion` to handle the token.
*/
-  private def parseJsonToken(
+  private def parseJsonToken[R >: Null](
--- End diff --

It states that `R` must be a nullable type. This enables `null: R` to 
compile and is preferable to the runtime cast `null.asInstanceOf[R]` because it 
is verified at compile time.
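A minimal standalone sketch of the lower-type-bound trick (the names are illustrative, not from the patch):

    // `R >: Null` constrains R to be a nullable (reference) type, so `null` is a
    // valid value of R at compile time and no runtime cast is needed.
    def firstOrNull[R >: Null](values: Seq[R]): R =
      if (values.nonEmpty) values.head else null

    firstOrNull(Seq("a", "b"))        // "a"
    firstOrNull(Seq.empty[String])    // null
    // firstOrNull[Int](Seq(1))       // does not compile: Int does not conform to bound >: Null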





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-08 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100103739
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -227,66 +267,71 @@ class JacksonParser(
   }
 
 case TimestampType =>
-  (parser: JsonParser) => parseJsonToken(parser, dataType) {
+  (parser: JsonParser) => parseJsonToken[java.lang.Long](parser, 
dataType) {
 case VALUE_STRING =>
+  val stringValue = parser.getText
   // This one will lose microseconds parts.
   // See https://issues.apache.org/jira/browse/SPARK-10681.
-  Try(options.timestampFormat.parse(parser.getText).getTime * 
1000L)
-.getOrElse {
-  // If it fails to parse, then tries the way used in 2.0 and 
1.x for backwards
-  // compatibility.
-  DateTimeUtils.stringToTime(parser.getText).getTime * 1000L
-}
+  Long.box {
--- End diff --

This is needed to satisfy the type checker. The other approach is to explicitly specify the type in two places: `Try[java.lang.Long](...).getOrElse[java.lang.Long](...)`. I found explicit boxing to be more readable than the alternative.
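Roughly, the two alternatives being weighed (an illustrative snippet with a hypothetical format and input, not the patch itself):

    import java.text.SimpleDateFormat
    import scala.util.Try

    // hypothetical stand-ins for the timestamp parsing done in the converter
    val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val text = "2017-02-08 12:00:00"

    // Explicit boxing: compute with primitive Long inside, box once at the end to
    // produce the java.lang.Long (an AnyRef) that the converter must return.
    val boxed: java.lang.Long = Long.box {
      Try(format.parse(text).getTime * 1000L).getOrElse(0L)
    }

    // The alternative: annotate the boxed type at both call sites.
    val annotated: java.lang.Long =
      Try[java.lang.Long](format.parse(text).getTime * 1000L).getOrElse[java.lang.Long](0L)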





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-08 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100101464
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
@@ -160,7 +164,17 @@ public void writeTo(OutputStream out) throws 
IOException {
 throw new ArrayIndexOutOfBoundsException();
   }
 
-  out.write(bytes, (int) arrayOffset, numBytes);
+  return ByteBuffer.wrap(bytes, (int) arrayOffset, numBytes);
+} else {
+  return null;
--- End diff --

It will allocate an extra object but would simplify the calling code... since it would be a short-lived allocation it's probably fine to do this.
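For reference, a small hedged sketch (plain JDK API, not from the patch) of why the extra object is cheap: `ByteBuffer.wrap` only allocates the wrapper, not a copy of the bytes.

    import java.nio.ByteBuffer

    val bytes = "example".getBytes("UTF-8")
    // ByteBuffer.wrap creates a view over the existing array; the bytes are not
    // copied, so the only new allocation is the small buffer object itself.
    val buffer = ByteBuffer.wrap(bytes, 0, bytes.length)

    println(buffer.hasArray)     // true: backed by the original array
    println(buffer.remaining())  // 7: length of "example" in UTF-8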





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-08 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100100641
  
--- Diff: 
core/src/main/scala/org/apache/spark/input/PortableDataStream.scala ---
@@ -194,5 +195,8 @@ class PortableDataStream(
   }
 
   def getPath(): String = path
+
+  @Since("2.2.0")
--- End diff --

This is a public class so I thought adding a `since` tag would benefit the 
documentation. If it's not desired I can certainly remove it.

As for making the lazy val public vs private: I'm following the style used 
already in the class. There are public get methods for each private field. I'm 
not partial to either approach but prefer to keep it consistent.





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-08 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100099791
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
 ---
@@ -31,10 +31,17 @@ import 
org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs
  * Most of these map directly to Jackson's internal options, specified in 
[[JsonParser.Feature]].
  */
 private[sql] class JSONOptions(
-@transient private val parameters: CaseInsensitiveMap)
+@transient private val parameters: CaseInsensitiveMap,
+defaultColumnNameOfCorruptRecord: String)
--- End diff --

Previously the `JSONOptions` instance was always passed around alongside a `columnNameOfCorruptRecord` value. This just makes it a field in `JSONOptions` instead, to put all options in one place. Since it's a required option, it made more sense to use a field rather than an entry in the `CaseInsensitiveMap`.
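An illustrative sketch of the shape being described (simplified names, plain `Map` instead of the real `CaseInsensitiveMap`, not the actual `JSONOptions`): a required setting arrives as a constructor parameter and becomes a field, while genuinely optional settings stay in the map.

    class MyJsonOptions(
        parameters: Map[String, String],
        defaultColumnNameOfCorruptRecord: String) {

      // required: always present as a field, falling back to the session default
      val columnNameOfCorruptRecord: String =
        parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord)

      // optional: read from the map on demand
      val wholeFile: Boolean =
        parameters.get("wholeFile").exists(_.toBoolean)
    }

    val opts = new MyJsonOptions(Map("wholeFile" -> "true"), "_corrupt_record")
    // opts.columnNameOfCorruptRecord == "_corrupt_record"; opts.wholeFile == true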





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-08 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100098008
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,125 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempDir { dir =>
+  dir.delete()
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+  }
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.option("compression", "gZiP")
+.save(jsonDir)
+
+  new File(jsonDir).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".json.gz")))
+  }
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") {
+withTempDir { dir =>
+  dir.delete()
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.save(jsonDir)
+
+  val compressedFiles = new File(jsonDir).listFiles()
+  assert(compressedFiles.exists(_.getName.endsWith(".json")))
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Expect one JSON document per file") {
+// the json parser terminates as soon as it sees a matching END_OBJECT 
or END_ARRAY token.
+// this might not be the optimal behavior but this test verifies that 
only the first value
+// is parsed and the rest are discarded.
+
+// alternatively the parser could continue parsing following objects, 
which may further reduce
+// allocations by skipping the line reader entirely
+
+withTempDir { dir =>
+  dir.delete()
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.flatMap(Iterator.fill(3)(_) ++ Iterator("\n{invalid}"))
--- End diff --

sure





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-08 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100097749
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
 ---
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.json
+
+import java.io.InputStream
+
+import scala.reflect.ClassTag
+
+import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
+import com.google.common.io.ByteStreams
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, 
TextInputFormat}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
+import org.apache.spark.rdd.{BinaryFileRDD, RDD}
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, 
JacksonParser, JSONOptions}
+import org.apache.spark.sql.execution.datasources.{CodecStreams, 
HadoopFileLinesReader, PartitionedFile}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.Utils
+
+/**
+ * Common functions for parsing JSON files
+ * @tparam T A datatype containing the unparsed JSON, such as [[Text]] or 
[[String]]
+ */
+abstract class JsonDataSource[T] extends Serializable {
+  def isSplitable: Boolean
+
+  /**
+   * Parse a [[PartitionedFile]] into 0 or more [[InternalRow]] instances
+   */
+  def readFile(
+conf: Configuration,
+file: PartitionedFile,
+parser: JacksonParser): Iterator[InternalRow]
+
+  /**
+   * Create an [[RDD]] that handles the preliminary parsing of [[T]] 
records
+   */
+  protected def createBaseRdd(
+sparkSession: SparkSession,
+inputPaths: Seq[FileStatus]): RDD[T]
+
+  /**
+   * A generic wrapper to invoke the correct [[JsonFactory]] method to 
allocate a [[JsonParser]]
+   * for an instance of [[T]]
+   */
+  def createParser(jsonFactory: JsonFactory, value: T): JsonParser
+
+  final def infer(
+  sparkSession: SparkSession,
+  inputPaths: Seq[FileStatus],
+  parsedOptions: JSONOptions): Option[StructType] = {
+if (inputPaths.nonEmpty) {
+  val jsonSchema = InferSchema.infer(
+createBaseRdd(sparkSession, inputPaths),
+parsedOptions,
+createParser)
+  checkConstraints(jsonSchema)
+  Some(jsonSchema)
+} else {
+  None
+}
+  }
+
+  /** Constraints to be imposed on schema to be stored. */
+  private def checkConstraints(schema: StructType): Unit = {
+if (schema.fieldNames.length != schema.fieldNames.distinct.length) {
+  val duplicateColumns = schema.fieldNames.groupBy(identity).collect {
+case (x, ys) if ys.length > 1 => "\"" + x + "\""
+  }.mkString(", ")
+  throw new AnalysisException(s"Duplicate column(s) : 
$duplicateColumns found, " +
+s"cannot save to JSON format")
+}
+  }
+}
+
+object JsonDataSource {
+  def apply(options: JSONOptions): JsonDataSource[_] = {
+if (options.wholeFile) {
+  WholeFileJsonDataSource
+} else {
+  TextInputJsonDataSource
+}
+  }
+
+  /**
+   * Create a new [[RDD]] via the supplied callback if there is at least 
one file to process,
+   * otherwise an [[org.apache.spark.rdd.EmptyRDD]] will be returned.
+   */
+  def createBaseRddConf[T : ClassTag](
--- End diff --

Habit from working with languages that don't support overloading; I'll change this.



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100083481
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,125 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempDir { dir =>
+  dir.delete()
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+  }
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.option("compression", "gZiP")
+.save(jsonDir)
+
+  new File(jsonDir).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".json.gz")))
+  }
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") {
+withTempDir { dir =>
+  dir.delete()
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.save(jsonDir)
+
+  val compressedFiles = new File(jsonDir).listFiles()
+  assert(compressedFiles.exists(_.getName.endsWith(".json")))
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Expect one JSON document per file") {
+// the json parser terminates as soon as it sees a matching END_OBJECT 
or END_ARRAY token.
+// this might not be the optimal behavior but this test verifies that 
only the first value
+// is parsed and the rest are discarded.
+
+// alternatively the parser could continue parsing following objects, 
which may further reduce
+// allocations by skipping the line reader entirely
+
+withTempDir { dir =>
+  dir.delete()
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.flatMap(Iterator.fill(3)(_) ++ Iterator("\n{invalid}"))
--- End diff --

can we write a JSON string literal to the text file? It's hard to understand 
what's going on here...
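
A rough sketch of that suggestion, assuming the suite's `testImplicits` and 
`withTempPath` helpers are in scope (the literals here are made up):

    // assumes: import testImplicits._ (as in JsonSuite)
    withTempPath { dir =>
      val path = dir.getCanonicalPath
      // Write explicit JSON literals so it is obvious what the parser sees:
      // one valid object followed by trailing garbage in the same file.
      Seq(
        """{"a": 1, "b": "first"}""",
        """{invalid}"""
      ).toDS().coalesce(1).write.text(path)

      val jsonDF = spark.read.option("wholeFile", true).json(path)
      // Per the behavior described above, only the first value is parsed
      // and the rest of the file is discarded.
      assert(jsonDF.count() === 1)
    }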


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100082372
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,125 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempDir { dir =>
+  dir.delete()
--- End diff --

looks like you need `withTempPath`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100081170
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
 ---
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.json
+
+import java.io.InputStream
+
+import scala.reflect.ClassTag
+
+import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
+import com.google.common.io.ByteStreams
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, 
TextInputFormat}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
+import org.apache.spark.rdd.{BinaryFileRDD, RDD}
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, 
JacksonParser, JSONOptions}
+import org.apache.spark.sql.execution.datasources.{CodecStreams, 
HadoopFileLinesReader, PartitionedFile}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.Utils
+
+/**
+ * Common functions for parsing JSON files
+ * @tparam T A datatype containing the unparsed JSON, such as [[Text]] or 
[[String]]
+ */
+abstract class JsonDataSource[T] extends Serializable {
+  def isSplitable: Boolean
+
+  /**
+   * Parse a [[PartitionedFile]] into 0 or more [[InternalRow]] instances
+   */
+  def readFile(
+conf: Configuration,
+file: PartitionedFile,
+parser: JacksonParser): Iterator[InternalRow]
+
+  /**
+   * Create an [[RDD]] that handles the preliminary parsing of [[T]] 
records
+   */
+  protected def createBaseRdd(
+sparkSession: SparkSession,
+inputPaths: Seq[FileStatus]): RDD[T]
+
+  /**
+   * A generic wrapper to invoke the correct [[JsonFactory]] method to 
allocate a [[JsonParser]]
+   * for an instance of [[T]]
+   */
+  def createParser(jsonFactory: JsonFactory, value: T): JsonParser
+
+  final def infer(
+  sparkSession: SparkSession,
+  inputPaths: Seq[FileStatus],
+  parsedOptions: JSONOptions): Option[StructType] = {
+if (inputPaths.nonEmpty) {
+  val jsonSchema = InferSchema.infer(
+createBaseRdd(sparkSession, inputPaths),
+parsedOptions,
+createParser)
+  checkConstraints(jsonSchema)
+  Some(jsonSchema)
+} else {
+  None
+}
+  }
+
+  /** Constraints to be imposed on schema to be stored. */
+  private def checkConstraints(schema: StructType): Unit = {
+if (schema.fieldNames.length != schema.fieldNames.distinct.length) {
+  val duplicateColumns = schema.fieldNames.groupBy(identity).collect {
+case (x, ys) if ys.length > 1 => "\"" + x + "\""
+  }.mkString(", ")
+  throw new AnalysisException(s"Duplicate column(s) : 
$duplicateColumns found, " +
+s"cannot save to JSON format")
+}
+  }
+}
+
+object JsonDataSource {
+  def apply(options: JSONOptions): JsonDataSource[_] = {
+if (options.wholeFile) {
+  WholeFileJsonDataSource
+} else {
+  TextInputJsonDataSource
+}
+  }
+
+  /**
+   * Create a new [[RDD]] via the supplied callback if there is at least 
one file to process,
+   * otherwise an [[org.apache.spark.rdd.EmptyRDD]] will be returned.
+   */
+  def createBaseRddConf[T : ClassTag](
--- End diff --

why call it `createBaseRddConf` instead of `createBaseRdd`?



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100064532
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -298,22 +312,22 @@ class JacksonParser(
 // Here, we pass empty `PartialFunction` so that this case can be
 // handled as a failed conversion. It will throw an exception as
 // long as the value is not null.
-parseJsonToken(parser, dataType)(PartialFunction.empty[JsonToken, 
Any])
+parseJsonToken[AnyRef](parser, 
dataType)(PartialFunction.empty[JsonToken, AnyRef])
   }
 
   /**
* This method skips `FIELD_NAME`s at the beginning, and handles nulls 
ahead before trying
* to parse the JSON token using given function `f`. If the `f` failed 
to parse and convert the
* token, call `failedConversion` to handle the token.
*/
-  private def parseJsonToken(
+  private def parseJsonToken[R >: Null](
--- End diff --

what does `>: Null` mean?
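
For readers unfamiliar with the syntax: `R >: Null` is a lower type bound, 
requiring `R` to be a supertype of `Null` (i.e. a reference type), which is 
what lets the method return `null` where an `R` is expected. A minimal 
standalone illustration (not PR code):

    // With the lower bound, null is a valid value of type R.
    def emptyValue[R >: Null]: R = null          // compiles

    // Without the bound the same body is rejected, because R could be a
    // value type such as Int:
    //   def emptyValue[R]: R = null             // error: type mismatch

    // The bound is also checked at the call site:
    //   emptyValue[String]                      // ok, returns null
    //   emptyValue[Int]                         // error: Int is not >: Null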


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100064266
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -227,66 +267,71 @@ class JacksonParser(
   }
 
 case TimestampType =>
-  (parser: JsonParser) => parseJsonToken(parser, dataType) {
+  (parser: JsonParser) => parseJsonToken[java.lang.Long](parser, 
dataType) {
 case VALUE_STRING =>
+  val stringValue = parser.getText
   // This one will lose microseconds parts.
   // See https://issues.apache.org/jira/browse/SPARK-10681.
-  Try(options.timestampFormat.parse(parser.getText).getTime * 
1000L)
-.getOrElse {
-  // If it fails to parse, then tries the way used in 2.0 and 
1.x for backwards
-  // compatibility.
-  DateTimeUtils.stringToTime(parser.getText).getTime * 1000L
-}
+  Long.box {
--- End diff --

I don't think this makes the code more readable...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100063010
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
@@ -160,7 +164,17 @@ public void writeTo(OutputStream out) throws 
IOException {
 throw new ArrayIndexOutOfBoundsException();
   }
 
-  out.write(bytes, (int) arrayOffset, numBytes);
+  return ByteBuffer.wrap(bytes, (int) arrayOffset, numBytes);
+} else {
+  return null;
--- End diff --

will it be more consistent if we return `ByteBuffer.wrap(getBytes)` here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r99535380
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
 ---
@@ -31,10 +31,17 @@ import 
org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs
  * Most of these map directly to Jackson's internal options, specified in 
[[JsonParser.Feature]].
  */
 private[sql] class JSONOptions(
-@transient private val parameters: CaseInsensitiveMap)
+@transient private val parameters: CaseInsensitiveMap,
+defaultColumnNameOfCorruptRecord: String)
--- End diff --

why do we need this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r99534380
  
--- Diff: 
core/src/main/scala/org/apache/spark/input/PortableDataStream.scala ---
@@ -194,5 +195,8 @@ class PortableDataStream(
   }
 
   def getPath(): String = path
+
+  @Since("2.2.0")
--- End diff --

can we just make `lazy val conf` not private?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r99534181
  
--- Diff: 
core/src/main/scala/org/apache/spark/input/PortableDataStream.scala ---
@@ -194,5 +195,8 @@ class PortableDataStream(
   }
 
   def getPath(): String = path
+
+  @Since("2.2.0")
--- End diff --

there is no `since` tag in other methods of this class.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2016-12-27 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r93970059
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
 ---
@@ -36,29 +31,31 @@ import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
+object JsonFileFormat {
+  def parseJsonOptions(sparkSession: SparkSession, options: Map[String, 
String]): JSONOptions = {
--- End diff --

I just removed the method entirely since all it did was fetch the value of 
`columnNameOfCorruptRecord`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2016-12-27 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r93969732
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
 ---
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.json
+
+import java.io.InputStream
+
+import scala.reflect.ClassTag
+
+import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, 
TextInputFormat}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
+import org.apache.spark.rdd.{BinaryFileRDD, RDD}
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, 
JacksonParser, JSONOptions}
+import org.apache.spark.sql.execution.datasources.{CodecStreams, 
HadoopFileLinesReader, PartitionedFile}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Common functions for parsing JSON files
+ * @tparam T A datatype containing the unparsed JSON, such as [[Text]] or 
[[String]]
+ */
+abstract class JsonDataSource[T] extends Serializable {
+  def isSplitable: Boolean
+
+  /**
+   * Parse a [[PartitionedFile]] into 0 or more [[InternalRow]] instances
+   */
+  def readFile(
+conf: Configuration,
+file: PartitionedFile,
+parser: JacksonParser): Iterator[InternalRow]
+
+  /**
+   * Create an [[RDD]] that handles the preliminary parsing of [[T]] 
records
+   */
+  protected def createBaseRdd(
+sparkSession: SparkSession,
+inputPaths: Seq[FileStatus]): RDD[T]
+
+  /**
+   * A generic wrapper to invoke the correct [[JsonFactory]] method to 
allocate a [[JsonParser]]
+   * for an instance of [[T]]
+   */
+  def createParser(jsonFactory: JsonFactory, value: T): JsonParser
+
+  final def infer(
+  sparkSession: SparkSession,
+  inputPaths: Seq[FileStatus],
+  parsedOptions: JSONOptions): Option[StructType] = {
+val jsonSchema = InferSchema.infer(
+  createBaseRdd(sparkSession, inputPaths),
+  parsedOptions,
+  createParser)
+checkConstraints(jsonSchema)
+
+if (jsonSchema.fields.nonEmpty) {
--- End diff --

Yes, it was a regression that caused a test failure.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2016-12-22 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r93733483
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
 ---
@@ -36,29 +31,31 @@ import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
+object JsonFileFormat {
+  def parseJsonOptions(sparkSession: SparkSession, options: Map[String, 
String]): JSONOptions = {
--- End diff --

I think I disagree with passing the whole `SparkSession`, because apparently we 
only need `SQLConf` or the value of `spark.sql.columnNameOfCorruptRecord`.
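
A rough sketch of the narrower signature being suggested (the wiring is 
illustrative, not the final API; it assumes `CaseInsensitiveMap` can be 
constructed directly here):

    object JsonFileFormat {
      // Take only the values that are actually needed instead of the whole
      // SparkSession.
      def parseJsonOptions(
          options: Map[String, String],
          columnNameOfCorruptRecord: String): JSONOptions = {
        new JSONOptions(new CaseInsensitiveMap(options), columnNameOfCorruptRecord)
      }
    }

    // Hypothetical call site:
    //   JsonFileFormat.parseJsonOptions(
    //     options, sparkSession.sessionState.conf.columnNameOfCorruptRecord)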


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2016-12-22 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r93732800
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
 ---
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.json
+
+import java.io.InputStream
+
+import scala.reflect.ClassTag
+
+import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, 
TextInputFormat}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
+import org.apache.spark.rdd.{BinaryFileRDD, RDD}
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, 
JacksonParser, JSONOptions}
+import org.apache.spark.sql.execution.datasources.{CodecStreams, 
HadoopFileLinesReader, PartitionedFile}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Common functions for parsing JSON files
+ * @tparam T A datatype containing the unparsed JSON, such as [[Text]] or 
[[String]]
+ */
+abstract class JsonDataSource[T] extends Serializable {
+  def isSplitable: Boolean
+
+  /**
+   * Parse a [[PartitionedFile]] into 0 or more [[InternalRow]] instances
+   */
+  def readFile(
+conf: Configuration,
+file: PartitionedFile,
+parser: JacksonParser): Iterator[InternalRow]
+
+  /**
+   * Create an [[RDD]] that handles the preliminary parsing of [[T]] 
records
+   */
+  protected def createBaseRdd(
+sparkSession: SparkSession,
+inputPaths: Seq[FileStatus]): RDD[T]
+
+  /**
+   * A generic wrapper to invoke the correct [[JsonFactory]] method to 
allocate a [[JsonParser]]
+   * for an instance of [[T]]
+   */
+  def createParser(jsonFactory: JsonFactory, value: T): JsonParser
+
+  final def infer(
+  sparkSession: SparkSession,
+  inputPaths: Seq[FileStatus],
+  parsedOptions: JSONOptions): Option[StructType] = {
+val jsonSchema = InferSchema.infer(
+  createBaseRdd(sparkSession, inputPaths),
+  parsedOptions,
+  createParser)
+checkConstraints(jsonSchema)
+
+if (jsonSchema.fields.nonEmpty) {
--- End diff --

It seems this changes existing behaviour (not allowing empty schema).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2016-12-22 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r93731259
  
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -155,21 +155,24 @@ def load(self, path=None, format=None, schema=None, 
**options):
 return self._df(self._jreader.load())
 
 @since(1.4)
-def json(self, path, schema=None, primitivesAsString=None, 
prefersDecimal=None,
+def json(self, path, schema=None, wholeFile=None, 
primitivesAsString=None, prefersDecimal=None,
--- End diff --

we need to add this to the end; otherwise it breaks compatibility for 
positional arguments.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2016-12-22 Thread NathanHowell
GitHub user NathanHowell opened a pull request:

https://github.com/apache/spark/pull/16386

[SPARK-18352][SQL] Support parsing multiline json files

## What changes were proposed in this pull request?

If a new option `wholeFile` is set to `true`, the JSON reader will parse each 
file (instead of a single line) as a single value. This is done with Jackson 
streaming and it should be capable of parsing very large documents, assuming 
the row will fit in memory.

Because the file is not buffered in memory, the corrupt record handling is also 
slightly different when `wholeFile` is enabled: if there is a parsing failure, 
the corrupt column will contain the filename instead of the literal JSON. It 
would be easy to extend this to add the parser location (line, column and byte 
offsets) to the output if desired.
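
A minimal usage sketch of the option (the input path is hypothetical):

    // Each file under the path is parsed as a single JSON value rather than
    // line by line.
    val df = spark.read
      .option("wholeFile", true)
      .json("/data/multiline-json")

    // With wholeFile enabled, a file that fails to parse surfaces in the
    // corrupt record column as its filename rather than the literal JSON.
    val withCorrupt = spark.read
      .option("wholeFile", true)
      .option("columnNameOfCorruptRecord", "_corrupt_record")
      .json("/data/multiline-json")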

I've also included a few other changes that generate slightly better bytecode 
and (imo) make it more obvious when and where boxing occurs in the parser. 
These are included as separate commits; let me know if they should be flattened 
into this PR or moved to a new one.

## How was this patch tested?

New and existing unit tests. No performance or load tests have been run.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NathanHowell/spark SPARK-18352

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16386.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16386


commit 740620210b30ef02e280d161d6b08088d07300fa
Author: Nathan Howell 
Date:   2016-12-22T22:16:49Z

[SPARK-18352][SQL] Support parsing multiline json files

commit 7902255a79fc2581214a09ccd38437cebd19d862
Author: Nathan Howell 
Date:   2016-12-22T00:27:19Z

JacksonParser.parseJsonToken should be explicit about nulls and boxing

commit 149418647c9831e88af866d44d31496940c02162
Author: Nathan Howell 
Date:   2016-12-21T23:49:37Z

Increase type safety of makeRootConverter, remove runtime type tests

commit 7ad5d5be0c7b41112f9f6ad3cb0cf9055de62695
Author: Nathan Howell 
Date:   2016-12-23T02:13:59Z

Field converter lookups should be O(1)




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org