This is an automated email from the ASF dual-hosted git repository. jackylk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new f2c0b2c [CARBONDATA-3657]Support alter hive table add columns with complex types f2c0b2c is described below commit f2c0b2cb465cd1b3db349a111597eb5c26c72eeb Author: IceMimosa <chk19940...@gmail.com> AuthorDate: Thu Jan 9 01:12:10 2020 +0800 [CARBONDATA-3657]Support alter hive table add columns with complex types Why is this PR needed? ALTER TABLE ... ADD COLUMNS on a Hive table has some problems in Carbon: complex types, decimal precision/scale and column comments are not handled. What changes were proposed in this PR? This PR adds support for adding columns with: Map type; Array type; Struct type; Decimal type with precision and scale; column comments. Does this PR introduce any user interface change? No. Is any new test case added? Yes. This closes #3569 --- .../apache/carbondata/core/util/DataTypeUtil.java | 41 +-------------- .../cluster/sdv/generated/AlterTableTestCase.scala | 44 ++++++++++++++-- .../carbondata/cluster/sdv/suite/SDVSuites.scala | 3 +- .../spark/util/DataTypeConverterUtil.scala | 58 +++++++++++++++++++++- .../spark/sql/execution/strategy/DDLStrategy.scala | 18 +++++-- 5 files changed, 114 insertions(+), 50 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java index 0f8f5ab..58137a0 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java @@ -1044,47 +1044,10 @@ public final class DataTypeUtil { * @return returns the datatype based on the input string from json to deserialize the tableInfo */ public static DataType valueOf(DataType dataType, int precision, int scale) { - if (DataTypes.STRING.getName().equalsIgnoreCase(dataType.getName())) { - return DataTypes.STRING; - } else if (DataTypes.DATE.getName().equalsIgnoreCase(dataType.getName())) { - return DataTypes.DATE; - } else if (DataTypes.TIMESTAMP.getName().equalsIgnoreCase(dataType.getName())) { - return 
DataTypes.TIMESTAMP; - } else if (DataTypes.BOOLEAN.getName().equalsIgnoreCase(dataType.getName())) { - return DataTypes.BOOLEAN; - } else if (DataTypes.BYTE.getName().equalsIgnoreCase(dataType.getName())) { - return DataTypes.BYTE; - } else if (DataTypes.SHORT.getName().equalsIgnoreCase(dataType.getName())) { - return DataTypes.SHORT; - } else if (DataTypes.SHORT_INT.getName().equalsIgnoreCase(dataType.getName())) { - return DataTypes.SHORT_INT; - } else if (DataTypes.INT.getName().equalsIgnoreCase(dataType.getName())) { - return DataTypes.INT; - } else if (DataTypes.LONG.getName().equalsIgnoreCase(dataType.getName())) { - return DataTypes.LONG; - } else if (DataTypes.FLOAT.getName().equalsIgnoreCase(dataType.getName())) { - return DataTypes.FLOAT; - } else if (DataTypes.DOUBLE.getName().equalsIgnoreCase(dataType.getName())) { - return DataTypes.DOUBLE; - } else if (DataTypes.VARCHAR.getName().equalsIgnoreCase(dataType.getName())) { - return DataTypes.VARCHAR; - } else if (DataTypes.NULL.getName().equalsIgnoreCase(dataType.getName())) { - return DataTypes.NULL; - } else if (DataTypes.BYTE_ARRAY.getName().equalsIgnoreCase(dataType.getName())) { - return DataTypes.BYTE_ARRAY; - } else if (DataTypes.BINARY.getName().equalsIgnoreCase(dataType.getName())) { - return DataTypes.BINARY; - } else if (dataType.getName().equalsIgnoreCase("decimal")) { + if (DataTypes.isDecimal(dataType)) { return DataTypes.createDecimalType(precision, scale); - } else if (dataType.getName().equalsIgnoreCase("array")) { - return DataTypes.createDefaultArrayType(); - } else if (dataType.getName().equalsIgnoreCase("struct")) { - return DataTypes.createDefaultStructType(); - } else if (dataType.getName().equalsIgnoreCase("map")) { - return DataTypes.createDefaultMapType(); } else { - throw new RuntimeException( - "create DataType with invalid dataType.getName(): " + dataType.getName()); + return valueOf(dataType.getName()); } } diff --git 
a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala index 4b7bde6..b0885d1 100644 --- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala +++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala @@ -1038,14 +1038,52 @@ class AlterTableTestCase extends QueryTest with BeforeAndAfterAll { sql("alter table alter_hive add columns(add string)") sql("alter table alter_hive add columns (var map<string, string>)") sql("alter table alter_hive add columns (loves array<string>)") + sql("alter table alter_hive add columns (props struct<name:string, age:int>)") sql( s""" |insert into alter_hive partition(dt='par') - |select 'abc', 'banglore', map('age', '10', 'birth', '2020'), array('a', 'b', 'c') + |select 'abc', 'banglore', map('age', '10', 'birth', '2020'), array('a', 'b', 'c'), named_struct('name', 'abc', 'age', 10) """.stripMargin) checkAnswer( - sql("select * from alter_hive where dt='par'"), - Seq(Row("abc", "banglore", Map("age" -> "10", "birth" -> "2020"), Seq("a", "b", "c"), "par")) + sql("select name,add,var,loves,props.name,props.age,dt from alter_hive where dt='par'"), + Seq(Row("abc", "banglore", Map("age" -> "10", "birth" -> "2020"), Seq("a", "b", "c"), "abc", 10, "par")) + ) + } + } + + test("Alter table add complex column for hive table for spark version above 2.1") { + sql("drop table if exists alter_hive") + sql("create table alter_hive(name string) stored as rcfile") + if (SparkUtil.isSparkVersionXandAbove("2.2")) { + sql("alter table alter_hive add columns (add1 string comment 'comment1')") + sql("alter table alter_hive add columns (add2 decimal)") + sql("alter table alter_hive add columns (add3 decimal(20,2))") + sql("alter 
table alter_hive add columns (arr1 array<string>)") + sql("alter table alter_hive add columns (arr2 array<array<string>>)") + sql("alter table alter_hive add columns (map1 map<string, string>)") + sql("alter table alter_hive add columns (map2 map<string, bigint>)") + sql("alter table alter_hive add columns (map3 map<string, map<string, int>>)") + sql("alter table alter_hive add columns (map4 map<string, array<string>>)") + sql("alter table alter_hive add columns (struct1 struct<name:string, age:int>)") + sql("alter table alter_hive add columns (struct2 struct<name:array<string>, age:int, props: map<string, string>>)") + sql("alter table alter_hive add columns (struct3 struct<s:struct<a:string, b:bigint>>)") + checkAnswer( + sql("desc alter_hive"), + Seq( + Row("name", "string", null), + Row("add1", "string", "comment1"), + Row("add2", "decimal(10,2)", null), + Row("add3", "decimal(20,2)", null), + Row("arr1", "array<string>", null), + Row("arr2", "array<array<string>>", null), + Row("map1", "map<string,string>", null), + Row("map2", "map<string,bigint>", null), + Row("map3", "map<string,map<string,int>>", null), + Row("map4", "map<string,array<string>>", null), + Row("struct1", "struct<name:string,age:int>", null), + Row("struct2", "struct<name:array<string>,age:int,props:map<string,string>>", null), + Row("struct3", "struct<s:struct<a:string,b:bigint>>", null) + ) ) } } diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala index 09fcc1d..a29b58b 100644 --- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala +++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala @@ -168,8 +168,7 @@ class SDVSuites4 extends Suites with BeforeAndAfterAll { class SDVSuites5 extends Suites 
with BeforeAndAfterAll { val suites = new CreateTableUsingSparkCarbonFileFormatTestCase :: - new SparkCarbonDataSourceTestCase :: - new CarbonV1toV3CompatabilityTestCase :: Nil + new SparkCarbonDataSourceTestCase :: Nil override val nestedSuites = suites.toIndexedSeq diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala index 8050e5f..001c336 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala @@ -17,9 +17,12 @@ package org.apache.carbondata.spark.util +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.execution.command.Field import org.apache.spark.sql.util.CarbonException -import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes} +import org.apache.carbondata.core.metadata.datatype.{ArrayType, DataType, DataTypes, DecimalType, MapType, StructField, StructType} import org.apache.carbondata.format.{DataType => ThriftDataType} object DataTypeConverterUtil { @@ -87,6 +90,59 @@ object DataTypeConverterUtil { } } + def convertToCarbonType(field: Field): DataType = { + this.convertToCarbonType(field.dataType.get) match { + case _: DecimalType => + if (field.scale == 0) { + DataTypes.createDefaultDecimalType() + } else { + DataTypes.createDecimalType(field.precision, field.scale) + } + case _: MapType => + field.children match { + case Some(List(kv)) => + kv.children match { + case Some(List(k, v)) => + DataTypes.createMapType(this.convertToCarbonType(k), this.convertToCarbonType(v)) + } + case _ => + CarbonException.analysisException(s"Unsupported map data type: ${field.column}") + } + case _: ArrayType => + field.children match { + case Some(List(v)) => + 
DataTypes.createArrayType(this.convertToCarbonType(v)) + case None => + CarbonException.analysisException(s"Unsupported array data type: ${field.column}") + } + case _: StructType => + field.children match { + case Some(fs) => + val subFields = fs.map(f => + this.convertSubFields(f.column, this.convertToCarbonType(f), f.children.orNull) + ) + DataTypes.createStructType(subFields.asJava) + case None => + CarbonException.analysisException(s"Unsupported struct data type: ${field.column}") + } + case other: DataType => other + } + } + + private def convertSubFields(name: String, dataType: DataType, + children: List[Field]): StructField = { + val actualName = name.split("\\.").last + children match { + case null | Nil => + new StructField(actualName, dataType) + case other => + val subFields = other.map(f => + this.convertSubFields(f.column, this.convertToCarbonType(f), f.children.orNull) + ) + new StructField(actualName, dataType, subFields.asJava) + } + } + /** * convert from wrapper to external data type * diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala index 8f03fe1..ba194dd 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.execution.strategy +import org.apache.commons.lang3.StringUtils import org.apache.spark.sql._ import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil import org.apache.spark.sql.catalyst.TableIdentifier @@ -38,7 +39,8 @@ import org.apache.spark.util.{CarbonReflectionUtils, DataMapUtil, FileUtils, Spa import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogServiceFactory -import 
org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo} +import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo} +import org.apache.carbondata.spark.util.DataTypeConverterUtil /** * Carbon strategies for ddl commands @@ -173,10 +175,16 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { // TODO: remove this else if check once the 2.1 version is unsupported by carbon } else if (SparkUtil.isSparkVersionXandAbove("2.2")) { val structField = (alterTableAddColumnsModel.dimCols ++ alterTableAddColumnsModel.msrCols) - .map { - a => - StructField(a.column, CarbonSparkDataSourceUtil.convertCarbonToSparkDataType( - DataTypeUtil.valueOf(a.dataType.get))) + .map { f => + val structField = + StructField(f.column, CarbonSparkDataSourceUtil.convertCarbonToSparkDataType( + DataTypeConverterUtil.convertToCarbonType(f)) + ) + if (StringUtils.isNotEmpty(f.columnComment)) { + structField.withComment(f.columnComment) + } else { + structField + } } val identifier = TableIdentifier( alterTableAddColumnsModel.tableName,