Github user dongjoon-hyun commented on a diff in the pull request:
https://github.com/apache/spark/pull/20208#discussion_r162786270
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaEvolutionTest.scala ---
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
+
+/**
+ * Schema can evolve in several ways, and file-based data sources support the following.
+ *
+ * 1. Add a column
+ * 2. Remove a column
+ * 3. Change a column position
+ * 4. Change a column type
+ *
+ * Here, we consider safe evolution without data loss. For example, data type evolution should be
+ * from smaller types to larger types, like `int`-to-`long`, not vice versa.
+ *
+ * So far, file-based data sources have the following schema evolution coverage.
+ *
+ * | File Format | Coverage   | Note                                                  |
+ * | ----------- | ---------- | ----------------------------------------------------- |
+ * | TEXT        | N/A        | Schema consists of a single string column.            |
+ * | CSV         | 1, 2, 4    |                                                       |
+ * | JSON        | 1, 2, 3, 4 |                                                       |
+ * | ORC         | 1, 2, 3, 4 | Native vectorized ORC reader has the widest coverage. |
+ * | PARQUET     | 1, 2, 3    |                                                       |
+ *
+ * This aims to provide explicit test coverage for schema evolution on file-based data sources.
+ * Since each file format has its own schema evolution coverage, each file-based data source
+ * needs a test suite that mixes in the corresponding supported test case traits.
+ *
+ * The following is a hierarchy of test traits.
+ *
+ * SchemaEvolutionTest
+ * -> AddColumnEvolutionTest
+ * -> RemoveColumnEvolutionTest
+ * -> ChangePositionEvolutionTest
+ * -> BooleanTypeEvolutionTest
+ * -> IntegralTypeEvolutionTest
+ * -> ToDoubleTypeEvolutionTest
+ * -> ToDecimalTypeEvolutionTest
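+ *
+ * For example, a concrete suite mixes in the traits matching its format's coverage.
+ * (The suite below is a hypothetical illustration, not an actual suite in this file.)
+ *
+ * {{{
+ *   class JsonSchemaEvolutionSuite
+ *     extends AddColumnEvolutionTest
+ *     with RemoveColumnEvolutionTest
+ *     with ChangePositionEvolutionTest {
+ *     override val format: String = "json"
+ *   }
+ * }}}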
+ */
+
+trait SchemaEvolutionTest extends QueryTest with SQLTestUtils with SharedSQLContext {
+ val format: String
+ val options: Map[String, String] = Map.empty[String, String]
+}
+
+/**
+ * Add column.
+ * This test suite assumes that a missing column is read back as `null`.
+ */
+trait AddColumnEvolutionTest extends SchemaEvolutionTest {
+ import testImplicits._
+
+ test("append column at the end") {
+ withTempDir { dir =>
+ val path = dir.getCanonicalPath
+
+ val df1 = Seq("a", "b").toDF("col1")
+ val df2 = df1.withColumn("col2", lit("x"))
+ val df3 = df2.withColumn("col3", lit("y"))
+
+ val dir1 = s"$path${File.separator}part=one"
+ val dir2 = s"$path${File.separator}part=two"
+ val dir3 = s"$path${File.separator}part=three"
+
+      df1.write.mode("overwrite").format(format).options(options).save(dir1)
+      df2.write.mode("overwrite").format(format).options(options).save(dir2)
+      df3.write.mode("overwrite").format(format).options(options).save(dir3)
+
+ val df = spark.read
+ .schema(df3.schema)
+ .format(format)
+ .options(options)
+ .load(path)
+
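+      // Rows written before a column existed are read back with `null` for that
+      // column; `part` comes from the partition directory names.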
+ checkAnswer(df, Seq(
+ Row("a", null, null, "one"),
+ Row("b", null, null, "one"),
+ Row("a", "x", null, "two"),
+ Row("b", "x", null, "two"),
+ Row("a", "x", "y", "three"),
+ Row("b", "x", "y", "three")))
+ }
+ }
+}
+
+/**
+ * Remove column.
+ * This test suite is identical to AddColumnEvolutionTest,
+ * but it ensures that both the schema and the result are truncated to the given schema.
+ */
+trait RemoveColumnEvolutionTest extends SchemaEvolutionTest {
+ import testImplicits._
+
+ test("remove column at the end") {
+ withTempDir { dir =>
+ val path = dir.getCanonicalPath
+
+ val df1 = Seq(("1", "a"), ("2", "b")).toDF("col1", "col2")
+ val df2 = df1.withColumn("col3", lit("y"))
+
+ val dir1 = s"$path${File.separator}part=two"
+ val dir2 = s"$path${File.separator}part=three"
+
+      df1.write.mode("overwrite").format(format).options(options).save(dir1)
+      df2.write.mode("overwrite").format(format).options(options).save(dir2)
+
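+      // Reading with df1's narrower schema drops `col3` from the result.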
+ val df = spark.read
+ .schema(df1.schema)
+ .format(format)
+ .options(options)
+ .load(path)
+
+ checkAnswer(df, Seq(
+ Row("1", "a", "two"),
+ Row("2", "b", "two"),
+ Row("1", "a", "three"),
+ Row("2", "b", "three")))
+ }
+ }
+}
+
+/**
+ * Change column positions.
+ * This suite assumes that all data sets have the same number of columns.
+ */
+trait ChangePositionEvolutionTest extends SchemaEvolutionTest {
+ import testImplicits._
+
+ test("change column position") {
+ withTempDir { dir =>
+      val path = dir.getCanonicalPath
+
+      val df1 = Seq(("1", "a"), ("2", "b"), ("3", "c")).toDF("col1", "col2")
+      val df2 = Seq(("d", "4"), ("e", "5"), ("f", "6")).toDF("col2", "col1")
+ val unionDF = df1.unionByName(df2)
+
+ val dir1 = s"$path${File.separator}part=one"
+ val dir2 = s"$path${File.separator}part=two"
+
+      df1.write.mode("overwrite").format(format).options(options).save(dir1)
+      df2.write.mode("overwrite").format(format).options(options).save(dir2)
+
+ val df = spark.read
+ .schema(unionDF.schema)
+ .format(format)
+ .options(options)
+ .load(path)
+ .select("col1", "col2")
+
+ checkAnswer(df, unionDF)
+ }
+ }
+}
+
+trait BooleanTypeEvolutionTest extends SchemaEvolutionTest {
+ import testImplicits._
+
+ test("boolean to byte/short/int/long") {
+ withTempDir { dir =>
+ val path = dir.getCanonicalPath
+
+ val values = (1 to 10).map(_ % 2)
+ val booleanDF = (1 to 10).map(_ % 2 == 1).toDF("col1")
+ val byteDF = values.map(_.toByte).toDF("col1")
+ val shortDF = values.map(_.toShort).toDF("col1")
+ val intDF = values.toDF("col1")
+ val longDF = values.map(_.toLong).toDF("col1")
+
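+      // Write boolean values once; read them back with progressively wider integral types.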
+      booleanDF.write.mode("overwrite").format(format).options(options).save(path)
+
+ val df1 = spark.read
+ .schema("col1 byte")
+ .format(format)
+ .options(options)
+ .load(path)
+ checkAnswer(df1, byteDF)
+
+ val df2 = spark.read
+ .schema("col1 short")
+ .format(format)
+ .options(options)
+ .load(path)
+ checkAnswer(df2, shortDF)
+
+ val df3 = spark.read
+ .schema("col1 int")
+ .format(format)
+ .options(options)
+ .load(path)
+ checkAnswer(df3, intDF)
+
+ val df4 = spark.read
+ .schema("col1 long")
+ .format(format)
+ .options(options)
+ .load(path)
+ checkAnswer(df4, longDF)
+ }
+ }
+}
+
+trait IntegralTypeEvolutionTest extends SchemaEvolutionTest {
+
+ import testImplicits._
+
+ test("change column type from `byte` to `short/int/long`") {
--- End diff ---
Ur, for this, when we put the variables (`byteDF`, ...) outside of the `test`
functions, it seems to cause SQLContext errors.
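
For reference, a minimal sketch of the failing pattern, assuming the standard
`SharedSQLContext` lifecycle (the shared session is created in `beforeAll`,
after the suite's constructor has already run):

```scala
trait IntegralTypeEvolutionTest extends SchemaEvolutionTest {
  import testImplicits._

  // Fails: the trait body runs at construction time, before beforeAll
  // initializes the shared SparkSession, so toDF has no active session.
  // val byteDF = (1 to 10).map(_.toByte).toDF("col1")

  test("change column type from `byte` to `short/int/long`") {
    // OK: the test body runs after beforeAll, so the session exists here.
    val byteDF = (1 to 10).map(_.toByte).toDF("col1")
    // ...
  }
}
```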
---