This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
     new bd54ce8  [CARBONDATA-3628] Support alter hive table add array and map type column
bd54ce8 is described below

commit bd54ce83d819d8350b1a594c3007ac747d5485dc
Author: IceMimosa <chk19940...@gmail.com>
AuthorDate: Tue Dec 24 11:07:30 2019 +0800

    [CARBONDATA-3628] Support alter hive table add array and map type column

    Support adding array and map data type column by ALTER TABLE

    This closes #3529
---
 .../cluster/sdv/generated/AlterTableTestCase.scala | 26 +++++++++++++++++++++-
 .../cluster/sdv/generated/SDKwriterTestCase.scala  |  3 +--
 .../apache/spark/sql/common/util/QueryTest.scala   | 10 ++++-----
 .../carbondata/spark/rdd/CarbonMergerRDD.scala     | 11 ++++-----
 .../spark/sql/execution/strategy/DDLStrategy.scala |  7 +++---
 5 files changed, 39 insertions(+), 18 deletions(-)

diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala
index 297ff04..cc34df5 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala
@@ -1022,7 +1022,31 @@ class AlterTableTestCase extends QueryTest with BeforeAndAfterAll {
       assert(exception.getMessage.contains("Unsupported alter operation on hive table"))
     } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
       sql("alter table alter_hive add columns(add string)")
-      sql("insert into alter_hive select 'abc','banglore'")
+      sql("alter table alter_hive add columns (var map<string, string>)")
+      sql("insert into alter_hive select 'abc','banglore',map('age','10','birth','2020')")
+      checkAnswer(
+        sql("select * from alter_hive"),
+        Seq(Row("abc", "banglore", Map("age" -> "10", "birth" -> "2020")))
+      )
+    }
+  }
+
+  test("Alter table add column for hive partitioned table for spark version above 2.1") {
+    sql("drop table if exists alter_hive")
+    sql("create table alter_hive(name string) stored as rcfile partitioned by (dt string)")
+    if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+      sql("alter table alter_hive add columns(add string)")
+      sql("alter table alter_hive add columns (var map<string, string>)")
+      sql("alter table alter_hive add columns (loves array<string>)")
+      sql(
+        s"""
+           |insert into alter_hive partition(dt='par')
+           |select 'abc', 'banglore', map('age', '10', 'birth', '2020'), array('a', 'b', 'c')
+        """.stripMargin)
+      checkAnswer(
+        sql("select * from alter_hive where dt='par'"),
+        Seq(Row("abc", "banglore", Map("age" -> "10", "birth" -> "2020"), Seq("a", "b", "c"), "par"))
+      )
     }
   }

diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
index d6a9413..82541b2 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/SDKwriterTestCase.scala
@@ -146,8 +146,7 @@ class SDKwriterTestCase extends QueryTest with BeforeAndAfterEach {
   }

   def deleteFile(path: String, extension: String): Unit = {
-    val file: CarbonFile = FileFactory
-      .getCarbonFile(path, FileFactory.getFileType(path))
+    val file: CarbonFile = FileFactory.getCarbonFile(path)

     for (eachDir <- file.listFiles) {
       if (!eachDir.isDirectory) {

diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index 9d4fe79..eca20ed 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -88,14 +88,13 @@ class QueryTest extends PlanTest with Suite {

   protected def checkAnswer(carbon: String, hive: String, uniqueIdentifier: String): Unit = {
     val path = TestQueryExecutor.hiveresultpath + "/" + uniqueIdentifier
-    if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
-      val objinp = new ObjectInputStream(FileFactory
-        .getDataInputStream(path, FileFactory.getFileType(path)))
+    if (FileFactory.isFileExist(path)) {
+      val objinp = new ObjectInputStream(FileFactory.getDataInputStream(path))
       val rows = objinp.readObject().asInstanceOf[Array[Row]]
       objinp.close()
       QueryTest.checkAnswer(sql(carbon), rows) match {
         case Some(errorMessage) => {
-          FileFactory.deleteFile(path, FileFactory.getFileType(path))
+          FileFactory.deleteFile(path)
           writeAndCheckAnswer(carbon, hive, path)
         }
         case None =>
@@ -107,8 +106,7 @@

   private def writeAndCheckAnswer(carbon: String, hive: String, path: String): Unit = {
     val rows = sql(hive).collect()
-    val obj = new ObjectOutputStream(FileFactory.getDataOutputStream(path, FileFactory
-      .getFileType(path)))
+    val obj = new ObjectOutputStream(FileFactory.getDataOutputStream(path))
     obj.writeObject(rows)
     obj.close()
     checkAnswer(sql(carbon), rows)

diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 0f31471..71e91cc 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -19,7 +19,7 @@ package org.apache.carbondata.spark.rdd

 import java.io.IOException
 import java.util
-import java.util.{Collections, List, Map}
+import java.util.{Collections, List}
 import java.util.concurrent.atomic.AtomicInteger

 import scala.collection.mutable
@@ -32,10 +32,11 @@ import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.command.{CarbonMergerMapping, NodeInfo}
 import org.apache.spark.sql.hive.DistributionUtil
-import org.apache.spark.sql.util.{CarbonException, SparkTypeConverter}
+import org.apache.spark.sql.util.CarbonException

 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.converter.SparkDataTypeConverterImpl
@@ -64,8 +65,8 @@ import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger._
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.MergeResult
-import org.apache.carbondata.spark.load.{ByteArrayOrdering, DataLoadProcessBuilderOnSpark, PrimtiveOrdering, StringOrdering}
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
+import org.apache.carbondata.spark.load.{DataLoadProcessBuilderOnSpark, PrimtiveOrdering, StringOrdering}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil}

 class CarbonMergerRDD[K, V](
     @transient private val ss: SparkSession,
@@ -680,7 +681,7 @@ class CarbonMergerRDD[K, V](
       partitionNames = null,
       splits = allSplits)
     val objectOrdering: Ordering[Object] = createOrderingForColumn(rangeColumn)
-    val sparkDataType = Util.convertCarbonToSparkDataType(dataType)
+    val sparkDataType = CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(dataType)
     // Change string type to support all types
     val sampleRdd = scanRdd
       .map(row => (row.get(0, sparkDataType), null))

diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 7c8993e..a851bc3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.execution.strategy

 import org.apache.spark.sql._
+import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
@@ -35,11 +36,9 @@ import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 import org.apache.spark.sql.types.StructField
 import org.apache.spark.util.{CarbonReflectionUtils, DataMapUtil, FileUtils, SparkUtil}

-import org.apache.carbondata.common.exceptions.DeprecatedFeatureException
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
-import org.apache.carbondata.spark.util.Util

 /**
  * Carbon strategies for ddl commands
@@ -176,8 +175,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         val structField = (alterTableAddColumnsModel.dimCols ++
                            alterTableAddColumnsModel.msrCols)
           .map { a =>
-            StructField(a.column,
-              Util.convertCarbonToSparkDataType(DataTypeUtil.valueOf(a.dataType.get)))
+            StructField(a.column, CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(
+              DataTypeUtil.valueOf(a.dataType.get)))
           }
         val identifier = TableIdentifier(
           alterTableAddColumnsModel.tableName,
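
For context, the capability this commit adds can be sketched end to end as follows. This is a minimal illustration, not code from the commit: it assumes a Spark 2.2+ SparkSession named spark, built with enableHiveSupport() and with CarbonData on the classpath; the table and column names (alter_hive_demo, var, loves) are hypothetical, chosen to mirror the new tests above.

    // Minimal sketch, assuming Spark 2.2+ with Hive support and CarbonData
    // configured; table and column names are illustrative only.
    spark.sql("create table alter_hive_demo(name string) stored as rcfile")
    // Adding complex-typed (map/array) columns is what this commit enables:
    spark.sql("alter table alter_hive_demo add columns (var map<string, string>)")
    spark.sql("alter table alter_hive_demo add columns (loves array<string>)")
    spark.sql("insert into alter_hive_demo select 'abc', map('age', '10'), array('a', 'b')")
    spark.sql("select * from alter_hive_demo").show()

On the Carbon side, the enabling change is in DDLStrategy, which now routes the requested column types through CarbonSparkDataSourceUtil.convertCarbonToSparkDataType (the same converter CarbonMergerRDD switches to), presumably because it handles complex Carbon types such as map and array.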