Repository: carbondata Updated Branches: refs/heads/branch-1.3 660190fb5 -> 5b44e8105
[CARBONDATA-2219] Added validation for external partition location to use same schema. This closes #2018 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/092b5d58 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/092b5d58 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/092b5d58 Branch: refs/heads/branch-1.3 Commit: 092b5d58a50498a0a66bf6166907965612eb1fc5 Parents: 660190f Author: ravipesala <ravi.pes...@gmail.com> Authored: Thu Mar 1 12:04:53 2018 +0530 Committer: Venkata Ramana G <ramana.gollam...@huawei.com> Committed: Fri Mar 2 21:26:13 2018 +0530 ---------------------------------------------------------------------- .../blockletindex/SegmentIndexFileStore.java | 13 +++-- .../core/metadata/SegmentFileStore.java | 27 +++++++++- .../examples/CarbonPartitionExample.scala | 3 +- .../StandardPartitionTableQueryTestCase.scala | 57 +++++++++++++++----- ...arbonAlterTableAddHivePartitionCommand.scala | 18 ++++++- 5 files changed, 94 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/092b5d58/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java index b88c1f4..4883d94 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java @@ -90,23 +90,22 @@ public class SegmentIndexFileStore { /** * Read all index files and keep the cache in it. * - * @param segmentFileStore + * @param segmentFile * @throws IOException */ - public void readAllIIndexOfSegment(SegmentFileStore segmentFileStore, SegmentStatus status, - boolean ignoreStatus) throws IOException { + public void readAllIIndexOfSegment(SegmentFileStore.SegmentFile segmentFile, String tablePath, + SegmentStatus status, boolean ignoreStatus) throws IOException { List<CarbonFile> carbonIndexFiles = new ArrayList<>(); - if (segmentFileStore.getLocationMap() == null) { + if (segmentFile == null) { return; } - for (Map.Entry<String, SegmentFileStore.FolderDetails> locations : segmentFileStore + for (Map.Entry<String, SegmentFileStore.FolderDetails> locations : segmentFile .getLocationMap().entrySet()) { String location = locations.getKey(); if (locations.getValue().getStatus().equals(status.getMessage()) || ignoreStatus) { if (locations.getValue().isRelative()) { - location = - segmentFileStore.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR + location; + location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location; } for (String indexFile : locations.getValue().getFiles()) { CarbonFile carbonFile = FileFactory http://git-wip-us.apache.org/repos/asf/carbondata/blob/092b5d58/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java index 1902ab9..f2548b5 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java @@ -44,6 +44,7 @@ import org.apache.carbondata.core.indexstore.PartitionSpec; import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.mutate.CarbonUpdateUtil; import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.statusmanager.SegmentStatus; @@ -314,7 +315,7 @@ public class SegmentFileStore { } SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore(); indexFilesMap = new HashMap<>(); - indexFileStore.readAllIIndexOfSegment(this, status, ignoreStatus); + indexFileStore.readAllIIndexOfSegment(this.segmentFile, tablePath, status, ignoreStatus); Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath(); DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(); for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) { @@ -329,6 +330,30 @@ public class SegmentFileStore { } /** + * Reads all index files and get the schema of each index file + * @throws IOException + */ + public static Map<String, List<ColumnSchema>> getSchemaFiles(SegmentFile segmentFile, + String tablePath) throws IOException { + Map<String, List<ColumnSchema>> schemaMap = new HashMap<>(); + if (segmentFile == null) { + return schemaMap; + } + SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore(); + indexFileStore.readAllIIndexOfSegment(segmentFile, tablePath, SegmentStatus.SUCCESS, true); + Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath(); + DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(); + for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) { + List<DataFileFooter> indexInfo = + fileFooterConverter.getIndexInfo(entry.getKey(), entry.getValue()); + if (indexInfo.size() > 0) { + schemaMap.put(entry.getKey(), indexInfo.get(0).getColumnInTable()); + } + } + return schemaMap; + } + + /** * Gets all index files from this segment * @return */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/092b5d58/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala index 6837c56..2391dbe 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.SparkSession import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.spark.exception.ProcessMetaDataException object CarbonPartitionExample { @@ -195,7 +196,7 @@ object CarbonPartitionExample { try { spark.sql("""SHOW PARTITIONS t1""").show(100, false) } catch { - case ex: AnalysisException => LOGGER.error(ex.getMessage()) + case ex: ProcessMetaDataException => LOGGER.error(ex.getMessage()) } spark.sql("""SHOW PARTITIONS t0""").show(100, false) spark.sql("""SHOW PARTITIONS t3""").show(100, false) http://git-wip-us.apache.org/repos/asf/carbondata/blob/092b5d58/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala index 918bbff..58eb9f9 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala @@ -290,15 +290,17 @@ test("Creation of partition table should fail if the colname in table schema and val location = metastoredb +"/" +"ravi" sql(s"""alter table staticpartitionlocload add partition (empname='ravi') location '$location'""") sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocload partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") - val frame = sql("select empno,empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from staticpartitionlocload") + val frame = sql("select count(empno) from staticpartitionlocload") verifyPartitionInfo(frame, Seq("empname=ravi")) - assert(frame.count() == 10) + checkAnswer(sql("select count(empno) from staticpartitionlocload"), Seq(Row(10))) + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocload partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + checkAnswer(sql("select count(empno) from staticpartitionlocload"), Seq(Row(20))) val file = FileFactory.getCarbonFile(location) assert(file.exists()) FileFactory.deleteAllCarbonFilesOfDir(file) } - test("add external partition with static column partition with load command") { + test("add external partition with static column partition with load command with diffrent schema") { sql( """ @@ -324,18 +326,43 @@ test("Creation of partition table should fail if the colname in table schema and | PARTITIONED BY (empname String) | STORED BY 'org.apache.carbondata.format' """.stripMargin) - sql(s"""alter table staticpartitionextlocload add partition (empname='ravi') location '$location'""") - val frame = sql("select empno,empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from staticpartitionextlocload") - verifyPartitionInfo(frame, Seq("empname=ravi")) - assert(frame.count() == 10) - val location2 = storeLocation +"/staticpartitionlocloadother/empname=indra" - sql(s"""alter table staticpartitionextlocload add partition (empname='indra') location '$location2'""") - val frame1 = sql("select empno,empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from staticpartitionextlocload") - verifyPartitionInfo(frame1, Seq("empname=indra")) - assert(frame1.count() == 20) + intercept[Exception] { + sql(s"""alter table staticpartitionextlocload add partition (empname='ravi') location '$location'""") + } + assert(sql(s"show partitions staticpartitionextlocload").count() == 0) val file = FileFactory.getCarbonFile(location) - assert(file.exists()) - FileFactory.deleteAllCarbonFilesOfDir(file) + if(file.exists()) { + FileFactory.deleteAllCarbonFilesOfDir(file) + } + } + + test("add external partition with static column partition with load command") { + + sql( + """ + | CREATE TABLE staticpartitionlocloadother_new (empno int, designation String, + | workgroupcategory int, workgroupcategoryname String, deptno int, + | projectjoindate Timestamp,attendance int, + | deptname String,projectcode int, + | utilization int,salary int,projectenddate Date,doj Timestamp) + | PARTITIONED BY (empname String) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + val location = metastoredb +"/" +"ravi1" + sql(s"""alter table staticpartitionlocloadother_new add partition (empname='ravi') location '$location'""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocloadother_new partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocloadother_new partition(empname='indra') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + checkAnswer(sql(s"select count(deptname) from staticpartitionlocloadother_new"), Seq(Row(20))) + sql(s"""ALTER TABLE staticpartitionlocloadother_new DROP PARTITION(empname='ravi')""") + checkAnswer(sql(s"select count(deptname) from staticpartitionlocloadother_new"), Seq(Row(10))) + sql(s"""alter table staticpartitionlocloadother_new add partition (empname='ravi') location '$location'""") + checkAnswer(sql(s"select count(deptname) from staticpartitionlocloadother_new"), Seq(Row(20))) + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocloadother_new partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + checkAnswer(sql(s"select count(deptname) from staticpartitionlocloadother_new"), Seq(Row(30))) + val file = FileFactory.getCarbonFile(location) + if(file.exists()) { + FileFactory.deleteAllCarbonFilesOfDir(file) + } } test("drop partition on preAggregate table should fail"){ @@ -387,6 +414,8 @@ test("Creation of partition table should fail if the colname in table schema and sql("drop table if exists staticpartitionlocload") sql("drop table if exists staticpartitionextlocload") sql("drop table if exists staticpartitionlocloadother") + sql("drop table if exists staticpartitionextlocload_new") + sql("drop table if exists staticpartitionlocloadother_new") } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/092b5d58/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala index 2aaecc7..b0e6b94 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala @@ -75,7 +75,12 @@ case class CarbonAlterTableAddHivePartitionCommand( override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = { - AlterTableDropPartitionCommand(tableName, partitionSpecsAndLocs.map(_._1), true, false, true) + AlterTableDropPartitionCommand( + tableName, + partitionSpecsAndLocs.map(_._1), + ifExists = true, + purge = false, + retainData = true).run(sparkSession) val msg = s"Got exception $exception when processing data of add partition." + "Dropping partitions to the metadata" LogServiceFactory.getLogService(this.getClass.getCanonicalName).error(msg) @@ -88,6 +93,17 @@ case class CarbonAlterTableAddHivePartitionCommand( val segmentFile = SegmentFileStore.getSegmentFileForPhysicalDataPartitions(table.getTablePath, partitionSpecsAndLocsTobeAdded) if (segmentFile != null) { + val indexToSchemas = SegmentFileStore.getSchemaFiles(segmentFile, table.getTablePath) + val tableColums = table.getTableInfo.getFactTable.getListOfColumns.asScala + var isSameSchema = indexToSchemas.asScala.exists{ case(key, columnSchemas) => + columnSchemas.asScala.exists { col => + tableColums.exists(p => p.getColumnUniqueId.equals(col.getColumnUniqueId)) + } && columnSchemas.size() == tableColums.length + } + if (!isSameSchema) { + throw new UnsupportedOperationException( + "Schema of index files located in location is not matching with current table schema") + } val loadModel = new CarbonLoadModel loadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(table)) // Create new entry in tablestatus file