Repository: spark Updated Branches: refs/heads/master f58319a24 -> e47408814
[SPARK-13764][SQL] Parse modes in JSON data source ## What changes were proposed in this pull request? Currently, there is no way to control the behaviour when the JSON data source fails to parse corrupt records. This PR adds support for parse modes, just like the CSV data source. There are three modes: - `PERMISSIVE` : When it fails to parse, this sets the fields to `null`. This is the default mode. - `DROPMALFORMED`: When it fails to parse, this drops the whole record. - `FAILFAST`: When it fails to parse, it just throws an exception. This PR also makes the JSON data source share the `ParseModes` of the CSV data source. ## How was this patch tested? Unit tests were used and `./dev/run_tests` for code style tests. Author: hyukjinkwon <gurwls...@gmail.com> Closes #11756 from HyukjinKwon/SPARK-13764. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e4740881 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e4740881 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e4740881 Branch: refs/heads/master Commit: e474088144cdd2632cf2fef6b2cf10b3cd191c23 Parents: f58319a Author: hyukjinkwon <gurwls...@gmail.com> Authored: Mon Mar 21 15:42:35 2016 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Mon Mar 21 15:42:35 2016 +0800 ---------------------------------------------------------------------- python/pyspark/sql/readwriter.py | 8 +++ .../org/apache/spark/sql/DataFrameReader.scala | 18 +++++++ .../sql/execution/datasources/ParseModes.scala | 41 +++++++++++++++ .../execution/datasources/csv/CSVOptions.scala | 27 +--------- .../datasources/json/InferSchema.scala | 26 ++++++---- .../datasources/json/JSONOptions.scala | 15 +++++- .../datasources/json/JacksonParser.scala | 22 ++++++--- .../execution/datasources/json/JsonSuite.scala | 52 +++++++++++++++++++- 8 files changed, 164 insertions(+), 45 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/e4740881/python/pyspark/sql/readwriter.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 438662b..bae9e69 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -162,6 +162,14 @@ class DataFrameReader(object): (e.g. 00012) * ``allowBackslashEscapingAnyCharacter`` (default ``false``): allows accepting quoting \ of all character using backslash quoting mechanism + * ``mode`` (default ``PERMISSIVE``): allows a mode for dealing with corrupt records \ + during parsing. + * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ + record and puts the malformed string into a new field configured by \ + ``spark.sql.columnNameOfCorruptRecord``. When a schema is set by user, it sets \ + ``null`` for extra fields. + * ``DROPMALFORMED`` : ignores the whole corrupted records. + * ``FAILFAST`` : throws an exception when it meets corrupted records. >>> df1 = sqlContext.read.json('python/test_support/sql/people.json') >>> df1.dtypes http://git-wip-us.apache.org/repos/asf/spark/blob/e4740881/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 1b5a499..0dc0d44 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -289,6 +289,15 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging { * </li> * <li>`allowNumericLeadingZeros` (default `false`): allows leading zeros in numbers * (e.g. 
00012)</li> + * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records + * during parsing.</li> + * <ul> + * <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts the + * malformed string into a new field configured by `spark.sql.columnNameOfCorruptRecord`. When + * a schema is set by user, it sets `null` for extra fields.</li> + * <li>`DROPMALFORMED` : ignores the whole corrupted records.</li> + * <li>`FAILFAST` : throws an exception when it meets corrupted records.</li> + * </ul> * * @since 1.4.0 */ @@ -313,6 +322,15 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging { * (e.g. 00012)</li> * <li>`allowBackslashEscapingAnyCharacter` (default `false`): allows accepting quoting of all * character using backslash quoting mechanism</li> + * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records + * during parsing.</li> + * <ul> + * <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts the + * malformed string into a new field configured by `spark.sql.columnNameOfCorruptRecord`.
When + * a schema is set by user, it sets `null` for extra fields.</li> + * <li>`DROPMALFORMED` : ignores the whole corrupted records.</li> + * <li>`FAILFAST` : throws an exception when it meets corrupted records.</li> + * </ul> * * @since 1.6.0 */ http://git-wip-us.apache.org/repos/asf/spark/blob/e4740881/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala new file mode 100644 index 0000000..4682280 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources + +private[datasources] object ParseModes { + val PERMISSIVE_MODE = "PERMISSIVE" + val DROP_MALFORMED_MODE = "DROPMALFORMED" + val FAIL_FAST_MODE = "FAILFAST" + + val DEFAULT = PERMISSIVE_MODE + + def isValidMode(mode: String): Boolean = { + mode.toUpperCase match { + case PERMISSIVE_MODE | DROP_MALFORMED_MODE | FAIL_FAST_MODE => true + case _ => false + } + } + + def isDropMalformedMode(mode: String): Boolean = mode.toUpperCase == DROP_MALFORMED_MODE + def isFailFastMode(mode: String): Boolean = mode.toUpperCase == FAIL_FAST_MODE + def isPermissiveMode(mode: String): Boolean = if (isValidMode(mode)) { + mode.toUpperCase == PERMISSIVE_MODE + } else { + true // We default to permissive if the mode string is not valid + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/e4740881/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala index e009a37..95de02c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.csv import java.nio.charset.StandardCharsets import org.apache.spark.internal.Logging -import org.apache.spark.sql.execution.datasources.CompressionCodecs +import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes} private[sql] class CSVOptions( @transient private val parameters: Map[String, String]) @@ -62,7 +62,7 @@ private[sql] class CSVOptions( val delimiter = CSVTypeCast.toChar( parameters.getOrElse("sep", parameters.getOrElse("delimiter", ","))) - val parseMode = 
parameters.getOrElse("mode", "PERMISSIVE") + private val parseMode = parameters.getOrElse("mode", "PERMISSIVE") val charset = parameters.getOrElse("encoding", parameters.getOrElse("charset", StandardCharsets.UTF_8.name())) @@ -101,26 +101,3 @@ private[sql] class CSVOptions( val rowSeparator = "\n" } - -private[csv] object ParseModes { - val PERMISSIVE_MODE = "PERMISSIVE" - val DROP_MALFORMED_MODE = "DROPMALFORMED" - val FAIL_FAST_MODE = "FAILFAST" - - val DEFAULT = PERMISSIVE_MODE - - def isValidMode(mode: String): Boolean = { - mode.toUpperCase match { - case PERMISSIVE_MODE | DROP_MALFORMED_MODE | FAIL_FAST_MODE => true - case _ => false - } - } - - def isDropMalformedMode(mode: String): Boolean = mode.toUpperCase == DROP_MALFORMED_MODE - def isFailFastMode(mode: String): Boolean = mode.toUpperCase == FAIL_FAST_MODE - def isPermissiveMode(mode: String): Boolean = if (isValidMode(mode)) { - mode.toUpperCase == PERMISSIVE_MODE - } else { - true // We default to permissive is the mode string is not valid - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/e4740881/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala index 0937a21..945ed2c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala @@ -40,6 +40,7 @@ private[sql] object InferSchema { configOptions: JSONOptions): StructType = { require(configOptions.samplingRatio > 0, s"samplingRatio (${configOptions.samplingRatio}) should be greater than 0") + val shouldHandleCorruptRecord = configOptions.permissive val schemaData = if (configOptions.samplingRatio > 0.99) { json 
} else { @@ -50,21 +51,23 @@ private[sql] object InferSchema { val rootType = schemaData.mapPartitions { iter => val factory = new JsonFactory() configOptions.setJacksonOptions(factory) - iter.map { row => + iter.flatMap { row => try { Utils.tryWithResource(factory.createParser(row)) { parser => parser.nextToken() - inferField(parser, configOptions) + Some(inferField(parser, configOptions)) } } catch { + case _: JsonParseException if shouldHandleCorruptRecord => + Some(StructType(Seq(StructField(columnNameOfCorruptRecords, StringType)))) case _: JsonParseException => - StructType(Seq(StructField(columnNameOfCorruptRecords, StringType))) + None } } }.treeAggregate[DataType]( StructType(Seq()))( - compatibleRootType(columnNameOfCorruptRecords), - compatibleRootType(columnNameOfCorruptRecords)) + compatibleRootType(columnNameOfCorruptRecords, shouldHandleCorruptRecord), + compatibleRootType(columnNameOfCorruptRecords, shouldHandleCorruptRecord)) canonicalizeType(rootType) match { case Some(st: StructType) => st @@ -194,18 +197,21 @@ private[sql] object InferSchema { * Remove top-level ArrayType wrappers and merge the remaining schemas */ private def compatibleRootType( - columnNameOfCorruptRecords: String): (DataType, DataType) => DataType = { + columnNameOfCorruptRecords: String, + shouldHandleCorruptRecord: Boolean): (DataType, DataType) => DataType = { // Since we support array of json objects at the top level, // we need to check the element type and find the root level data type. 
- case (ArrayType(ty1, _), ty2) => compatibleRootType(columnNameOfCorruptRecords)(ty1, ty2) - case (ty1, ArrayType(ty2, _)) => compatibleRootType(columnNameOfCorruptRecords)(ty1, ty2) + case (ArrayType(ty1, _), ty2) => + compatibleRootType(columnNameOfCorruptRecords, shouldHandleCorruptRecord)(ty1, ty2) + case (ty1, ArrayType(ty2, _)) => + compatibleRootType(columnNameOfCorruptRecords, shouldHandleCorruptRecord)(ty1, ty2) // If we see any other data type at the root level, we get records that cannot be // parsed. So, we use the struct as the data type and add the corrupt field to the schema. case (struct: StructType, NullType) => struct case (NullType, struct: StructType) => struct - case (struct: StructType, o) if !o.isInstanceOf[StructType] => + case (struct: StructType, o) if !o.isInstanceOf[StructType] && shouldHandleCorruptRecord => withCorruptField(struct, columnNameOfCorruptRecords) - case (o, struct: StructType) if !o.isInstanceOf[StructType] => + case (o, struct: StructType) if !o.isInstanceOf[StructType] && shouldHandleCorruptRecord => withCorruptField(struct, columnNameOfCorruptRecords) // If we get anything else, we call compatibleType. // Usually, when we reach here, ty1 and ty2 are two StructTypes. 
http://git-wip-us.apache.org/repos/asf/spark/blob/e4740881/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala index e59dbd6..93c3d47 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala @@ -19,7 +19,8 @@ package org.apache.spark.sql.execution.datasources.json import com.fasterxml.jackson.core.{JsonFactory, JsonParser} -import org.apache.spark.sql.execution.datasources.CompressionCodecs +import org.apache.spark.internal.Logging +import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes} /** * Options for the JSON data source. @@ -28,7 +29,7 @@ import org.apache.spark.sql.execution.datasources.CompressionCodecs */ private[sql] class JSONOptions( @transient private val parameters: Map[String, String]) - extends Serializable { + extends Logging with Serializable { val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0) @@ -49,6 +50,16 @@ private[sql] class JSONOptions( val allowBackslashEscapingAnyCharacter = parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false) val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName) + private val parseMode = parameters.getOrElse("mode", "PERMISSIVE") + + // Parse mode flags + if (!ParseModes.isValidMode(parseMode)) { + logWarning(s"$parseMode is not a valid parse mode. 
Using ${ParseModes.DEFAULT}.") + } + + val failFast = ParseModes.isFailFastMode(parseMode) + val dropMalformed = ParseModes.isDropMalformedMode(parseMode) + val permissive = ParseModes.isPermissiveMode(parseMode) /** Sets config options on a Jackson [[JsonFactory]]. */ def setJacksonOptions(factory: JsonFactory): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/e4740881/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index 3252b6c..00c14ad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -23,6 +23,7 @@ import scala.collection.mutable.ArrayBuffer import com.fasterxml.jackson.core._ +import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -34,7 +35,7 @@ import org.apache.spark.util.Utils private[json] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg) -object JacksonParser { +object JacksonParser extends Logging { def parse( input: RDD[String], @@ -257,13 +258,20 @@ object JacksonParser { def failedRecord(record: String): Seq[InternalRow] = { // create a row even if no corrupt record column is present - val row = new GenericMutableRow(schema.length) - for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecords)) { - require(schema(corruptIndex).dataType == StringType) - row.update(corruptIndex, UTF8String.fromString(record)) + if (configOptions.failFast) { + throw new RuntimeException(s"Malformed line in FAILFAST mode: $record") + } + if 
(configOptions.dropMalformed) { + logWarning(s"Dropping malformed line: $record") + Nil + } else { + val row = new GenericMutableRow(schema.length) + for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecords)) { + require(schema(corruptIndex).dataType == StringType) + row.update(corruptIndex, UTF8String.fromString(record)) + } + Seq(row) } - - Seq(row) } val factory = new JsonFactory() http://git-wip-us.apache.org/repos/asf/spark/blob/e4740881/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 6d942c4..0a5699b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.{Path, PathFilter} import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.GzipCodec +import org.apache.spark.SparkException import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.util.DateTimeUtils @@ -963,7 +964,56 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { ) } - test("Corrupt records") { + test("Corrupt records: FAILFAST mode") { + val schema = StructType( + StructField("a", StringType, true) :: Nil) + // `FAILFAST` mode should throw an exception for corrupt records. 
+ val exceptionOne = intercept[SparkException] { + sqlContext.read + .option("mode", "FAILFAST") + .json(corruptRecords) + .collect() + } + assert(exceptionOne.getMessage.contains("Malformed line in FAILFAST mode: {")) + + val exceptionTwo = intercept[SparkException] { + sqlContext.read + .option("mode", "FAILFAST") + .schema(schema) + .json(corruptRecords) + .collect() + } + assert(exceptionTwo.getMessage.contains("Malformed line in FAILFAST mode: {")) + } + + test("Corrupt records: DROPMALFORMED mode") { + val schemaOne = StructType( + StructField("a", StringType, true) :: + StructField("b", StringType, true) :: + StructField("c", StringType, true) :: Nil) + val schemaTwo = StructType( + StructField("a", StringType, true) :: Nil) + // `DROPMALFORMED` mode should skip corrupt records + val jsonDFOne = sqlContext.read + .option("mode", "DROPMALFORMED") + .json(corruptRecords) + checkAnswer( + jsonDFOne, + Row("str_a_4", "str_b_4", "str_c_4") :: Nil + ) + assert(jsonDFOne.schema === schemaOne) + + val jsonDFTwo = sqlContext.read + .option("mode", "DROPMALFORMED") + .schema(schemaTwo) + .json(corruptRecords) + checkAnswer( + jsonDFTwo, + Row("str_a_4") :: Nil) + assert(jsonDFTwo.schema === schemaTwo) + } + + test("Corrupt records: PERMISSIVE mode") { // Test if we can query corrupt records. withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") { withTempTable("jsonTable") { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org