Repository: spark
Updated Branches:
  refs/heads/master f58319a24 -> e47408814


[SPARK-13764][SQL] Parse modes in JSON data source

## What changes were proposed in this pull request?

Currently, there is no way to control the behaviour when the JSON data source 
fails to parse corrupt records.

This PR adds support for parse modes, just like the CSV data source. There are 
three modes:

- `PERMISSIVE`: When it fails to parse a record, this sets the fields to `null` 
and keeps the malformed record in the column configured by 
`spark.sql.columnNameOfCorruptRecord`. This is the default mode.
- `DROPMALFORMED`: When it fails to parse a record, this drops the whole record.
- `FAILFAST`: When it fails to parse a record, this throws an exception.

This PR also makes the JSON data source share the `ParseModes` object with the CSV data source.
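
As a quick illustration, a minimal usage sketch in Scala (the file path and data are hypothetical; the option names match the API added in this PR):

```scala
// Hedged sketch: reading one JSON dataset under each parse mode.
// "/tmp/records.json" is a hypothetical path.
val permissiveDF = sqlContext.read
  .option("mode", "PERMISSIVE")    // default: corrupt records kept in the corrupt-record column
  .json("/tmp/records.json")

val droppedDF = sqlContext.read
  .option("mode", "DROPMALFORMED") // corrupt records are dropped with a logged warning
  .json("/tmp/records.json")

val failFastDF = sqlContext.read
  .option("mode", "FAILFAST")      // the first corrupt record throws an exception
  .json("/tmp/records.json")
```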

## How was this patch tested?

Unit tests were added, and `./dev/run-tests` was run for code style checks.

Author: hyukjinkwon <gurwls...@gmail.com>

Closes #11756 from HyukjinKwon/SPARK-13764.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e4740881
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e4740881
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e4740881

Branch: refs/heads/master
Commit: e474088144cdd2632cf2fef6b2cf10b3cd191c23
Parents: f58319a
Author: hyukjinkwon <gurwls...@gmail.com>
Authored: Mon Mar 21 15:42:35 2016 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Mon Mar 21 15:42:35 2016 +0800

----------------------------------------------------------------------
 python/pyspark/sql/readwriter.py                |  8 +++
 .../org/apache/spark/sql/DataFrameReader.scala  | 18 +++++++
 .../sql/execution/datasources/ParseModes.scala  | 41 +++++++++++++++
 .../execution/datasources/csv/CSVOptions.scala  | 27 +---------
 .../datasources/json/InferSchema.scala          | 26 ++++++----
 .../datasources/json/JSONOptions.scala          | 15 +++++-
 .../datasources/json/JacksonParser.scala        | 22 ++++++---
 .../execution/datasources/json/JsonSuite.scala  | 52 +++++++++++++++++++-
 8 files changed, 164 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e4740881/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 438662b..bae9e69 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -162,6 +162,14 @@ class DataFrameReader(object):
                 (e.g. 00012)
             * ``allowBackslashEscapingAnyCharacter`` (default ``false``): allows accepting quoting \
                 of all character using backslash quoting mechanism
+            *  ``mode`` (default ``PERMISSIVE``): allows a mode for dealing with corrupt records \
+                during parsing.
+                *  ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
+                  record and puts the malformed string into a new field configured by \
+                 ``spark.sql.columnNameOfCorruptRecord``. When a schema is set by the user, it sets \
+                 ``null`` for extra fields.
+                *  ``DROPMALFORMED`` : ignores whole corrupted records.
+                *  ``FAILFAST`` : throws an exception when it meets corrupted records.
 
         >>> df1 = sqlContext.read.json('python/test_support/sql/people.json')
         >>> df1.dtypes

http://git-wip-us.apache.org/repos/asf/spark/blob/e4740881/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 1b5a499..0dc0d44 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -289,6 +289,15 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    * </li>
   * <li>`allowNumericLeadingZeros` (default `false`): allows leading zeros in numbers
   * (e.g. 00012)</li>
+   * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
+   * during parsing.</li>
+   * <ul>
+   *  <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts the
+   *  malformed string into a new field configured by `spark.sql.columnNameOfCorruptRecord`. When
+   *  a schema is set by the user, it sets `null` for extra fields.</li>
+   *  <li>`DROPMALFORMED` : ignores whole corrupted records.</li>
+   *  <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
+   * </ul>
    *
    * @since 1.4.0
    */
@@ -313,6 +322,15 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    * (e.g. 00012)</li>
   * <li>`allowBackslashEscapingAnyCharacter` (default `false`): allows accepting quoting of all
   * character using backslash quoting mechanism</li>
+   * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
+   * during parsing.</li>
+   * <ul>
+   *  <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts the
+   *  malformed string into a new field configured by `spark.sql.columnNameOfCorruptRecord`. When
+   *  a schema is set by the user, it sets `null` for extra fields.</li>
+   *  <li>`DROPMALFORMED` : ignores whole corrupted records.</li>
+   *  <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
+   * </ul>
    *
    * @since 1.6.0
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/e4740881/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala
new file mode 100644
index 0000000..4682280
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+private[datasources] object ParseModes {
+  val PERMISSIVE_MODE = "PERMISSIVE"
+  val DROP_MALFORMED_MODE = "DROPMALFORMED"
+  val FAIL_FAST_MODE = "FAILFAST"
+
+  val DEFAULT = PERMISSIVE_MODE
+
+  def isValidMode(mode: String): Boolean = {
+    mode.toUpperCase match {
+      case PERMISSIVE_MODE | DROP_MALFORMED_MODE | FAIL_FAST_MODE => true
+      case _ => false
+    }
+  }
+
+  def isDropMalformedMode(mode: String): Boolean = mode.toUpperCase == DROP_MALFORMED_MODE
+  def isFailFastMode(mode: String): Boolean = mode.toUpperCase == FAIL_FAST_MODE
+  def isPermissiveMode(mode: String): Boolean = if (isValidMode(mode)) {
+    mode.toUpperCase == PERMISSIVE_MODE
+  } else {
+    true // We default to permissive if the mode string is not valid
+  }
+}
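
Note the deliberate asymmetry in `isPermissiveMode`: an unrecognized mode string falls back to permissive rather than failing. A hedged sketch of the resulting behaviour, callable from within the `datasources` package since the object is `private[datasources]`:

```scala
// Behaviour inferred from the code above; ParseModes is not an external API.
ParseModes.isValidMode("dropmalformed")  // true  (matching is case-insensitive)
ParseModes.isValidMode("bogus")          // false
ParseModes.isFailFastMode("bogus")       // false
ParseModes.isPermissiveMode("bogus")     // true: invalid modes default to permissive
```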

http://git-wip-us.apache.org/repos/asf/spark/blob/e4740881/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index e009a37..95de02c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.csv
 import java.nio.charset.StandardCharsets
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.datasources.CompressionCodecs
+import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
 
 private[sql] class CSVOptions(
     @transient private val parameters: Map[String, String])
@@ -62,7 +62,7 @@ private[sql] class CSVOptions(
 
   val delimiter = CSVTypeCast.toChar(
     parameters.getOrElse("sep", parameters.getOrElse("delimiter", ",")))
-  val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
+  private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
   val charset = parameters.getOrElse("encoding",
     parameters.getOrElse("charset", StandardCharsets.UTF_8.name()))
 
@@ -101,26 +101,3 @@ private[sql] class CSVOptions(
 
   val rowSeparator = "\n"
 }
-
-private[csv] object ParseModes {
-  val PERMISSIVE_MODE = "PERMISSIVE"
-  val DROP_MALFORMED_MODE = "DROPMALFORMED"
-  val FAIL_FAST_MODE = "FAILFAST"
-
-  val DEFAULT = PERMISSIVE_MODE
-
-  def isValidMode(mode: String): Boolean = {
-    mode.toUpperCase match {
-      case PERMISSIVE_MODE | DROP_MALFORMED_MODE | FAIL_FAST_MODE => true
-      case _ => false
-    }
-  }
-
-  def isDropMalformedMode(mode: String): Boolean = mode.toUpperCase == DROP_MALFORMED_MODE
-  def isFailFastMode(mode: String): Boolean = mode.toUpperCase == FAIL_FAST_MODE
-  def isPermissiveMode(mode: String): Boolean = if (isValidMode(mode))  {
-    mode.toUpperCase == PERMISSIVE_MODE
-  } else {
-    true // We default to permissive is the mode string is not valid
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e4740881/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
index 0937a21..945ed2c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
@@ -40,6 +40,7 @@ private[sql] object InferSchema {
       configOptions: JSONOptions): StructType = {
     require(configOptions.samplingRatio > 0,
       s"samplingRatio (${configOptions.samplingRatio}) should be greater than 
0")
+    val shouldHandleCorruptRecord = configOptions.permissive
     val schemaData = if (configOptions.samplingRatio > 0.99) {
       json
     } else {
@@ -50,21 +51,23 @@ private[sql] object InferSchema {
     val rootType = schemaData.mapPartitions { iter =>
       val factory = new JsonFactory()
       configOptions.setJacksonOptions(factory)
-      iter.map { row =>
+      iter.flatMap { row =>
         try {
           Utils.tryWithResource(factory.createParser(row)) { parser =>
             parser.nextToken()
-            inferField(parser, configOptions)
+            Some(inferField(parser, configOptions))
           }
         } catch {
+          case _: JsonParseException if shouldHandleCorruptRecord =>
+            Some(StructType(Seq(StructField(columnNameOfCorruptRecords, StringType))))
           case _: JsonParseException =>
-            StructType(Seq(StructField(columnNameOfCorruptRecords, StringType)))
+            None
         }
       }
     }.treeAggregate[DataType](
       StructType(Seq()))(
-      compatibleRootType(columnNameOfCorruptRecords),
-      compatibleRootType(columnNameOfCorruptRecords))
+      compatibleRootType(columnNameOfCorruptRecords, shouldHandleCorruptRecord),
+      compatibleRootType(columnNameOfCorruptRecords, shouldHandleCorruptRecord))
 
     canonicalizeType(rootType) match {
       case Some(st: StructType) => st
@@ -194,18 +197,21 @@ private[sql] object InferSchema {
    * Remove top-level ArrayType wrappers and merge the remaining schemas
    */
   private def compatibleRootType(
-      columnNameOfCorruptRecords: String): (DataType, DataType) => DataType = {
+      columnNameOfCorruptRecords: String,
+      shouldHandleCorruptRecord: Boolean): (DataType, DataType) => DataType = {
     // Since we support array of json objects at the top level,
     // we need to check the element type and find the root level data type.
-    case (ArrayType(ty1, _), ty2) => compatibleRootType(columnNameOfCorruptRecords)(ty1, ty2)
-    case (ty1, ArrayType(ty2, _)) => compatibleRootType(columnNameOfCorruptRecords)(ty1, ty2)
+    case (ArrayType(ty1, _), ty2) =>
+      compatibleRootType(columnNameOfCorruptRecords, shouldHandleCorruptRecord)(ty1, ty2)
+    case (ty1, ArrayType(ty2, _)) =>
+      compatibleRootType(columnNameOfCorruptRecords, shouldHandleCorruptRecord)(ty1, ty2)
     // If we see any other data type at the root level, we get records that cannot be
     // parsed. So, we use the struct as the data type and add the corrupt field to the schema.
     case (struct: StructType, NullType) => struct
     case (NullType, struct: StructType) => struct
-    case (struct: StructType, o) if !o.isInstanceOf[StructType] =>
+    case (struct: StructType, o) if !o.isInstanceOf[StructType] && shouldHandleCorruptRecord =>
       withCorruptField(struct, columnNameOfCorruptRecords)
-    case (o, struct: StructType) if !o.isInstanceOf[StructType] =>
+    case (o, struct: StructType) if !o.isInstanceOf[StructType] && shouldHandleCorruptRecord =>
       withCorruptField(struct, columnNameOfCorruptRecords)
     // If we get anything else, we call compatibleType.
     // Usually, when we reach here, ty1 and ty2 are two StructTypes.
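
The switch from `map` to `flatMap` above means that, outside permissive mode, unparseable lines contribute nothing to schema inference. A hedged sketch of the observable effect (sample data and printed output are illustrative; `_corrupt_record` is the default value of `spark.sql.columnNameOfCorruptRecord`):

```scala
// Illustrative: schema inference over a mixed RDD of valid and invalid lines.
val mixed = sqlContext.sparkContext.parallelize(
  """{"a": 1}""" :: """not json""" :: Nil)

// PERMISSIVE (default): the corrupt line forces a corrupt-record column into the schema.
sqlContext.read.json(mixed).printSchema()
// root
//  |-- _corrupt_record: string
//  |-- a: long

// DROPMALFORMED: the corrupt line is ignored during inference as well.
sqlContext.read.option("mode", "DROPMALFORMED").json(mixed).printSchema()
// root
//  |-- a: long
```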

http://git-wip-us.apache.org/repos/asf/spark/blob/e4740881/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
index e59dbd6..93c3d47 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql.execution.datasources.json
 
 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
 
-import org.apache.spark.sql.execution.datasources.CompressionCodecs
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
 
 /**
  * Options for the JSON data source.
@@ -28,7 +29,7 @@ import org.apache.spark.sql.execution.datasources.CompressionCodecs
  */
 private[sql] class JSONOptions(
     @transient private val parameters: Map[String, String])
-  extends Serializable  {
+  extends Logging with Serializable  {
 
   val samplingRatio =
     parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
@@ -49,6 +50,16 @@ private[sql] class JSONOptions(
  val allowBackslashEscapingAnyCharacter =
    parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
  val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
+  private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
+
+  // Parse mode flags
+  if (!ParseModes.isValidMode(parseMode)) {
+    logWarning(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.")
+  }
+
+  val failFast = ParseModes.isFailFastMode(parseMode)
+  val dropMalformed = ParseModes.isDropMalformedMode(parseMode)
+  val permissive = ParseModes.isPermissiveMode(parseMode)
 
   /** Sets config options on a Jackson [[JsonFactory]]. */
   def setJacksonOptions(factory: JsonFactory): Unit = {
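
With these flags in place, an invalid mode string degrades gracefully instead of erroring. A hedged sketch of what a caller inside the `sql` package would observe (option values are illustrative; `JSONOptions` is `private[sql]`):

```scala
// Illustrative only: runs inside the private[sql] scope.
val failFastOpts = new JSONOptions(Map("mode" -> "FAILFAST"))
assert(failFastOpts.failFast && !failFastOpts.dropMalformed && !failFastOpts.permissive)

val fallbackOpts = new JSONOptions(Map("mode" -> "not-a-mode"))
// Logs "not-a-mode is not a valid parse mode. Using PERMISSIVE." and behaves permissively.
assert(fallbackOpts.permissive)
```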

http://git-wip-us.apache.org/repos/asf/spark/blob/e4740881/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index 3252b6c..00c14ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import com.fasterxml.jackson.core._
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -34,7 +35,7 @@ import org.apache.spark.util.Utils
 
 private[json] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)
 
-object JacksonParser {
+object JacksonParser extends Logging {
 
   def parse(
       input: RDD[String],
@@ -257,13 +258,20 @@ object JacksonParser {
 
     def failedRecord(record: String): Seq[InternalRow] = {
       // create a row even if no corrupt record column is present
-      val row = new GenericMutableRow(schema.length)
-      for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecords)) {
-        require(schema(corruptIndex).dataType == StringType)
-        row.update(corruptIndex, UTF8String.fromString(record))
+      if (configOptions.failFast) {
+        throw new RuntimeException(s"Malformed line in FAILFAST mode: $record")
+      }
+      if (configOptions.dropMalformed) {
+        logWarning(s"Dropping malformed line: $record")
+        Nil
+      } else {
+        val row = new GenericMutableRow(schema.length)
+        for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecords)) {
+          require(schema(corruptIndex).dataType == StringType)
+          row.update(corruptIndex, UTF8String.fromString(record))
+        }
+        Seq(row)
       }
-
-      Seq(row)
     }
 
     val factory = new JsonFactory()
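
Because `failedRecord` throws its `RuntimeException` inside a task, driver-side callers see it wrapped in a `SparkException`, which is what the tests below intercept. A hedged sketch (input data is illustrative):

```scala
// Illustrative: the task-side RuntimeException surfaces as a SparkException on collect().
import org.apache.spark.SparkException

val mixed = sqlContext.sparkContext.parallelize("""{"a": 1}""" :: """{""" :: Nil)
try {
  sqlContext.read.option("mode", "FAILFAST").json(mixed).collect()
} catch {
  case e: SparkException =>
    assert(e.getMessage.contains("Malformed line in FAILFAST mode"))
}
```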

http://git-wip-us.apache.org/repos/asf/spark/blob/e4740881/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 6d942c4..0a5699b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.{Path, PathFilter}
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.GzipCodec
 
+import org.apache.spark.SparkException
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -963,7 +964,56 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     )
   }
 
-  test("Corrupt records") {
+  test("Corrupt records: FAILFAST mode") {
+    val schema = StructType(
+        StructField("a", StringType, true) :: Nil)
+    // `FAILFAST` mode should throw an exception for corrupt records.
+    val exceptionOne = intercept[SparkException] {
+      sqlContext.read
+        .option("mode", "FAILFAST")
+        .json(corruptRecords)
+        .collect()
+    }
+    assert(exceptionOne.getMessage.contains("Malformed line in FAILFAST mode: {"))
+
+    val exceptionTwo = intercept[SparkException] {
+      sqlContext.read
+        .option("mode", "FAILFAST")
+        .schema(schema)
+        .json(corruptRecords)
+        .collect()
+    }
+    assert(exceptionTwo.getMessage.contains("Malformed line in FAILFAST mode: {"))
+  }
+
+  test("Corrupt records: DROPMALFORMED mode") {
+    val schemaOne = StructType(
+      StructField("a", StringType, true) ::
+        StructField("b", StringType, true) ::
+        StructField("c", StringType, true) :: Nil)
+    val schemaTwo = StructType(
+      StructField("a", StringType, true) :: Nil)
+    // `DROPMALFORMED` mode should skip corrupt records
+    val jsonDFOne = sqlContext.read
+      .option("mode", "DROPMALFORMED")
+      .json(corruptRecords)
+    checkAnswer(
+      jsonDFOne,
+      Row("str_a_4", "str_b_4", "str_c_4") :: Nil
+    )
+    assert(jsonDFOne.schema === schemaOne)
+
+    val jsonDFTwo = sqlContext.read
+      .option("mode", "DROPMALFORMED")
+      .schema(schemaTwo)
+      .json(corruptRecords)
+    checkAnswer(
+      jsonDFTwo,
+      Row("str_a_4") :: Nil)
+    assert(jsonDFTwo.schema === schemaTwo)
+  }
+
+  test("Corrupt records: PERMISSIVE mode") {
     // Test if we can query corrupt records.
     withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
       withTempTable("jsonTable") {
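
For reference, `corruptRecords` comes from `TestJsonData`. A hedged sketch of the kind of fixture these tests exercise (the exact contents here are an assumption, not the real fixture):

```scala
// Hypothetical stand-in for TestJsonData.corruptRecords: malformed lines
// mixed with one well-formed object, mirroring the expected rows above.
val corruptSample = sqlContext.sparkContext.parallelize(
  """{""" ::                                             // malformed: unclosed object
  """{"a":1, b:2}""" ::                                  // malformed: unquoted key
  """{"a":"str_a_4", "b":"str_b_4", "c":"str_c_4"}""" :: // well-formed
  Nil)

val cleaned = sqlContext.read.option("mode", "DROPMALFORMED").json(corruptSample)
cleaned.show() // only the well-formed row survives
```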

