Repository: spark Updated Branches: refs/heads/master f7b55dbfc -> 7a9dcbc91
[SPARK-10441] [SQL] Save data correctly to json. https://issues.apache.org/jira/browse/SPARK-10441 Author: Yin Huai <yh...@databricks.com> Closes #8597 from yhuai/timestampJson. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7a9dcbc9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7a9dcbc9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7a9dcbc9 Branch: refs/heads/master Commit: 7a9dcbc91d55dbc0cbf4812319bde65f4509b467 Parents: f7b55db Author: Yin Huai <yh...@databricks.com> Authored: Tue Sep 8 14:10:12 2015 -0700 Committer: Michael Armbrust <mich...@databricks.com> Committed: Tue Sep 8 14:10:12 2015 -0700 ---------------------------------------------------------------------- .../apache/spark/sql/RandomDataGenerator.scala | 41 +++++++++- .../datasources/json/JacksonGenerator.scala | 11 ++- .../datasources/json/JacksonParser.scala | 31 ++++++++ .../sql/hive/orc/OrcHadoopFsRelationSuite.scala | 8 ++ .../sql/sources/JsonHadoopFsRelationSuite.scala | 8 ++ .../sources/ParquetHadoopFsRelationSuite.scala | 9 ++- .../SimpleTextHadoopFsRelationSuite.scala | 19 ++++- .../spark/sql/sources/SimpleTextRelation.scala | 7 +- .../sql/sources/hadoopFsRelationSuites.scala | 79 ++++++++++++++++++++ 9 files changed, 205 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/7a9dcbc9/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala index 11e0c12..4025cbc 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala @@ -23,6 +23,8 @@ import java.math.MathContext import scala.util.Random +import org.apache.spark.sql.catalyst.CatalystTypeConverters +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.CalendarInterval @@ -84,6 +86,7 @@ object RandomDataGenerator { * random data generator is defined for that data type. The generated values will use an external * representation of the data type; for example, the random generator for [[DateType]] will return * instances of [[java.sql.Date]] and the generator for [[StructType]] will return a [[Row]]. + * For a [[UserDefinedType]] for a class X, an instance of class X is returned. * * @param dataType the type to generate values for * @param nullable whether null values should be generated @@ -106,7 +109,22 @@ object RandomDataGenerator { }) case BooleanType => Some(() => rand.nextBoolean()) case DateType => Some(() => new java.sql.Date(rand.nextInt())) - case TimestampType => Some(() => new java.sql.Timestamp(rand.nextLong())) + case TimestampType => + val generator = + () => { + var milliseconds = rand.nextLong() % 253402329599999L + // -62135740800000L is the number of milliseconds before January 1, 1970, 00:00:00 GMT + // for "0001-01-01 00:00:00.000000". We need to find a + // number that is greater or equals to this number as a valid timestamp value. + while (milliseconds < -62135740800000L) { + // 253402329599999L is the the number of milliseconds since + // January 1, 1970, 00:00:00 GMT for "9999-12-31 23:59:59.999999". + milliseconds = rand.nextLong() % 253402329599999L + } + // DateTimeUtils.toJavaTimestamp takes microsecond. + DateTimeUtils.toJavaTimestamp(milliseconds * 1000) + } + Some(generator) case CalendarIntervalType => Some(() => { val months = rand.nextInt(1000) val ns = rand.nextLong() @@ -159,6 +177,27 @@ object RandomDataGenerator { None } } + case udt: UserDefinedType[_] => { + val maybeSqlTypeGenerator = forType(udt.sqlType, nullable, seed) + // Because random data generator at here returns scala value, we need to + // convert it to catalyst value to call udt's deserialize. + val toCatalystType = CatalystTypeConverters.createToCatalystConverter(udt.sqlType) + + if (maybeSqlTypeGenerator.isDefined) { + val sqlTypeGenerator = maybeSqlTypeGenerator.get + val generator = () => { + val generatedScalaValue = sqlTypeGenerator.apply() + if (generatedScalaValue == null) { + null + } else { + udt.deserialize(toCatalystType(generatedScalaValue)) + } + } + Some(generator) + } else { + None + } + } case unsupportedType => None } // Handle nullability by wrapping the non-null value generator: http://git-wip-us.apache.org/repos/asf/spark/blob/7a9dcbc9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala index 330ba90..f65c7bb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.json import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.util.DateTimeUtils import scala.collection.Map @@ -89,7 +90,7 @@ private[sql] object JacksonGenerator { def valWriter: (DataType, Any) => Unit = { case (_, null) | (NullType, _) => gen.writeNull() case (StringType, v) => gen.writeString(v.toString) - case (TimestampType, v: java.sql.Timestamp) => gen.writeString(v.toString) + case (TimestampType, v: Long) => gen.writeString(DateTimeUtils.toJavaTimestamp(v).toString) case (IntegerType, v: Int) => gen.writeNumber(v) case (ShortType, v: Short) => gen.writeNumber(v) case (FloatType, v: Float) => gen.writeNumber(v) @@ -99,8 +100,12 @@ private[sql] object JacksonGenerator { case (ByteType, v: Byte) => gen.writeNumber(v.toInt) case (BinaryType, v: Array[Byte]) => gen.writeBinary(v) case (BooleanType, v: Boolean) => gen.writeBoolean(v) - case (DateType, v) => gen.writeString(v.toString) - case (udt: UserDefinedType[_], v) => valWriter(udt.sqlType, udt.serialize(v)) + case (DateType, v: Int) => gen.writeString(DateTimeUtils.toJavaDate(v).toString) + // For UDT values, they should be in the SQL type's corresponding value type. + // We should not see values in the user-defined class at here. + // For example, VectorUDT's SQL type is an array of double. So, we should expect that v is + // an ArrayData at here, instead of a Vector. + case (udt: UserDefinedType[_], v) => valWriter(udt.sqlType, v) case (ArrayType(ty, _), v: ArrayData) => gen.writeStartArray() http://git-wip-us.apache.org/repos/asf/spark/blob/7a9dcbc9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index cd68bd6..ff4d8c0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -81,9 +81,37 @@ private[sql] object JacksonParser { case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, FloatType) => parser.getFloatValue + case (VALUE_STRING, FloatType) => + // Special case handling for NaN and Infinity. + val value = parser.getText + val lowerCaseValue = value.toLowerCase() + if (lowerCaseValue.equals("nan") || + lowerCaseValue.equals("infinity") || + lowerCaseValue.equals("-infinity") || + lowerCaseValue.equals("inf") || + lowerCaseValue.equals("-inf")) { + value.toFloat + } else { + sys.error(s"Cannot parse $value as FloatType.") + } + case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, DoubleType) => parser.getDoubleValue + case (VALUE_STRING, DoubleType) => + // Special case handling for NaN and Infinity. + val value = parser.getText + val lowerCaseValue = value.toLowerCase() + if (lowerCaseValue.equals("nan") || + lowerCaseValue.equals("infinity") || + lowerCaseValue.equals("-infinity") || + lowerCaseValue.equals("inf") || + lowerCaseValue.equals("-inf")) { + value.toDouble + } else { + sys.error(s"Cannot parse $value as DoubleType.") + } + case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, dt: DecimalType) => Decimal(parser.getDecimalValue, dt.precision, dt.scale) @@ -126,6 +154,9 @@ private[sql] object JacksonParser { case (_, udt: UserDefinedType[_]) => convertField(factory, parser, udt.sqlType) + + case (token, dataType) => + sys.error(s"Failed to parse a value for data type $dataType (current token: $token).") } } http://git-wip-us.apache.org/repos/asf/spark/blob/7a9dcbc9/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala index 9a299c3..92043d6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala @@ -28,6 +28,14 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest { override val dataSourceName: String = classOf[DefaultSource].getCanonicalName + // ORC does not play well with NullType and UDT. + override protected def supportsDataType(dataType: DataType): Boolean = dataType match { + case _: NullType => false + case _: CalendarIntervalType => false + case _: UserDefinedType[_] => false + case _ => true + } + test("save()/load() - partitioned table - simple queries - partition columns in data") { withTempDir { file => val basePath = new Path(file.getCanonicalPath) http://git-wip-us.apache.org/repos/asf/spark/blob/7a9dcbc9/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala index 1945b15..ef37787 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala @@ -28,6 +28,14 @@ import org.apache.spark.sql.types._ class JsonHadoopFsRelationSuite extends HadoopFsRelationTest { override val dataSourceName: String = "json" + // JSON does not write data of NullType and does not play well with BinaryType. + override protected def supportsDataType(dataType: DataType): Boolean = dataType match { + case _: NullType => false + case _: BinaryType => false + case _: CalendarIntervalType => false + case _ => true + } + test("save()/load() - partitioned table - simple queries - partition columns in data") { withTempDir { file => val basePath = new Path(file.getCanonicalPath) http://git-wip-us.apache.org/repos/asf/spark/blob/7a9dcbc9/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala index 08c3c17..e2d754e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala @@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql.{execution, AnalysisException, SaveMode} -import org.apache.spark.sql.types.{IntegerType, StructField, StructType} +import org.apache.spark.sql.types._ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { @@ -32,6 +32,13 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { override val dataSourceName: String = "parquet" + // Parquet does not play well with NullType. + override protected def supportsDataType(dataType: DataType): Boolean = dataType match { + case _: NullType => false + case _: CalendarIntervalType => false + case _ => true + } + test("save()/load() - partitioned table - simple queries - partition columns in data") { withTempDir { file => val basePath = new Path(file.getCanonicalPath) http://git-wip-us.apache.org/repos/asf/spark/blob/7a9dcbc9/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala index 1125ca6..a3a1244 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala @@ -20,11 +20,28 @@ package org.apache.spark.sql.sources import org.apache.hadoop.fs.Path import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.sql.types.{IntegerType, StructField, StructType} +import org.apache.spark.sql.types._ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest { override val dataSourceName: String = classOf[SimpleTextSource].getCanonicalName + // We have a very limited number of supported types at here since it is just for a + // test relation and we do very basic testing at here. + override protected def supportsDataType(dataType: DataType): Boolean = dataType match { + case _: BinaryType => false + // We are using random data generator and the generated strings are not really valid string. + case _: StringType => false + case _: BooleanType => false // see https://issues.apache.org/jira/browse/SPARK-10442 + case _: CalendarIntervalType => false + case _: DateType => false + case _: TimestampType => false + case _: ArrayType => false + case _: MapType => false + case _: StructType => false + case _: UserDefinedType[_] => false + case _ => true + } + test("save()/load() - partitioned table - simple queries - partition columns in data") { withTempDir { file => val basePath = new Path(file.getCanonicalPath) http://git-wip-us.apache.org/repos/asf/spark/blob/7a9dcbc9/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala index 527ca7a..aeaaa3e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala @@ -68,7 +68,9 @@ class SimpleTextOutputWriter(path: String, context: TaskAttemptContext) extends new AppendingTextOutputFormat(new Path(path)).getRecordWriter(context) override def write(row: Row): Unit = { - val serialized = row.toSeq.map(_.toString).mkString(",") + val serialized = row.toSeq.map { v => + if (v == null) "" else v.toString + }.mkString(",") recordWriter.write(null, new Text(serialized)) } @@ -112,7 +114,8 @@ class SimpleTextRelation( val fields = dataSchema.map(_.dataType) sparkContext.textFile(inputStatuses.map(_.getPath).mkString(",")).map { record => - Row(record.split(",").zip(fields).map { case (value, dataType) => + Row(record.split(",", -1).zip(fields).map { case (v, dataType) => + val value = if (v == "") null else v // `Cast`ed values are always of Catalyst types (i.e. UTF8String instead of String, etc.) val catalystValue = Cast(Literal(value), dataType).eval() // Here we're converting Catalyst values to Scala values to test `needsConversion` http://git-wip-us.apache.org/repos/asf/spark/blob/7a9dcbc9/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala index 2ad2618..24f43cf 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala @@ -38,6 +38,8 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes val dataSourceName: String + protected def supportsDataType(dataType: DataType): Boolean = true + val dataSchema = StructType( Seq( @@ -98,6 +100,83 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes } } + test("test all data types") { + withTempPath { file => + // Create the schema. + val struct = + StructType( + StructField("f1", FloatType, true) :: + StructField("f2", ArrayType(BooleanType), true) :: Nil) + // TODO: add CalendarIntervalType to here once we can save it out. + val dataTypes = + Seq( + StringType, BinaryType, NullType, BooleanType, + ByteType, ShortType, IntegerType, LongType, + FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5), + DateType, TimestampType, + ArrayType(IntegerType), MapType(StringType, LongType), struct, + new MyDenseVectorUDT()) + val fields = dataTypes.zipWithIndex.map { case (dataType, index) => + StructField(s"col$index", dataType, nullable = true) + } + val schema = StructType(fields) + + // Generate data at the driver side. We need to materialize the data first and then + // create RDD. + val maybeDataGenerator = + RandomDataGenerator.forType( + dataType = schema, + nullable = true, + seed = Some(System.nanoTime())) + val dataGenerator = + maybeDataGenerator + .getOrElse(fail(s"Failed to create data generator for schema $schema")) + val data = (1 to 10).map { i => + dataGenerator.apply() match { + case row: Row => row + case null => Row.fromSeq(Seq.fill(schema.length)(null)) + case other => + fail(s"Row or null is expected to be generated, " + + s"but a ${other.getClass.getCanonicalName} is generated.") + } + } + + // Create a DF for the schema with random data. + val rdd = sqlContext.sparkContext.parallelize(data, 10) + val df = sqlContext.createDataFrame(rdd, schema) + + // All columns that have supported data types of this source. + val supportedColumns = schema.fields.collect { + case StructField(name, dataType, _, _) if supportsDataType(dataType) => name + } + val selectedColumns = util.Random.shuffle(supportedColumns.toSeq) + + val dfToBeSaved = df.selectExpr(selectedColumns: _*) + + // Save the data out. + dfToBeSaved + .write + .format(dataSourceName) + .option("dataSchema", dfToBeSaved.schema.json) // This option is just used by tests. + .save(file.getCanonicalPath) + + val loadedDF = + sqlContext + .read + .format(dataSourceName) + .schema(dfToBeSaved.schema) + .option("dataSchema", dfToBeSaved.schema.json) // This option is just used by tests. + .load(file.getCanonicalPath) + .selectExpr(selectedColumns: _*) + + // Read the data back. + checkAnswer( + loadedDF, + dfToBeSaved + ) + } + } + test("save()/load() - non-partitioned table - Overwrite") { withTempPath { file => testDF.write.mode(SaveMode.Overwrite).format(dataSourceName).save(file.getCanonicalPath) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org