Repository: carbondata
Updated Branches:
  refs/heads/branch-1.3 660190fb5 -> 5b44e8105


[CARBONDATA-2219] Added validation for external partition location to use the same schema.

This closes #2018


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/092b5d58
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/092b5d58
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/092b5d58

Branch: refs/heads/branch-1.3
Commit: 092b5d58a50498a0a66bf6166907965612eb1fc5
Parents: 660190f
Author: ravipesala <ravi.pes...@gmail.com>
Authored: Thu Mar 1 12:04:53 2018 +0530
Committer: Venkata Ramana G <ramana.gollam...@huawei.com>
Committed: Fri Mar 2 21:26:13 2018 +0530

----------------------------------------------------------------------
 .../blockletindex/SegmentIndexFileStore.java    | 13 +++--
 .../core/metadata/SegmentFileStore.java         | 27 +++++++++-
 .../examples/CarbonPartitionExample.scala       |  3 +-
 .../StandardPartitionTableQueryTestCase.scala   | 57 +++++++++++++++-----
 ...arbonAlterTableAddHivePartitionCommand.scala | 18 ++++++-
 5 files changed, 94 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/092b5d58/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index b88c1f4..4883d94 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -90,23 +90,22 @@ public class SegmentIndexFileStore {
   /**
    * Read all index files and keep the cache in it.
    *
-   * @param segmentFileStore
+   * @param segmentFile
    * @throws IOException
    */
-  public void readAllIIndexOfSegment(SegmentFileStore segmentFileStore, SegmentStatus status,
-      boolean ignoreStatus) throws IOException {
+  public void readAllIIndexOfSegment(SegmentFileStore.SegmentFile segmentFile, String tablePath,
+      SegmentStatus status, boolean ignoreStatus) throws IOException {
     List<CarbonFile> carbonIndexFiles = new ArrayList<>();
-    if (segmentFileStore.getLocationMap() == null) {
+    if (segmentFile == null) {
       return;
     }
-    for (Map.Entry<String, SegmentFileStore.FolderDetails> locations : segmentFileStore
+    for (Map.Entry<String, SegmentFileStore.FolderDetails> locations : segmentFile
         .getLocationMap().entrySet()) {
       String location = locations.getKey();
 
       if (locations.getValue().getStatus().equals(status.getMessage()) || ignoreStatus) {
         if (locations.getValue().isRelative()) {
-          location =
-              segmentFileStore.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR + location;
+          location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
         }
         for (String indexFile : locations.getValue().getFiles()) {
           CarbonFile carbonFile = FileFactory

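For context, a minimal sketch (Scala caller, illustrative variable names) of the
new entry point: the reader now takes the SegmentFile and table path explicitly
instead of a whole SegmentFileStore, so callers that only hold a segment file
(such as the new getSchemaFiles below) can reuse it.

    // Assumes segmentFile: SegmentFileStore.SegmentFile and tablePath: String
    // are already in scope. Passing ignoreStatus = true reads index files from
    // every folder regardless of the status recorded against it.
    val indexFileStore = new SegmentIndexFileStore()
    indexFileStore.readAllIIndexOfSegment(segmentFile, tablePath, SegmentStatus.SUCCESS, true)
    // Index file content keyed by full path, ready for footer conversion.
    val carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath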
http://git-wip-us.apache.org/repos/asf/carbondata/blob/092b5d58/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 1902ab9..f2548b5 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -44,6 +44,7 @@ import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
@@ -314,7 +315,7 @@ public class SegmentFileStore {
     }
     SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
     indexFilesMap = new HashMap<>();
-    indexFileStore.readAllIIndexOfSegment(this, status, ignoreStatus);
+    indexFileStore.readAllIIndexOfSegment(this.segmentFile, tablePath, status, ignoreStatus);
     Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath();
     DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
     for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) {
@@ -329,6 +330,30 @@ public class SegmentFileStore {
   }
 
   /**
+   * Reads all index files and gets the schema of each index file
+   * @throws IOException
+   */
+  public static Map<String, List<ColumnSchema>> getSchemaFiles(SegmentFile segmentFile,
+      String tablePath) throws IOException {
+    Map<String, List<ColumnSchema>> schemaMap = new HashMap<>();
+    if (segmentFile == null) {
+      return schemaMap;
+    }
+    SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+    indexFileStore.readAllIIndexOfSegment(segmentFile, tablePath, SegmentStatus.SUCCESS, true);
+    Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath();
+    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+    for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) {
+      List<DataFileFooter> indexInfo =
+          fileFooterConverter.getIndexInfo(entry.getKey(), entry.getValue());
+      if (indexInfo.size() > 0) {
+        schemaMap.put(entry.getKey(), indexInfo.get(0).getColumnInTable());
+      }
+    }
+    return schemaMap;
+  }
+
+  /**
    * Gets all index files from this segment
    * @return
    */

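The new getSchemaFiles helper maps each index file path under the segment to the
column schemas recorded in that file's first footer, which is what the ALTER
TABLE ... ADD PARTITION command below uses to validate an external location. A
usage sketch (Scala, assuming a CarbonTable named table and a SegmentFile named
segmentFile are in scope):

    import scala.collection.JavaConverters._

    // Returns an empty map when segmentFile is null, so iteration is always safe.
    val schemaMap = SegmentFileStore.getSchemaFiles(segmentFile, table.getTablePath)
    schemaMap.asScala.foreach { case (indexFilePath, columnSchemas) =>
      println(s"$indexFilePath carries ${columnSchemas.size()} columns")
    }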
http://git-wip-us.apache.org/repos/asf/carbondata/blob/092b5d58/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
index 6837c56..2391dbe 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.exception.ProcessMetaDataException
 
 object CarbonPartitionExample {
 
@@ -195,7 +196,7 @@ object CarbonPartitionExample {
     try {
       spark.sql("""SHOW PARTITIONS t1""").show(100, false)
     } catch {
-      case ex: AnalysisException => LOGGER.error(ex.getMessage())
+      case ex: ProcessMetaDataException => LOGGER.error(ex.getMessage())
     }
     spark.sql("""SHOW PARTITIONS t0""").show(100, false)
     spark.sql("""SHOW PARTITIONS t3""").show(100, false)

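The example's catch clause changes because SHOW PARTITIONS on t1 now surfaces
CarbonData's ProcessMetaDataException rather than Spark's AnalysisException. A
defensive variant (a sketch, assuming either exception type may be thrown
depending on version) that tolerates both:

    try {
      spark.sql("SHOW PARTITIONS t1").show(100, false)
    } catch {
      case ex @ (_: ProcessMetaDataException | _: AnalysisException) =>
        LOGGER.error(ex.getMessage)
    }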
http://git-wip-us.apache.org/repos/asf/carbondata/blob/092b5d58/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
index 918bbff..58eb9f9 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
@@ -290,15 +290,17 @@ test("Creation of partition table should fail if the colname in table schema and
     val location = metastoredb +"/" +"ravi"
     sql(s"""alter table staticpartitionlocload add partition (empname='ravi') location '$location'""")
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocload partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
-    val frame = sql("select empno,empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from staticpartitionlocload")
+    val frame = sql("select count(empno) from staticpartitionlocload")
     verifyPartitionInfo(frame, Seq("empname=ravi"))
-    assert(frame.count() == 10)
+    checkAnswer(sql("select count(empno) from staticpartitionlocload"), Seq(Row(10)))
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocload partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    checkAnswer(sql("select count(empno) from staticpartitionlocload"), Seq(Row(20)))
     val file = FileFactory.getCarbonFile(location)
     assert(file.exists())
     FileFactory.deleteAllCarbonFilesOfDir(file)
   }
 
-  test("add external partition with static column partition with load command") {
+  test("add external partition with static column partition with load command with different schema") {
 
     sql(
       """
@@ -324,18 +326,43 @@ test("Creation of partition table should fail if the 
colname in table schema and
         | PARTITIONED BY (empname String)
         | STORED BY 'org.apache.carbondata.format'
       """.stripMargin)
-    sql(s"""alter table staticpartitionextlocload add partition (empname='ravi') location '$location'""")
-    val frame = sql("select empno,empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from staticpartitionextlocload")
-    verifyPartitionInfo(frame, Seq("empname=ravi"))
-    assert(frame.count() == 10)
-    val location2 = storeLocation +"/staticpartitionlocloadother/empname=indra"
-    sql(s"""alter table staticpartitionextlocload add partition (empname='indra') location '$location2'""")
-    val frame1 = sql("select empno,empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from staticpartitionextlocload")
-    verifyPartitionInfo(frame1, Seq("empname=indra"))
-    assert(frame1.count() == 20)
+    intercept[Exception] {
+      sql(s"""alter table staticpartitionextlocload add partition (empname='ravi') location '$location'""")
+    }
+    assert(sql(s"show partitions staticpartitionextlocload").count() == 0)
+    assert(sql(s"show partitions staticpartitionextlocload").count() == 0)
     val file = FileFactory.getCarbonFile(location)
-    assert(file.exists())
-    FileFactory.deleteAllCarbonFilesOfDir(file)
+    if(file.exists()) {
+      FileFactory.deleteAllCarbonFilesOfDir(file)
+    }
+  }
+
+  test("add external partition with static column partition with load command") {
+
+    sql(
+      """
+        | CREATE TABLE staticpartitionlocloadother_new (empno int, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int,
+        |  projectjoindate Timestamp,attendance int,
+        |  deptname String,projectcode int,
+        |  utilization int,salary int,projectenddate Date,doj Timestamp)
+        | PARTITIONED BY (empname String)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    val location = metastoredb +"/" +"ravi1"
+    sql(s"""alter table staticpartitionlocloadother_new add partition (empname='ravi') location '$location'""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocloadother_new partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocloadother_new partition(empname='indra') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    checkAnswer(sql(s"select count(deptname) from staticpartitionlocloadother_new"), Seq(Row(20)))
+    sql(s"""ALTER TABLE staticpartitionlocloadother_new DROP PARTITION(empname='ravi')""")
+    checkAnswer(sql(s"select count(deptname) from staticpartitionlocloadother_new"), Seq(Row(10)))
+    sql(s"""alter table staticpartitionlocloadother_new add partition (empname='ravi') location '$location'""")
+    checkAnswer(sql(s"select count(deptname) from staticpartitionlocloadother_new"), Seq(Row(20)))
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocloadother_new partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    checkAnswer(sql(s"select count(deptname) from staticpartitionlocloadother_new"), Seq(Row(30)))
+    val file = FileFactory.getCarbonFile(location)
+    if(file.exists()) {
+      FileFactory.deleteAllCarbonFilesOfDir(file)
+    }
   }
 
   test("drop partition on preAggregate table should fail"){
@@ -387,6 +414,8 @@ test("Creation of partition table should fail if the colname in table schema and
     sql("drop table if exists staticpartitionlocload")
     sql("drop table if exists staticpartitionextlocload")
     sql("drop table if exists staticpartitionlocloadother")
+    sql("drop table if exists staticpartitionextlocload_new")
+    sql("drop table if exists staticpartitionlocloadother_new")
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/092b5d58/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
index 2aaecc7..b0e6b94 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
@@ -75,7 +75,12 @@ case class CarbonAlterTableAddHivePartitionCommand(
 
 
   override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
-    AlterTableDropPartitionCommand(tableName, partitionSpecsAndLocs.map(_._1), true, false, true)
+    AlterTableDropPartitionCommand(
+      tableName,
+      partitionSpecsAndLocs.map(_._1),
+      ifExists = true,
+      purge = false,
+      retainData = true).run(sparkSession)
     val msg = s"Got exception $exception when processing data of add partition." +
               "Dropping partitions to the metadata"
     LogServiceFactory.getLogService(this.getClass.getCanonicalName).error(msg)
@@ -88,6 +93,17 @@ case class CarbonAlterTableAddHivePartitionCommand(
       val segmentFile = SegmentFileStore.getSegmentFileForPhysicalDataPartitions(table.getTablePath,
         partitionSpecsAndLocsTobeAdded)
       if (segmentFile != null) {
+        val indexToSchemas = SegmentFileStore.getSchemaFiles(segmentFile, table.getTablePath)
+        val tableColums = table.getTableInfo.getFactTable.getListOfColumns.asScala
+        var isSameSchema = indexToSchemas.asScala.exists{ case(key, columnSchemas) =>
+          columnSchemas.asScala.exists { col =>
+            tableColums.exists(p => p.getColumnUniqueId.equals(col.getColumnUniqueId))
+          } && columnSchemas.size() == tableColums.length
+        }
+        if (!isSameSchema) {
+          throw new UnsupportedOperationException(
+            "Schema of index files located in location is not matching with current table schema")
+        }
         val loadModel = new CarbonLoadModel
         loadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(table))
         // Create new entry in tablestatus file

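Two fixes land in this command. First, undoMetadata previously constructed an
AlterTableDropPartitionCommand but never executed it, so a failed ADD PARTITION
left the partition registered; it now calls run(sparkSession). Second, before an
external location is registered, the schemas read from its index files are
compared with the table schema by column unique id and column count. Note the
committed check uses exists at both levels; the sketch below (Scala, illustrative
names, not the committed logic) shows a stricter forall-based variant that would
require every index file and every column to match:

    import scala.collection.JavaConverters._

    // True only if every index file at the location has the same column count
    // as the table and every one of its columns matches a table column by
    // unique id. Stricter than the committed exists-based check.
    def matchesTableSchema(
        indexToSchemas: java.util.Map[String, java.util.List[ColumnSchema]],
        tableColumns: Seq[ColumnSchema]): Boolean = {
      indexToSchemas.asScala.forall { case (_, columnSchemas) =>
        columnSchemas.asScala.forall { col =>
          tableColumns.exists(_.getColumnUniqueId == col.getColumnUniqueId)
        } && columnSchemas.size() == tableColumns.length
      }
    }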