[CARBONDATA-940] alter table add/split partition for spark 2.1

Add the ALTER TABLE ADD PARTITION and SPLIT PARTITION functions for Spark 2.1.

This closes #1192
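
For reference, a minimal usage sketch of the added statements, assuming a
SparkSession named spark and the list-partitioned table t5 created in the
updated CarbonPartitionExample below:

    // add a new list partition at the end of table t5
    spark.sql("ALTER TABLE t5 ADD PARTITION ('OutSpace')")
    // split partition 4 of t5 into three independent partitions
    spark.sql("ALTER TABLE t5 SPLIT PARTITION(4) INTO ('Canada', 'Russia', '(Good, NotGood)')")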


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/874764f9
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/874764f9
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/874764f9

Branch: refs/heads/master
Commit: 874764f95629c98d8f98a424ca742614e38f704f
Parents: 3efd330
Author: lionelcao <whuca...@gmail.com>
Authored: Wed Jul 19 14:36:18 2017 +0800
Committer: Jacky Li <jacky.li...@qq.com>
Committed: Thu Aug 17 09:28:54 2017 +0800

----------------------------------------------------------------------
 conf/carbon.properties.template                 |   3 +
 .../core/constants/CarbonCommonConstants.java   |   9 +
 .../apache/carbondata/core/locks/LockUsage.java |   1 +
 .../core/metadata/schema/PartitionInfo.java     |  25 +
 .../filter/partition/PartitionFilterUtil.java   |   2 +-
 .../PartitionSpliterRawResultIterator.java      |  72 +++
 .../core/scan/wrappers/ByteArrayWrapper.java    |   4 +-
 .../core/util/path/CarbonTablePath.java         |  18 +
 .../src/main/resources/partition_data.csv       |  25 -
 .../examples/CarbonPartitionExample.scala       |  75 ++-
 .../hadoop/api/CarbonTableInputFormat.java      | 150 +++++-
 .../hadoop/util/CarbonInputFormatUtil.java      |   8 +
 .../src/test/resources/partition_data.csv       |  28 +
 .../TestDataLoadingForPartitionTable.scala      |   4 +-
 .../carbondata/spark/load/CarbonLoaderUtil.java |   4 +-
 .../spark/partition/SplitPartitionCallable.java |  41 ++
 .../org/apache/carbondata/spark/KeyVal.scala    |   8 +
 .../load/DataLoadProcessorStepOnSpark.scala     |   2 +-
 .../spark/rdd/AlterTableSplitPartitionRDD.scala | 146 ++++++
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |   8 +-
 .../spark/rdd/CarbonScanPartitionRDD.scala      | 269 ++++++++++
 .../spark/rdd/DataManagementFunc.scala          |  47 +-
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |   6 +-
 .../spark/rdd/PartitionSplitter.scala           | 104 ++++
 .../carbondata/spark/util/CommonUtil.scala      |  40 +-
 .../execution/command/carbonTableSchema.scala   |  16 +
 .../org/apache/spark/util/PartitionUtils.scala  | 135 +++++
 .../spark/rdd/CarbonDataRDDFactory.scala        |  99 +++-
 .../execution/command/carbonTableSchema.scala   | 147 +++++-
 .../sql/parser/CarbonSpark2SqlParser.scala      |  26 +-
 .../spark/sql/parser/CarbonSparkSqlParser.scala |   2 +-
 .../partition/TestAlterPartitionTable.scala     | 523 +++++++++++++++++++
 .../merger/CompactionResultSortProcessor.java   |   3 +-
 .../newflow/DataLoadProcessBuilder.java         |   2 +-
 .../sort/impl/ParallelReadMergeSorterImpl.java  |   2 +-
 ...arallelReadMergeSorterWithBucketingImpl.java |   4 +-
 .../UnsafeBatchParallelReadMergeSorterImpl.java |   2 +-
 ...arallelReadMergeSorterWithBucketingImpl.java |   5 +-
 .../CarbonRowDataWriterProcessorStepImpl.java   |   2 +-
 .../steps/DataWriterBatchProcessorStepImpl.java |   2 +-
 .../steps/DataWriterProcessorStepImpl.java      |   2 +-
 .../sortandgroupby/sortdata/SortParameters.java |   4 +-
 .../spliter/AbstractCarbonQueryExecutor.java    | 133 +++++
 .../processing/spliter/CarbonSplitExecutor.java |  64 +++
 .../spliter/RowResultSpliterProcessor.java      | 105 ++++
 .../exception/SliceSpliterException.java        |  78 +++
 .../store/CarbonFactDataHandlerModel.java       |   2 +
 .../util/CarbonDataProcessorUtil.java           |  12 +-
 48 files changed, 2360 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/conf/carbon.properties.template
----------------------------------------------------------------------
diff --git a/conf/carbon.properties.template b/conf/carbon.properties.template
index b5f5101..de8a4ee 100644
--- a/conf/carbon.properties.template
+++ b/conf/carbon.properties.template
@@ -42,6 +42,9 @@ carbon.enableXXHash=true
 #carbon.max.level.cache.size=-1
 #enable prefetch of data during merge sort while reading data from sort temp 
files in data loading
 #carbon.merge.sort.prefetch=true
+######## Alter Partition Configuration ########
+#Number of cores to be used while altering a partition
+carbon.number.of.cores.while.alterPartition=2
 ######## Compaction Configuration ########
 #Number of cores to be used while compacting
 carbon.number.of.cores.while.compacting=2

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 14258f8..74a08ec 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -306,6 +306,10 @@ public final class CarbonCommonConstants {
   @CarbonProperty
   public static final String NUM_CORES_COMPACTING = 
"carbon.number.of.cores.while.compacting";
   /**
+   * Number of cores to be used while altering a partition
+   */
+  public static final String NUM_CORES_ALT_PARTITION = 
"carbon.number.of.cores.while.altPartition";
+  /**
    * Number of cores to be used for block sort
    */
   @CarbonProperty
@@ -967,6 +971,11 @@ public final class CarbonCommonConstants {
   public static final String COMPACTION_KEY_WORD = "COMPACTION";
 
   /**
+   * Indicates alter partition
+   */
+  public static String ALTER_PARTITION_KEY_WORD = "ALTER_PARTITION";
+
+  /**
    * hdfs temporary directory key
    */
   @CarbonProperty
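
A hedged sketch of how the new constant can be read on the driver side; the
default of 2 mirrors the template entry above and is an assumption (note the
key spelling here, altPartition, differs from the alterPartition key in
conf/carbon.properties.template):

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    // resolve the number of cores used for alter partition, falling back to 2
    val alterPartitionCores: Int = CarbonProperties.getInstance()
      .getProperty(CarbonCommonConstants.NUM_CORES_ALT_PARTITION, "2").toInt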

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java 
b/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java
index 046fff5..1738c64 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/LockUsage.java
@@ -26,6 +26,7 @@ public class LockUsage {
   public static final String METADATA_LOCK = "meta.lock";
   public static final String COMPACTION_LOCK = "compaction.lock";
   public static final String SYSTEMLEVEL_COMPACTION_LOCK = 
"system_level_compaction.lock";
+  public static final String ALTER_PARTITION_LOCK = "alter_partition.lock";
   public static final String TABLE_STATUS_LOCK = "tablestatus.lock";
   public static final String TABLE_UPDATE_STATUS_LOCK = 
"tableupdatestatus.lock";
   public static final String DELETE_SEGMENT_LOCK = "delete_segment.lock";

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java
index 39d7f3f..4b0bc3e 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java
@@ -67,6 +67,31 @@ public class PartitionInfo implements Serializable {
     this.partitionIds = new ArrayList<>();
   }
 
+  /**
+   * Adding a partition means splitting the default partition; the new ids are appended at the end.
+   */
+  public void  addPartition(int addPartitionCount) {
+    for (int i = 0; i < addPartitionCount; i++) {
+      partitionIds.add(++MAX_PARTITION);
+      numPartitions++;
+    }
+  }
+
+  /**
+   * e.g. for original partitions [0,1,2,3,4,5],
+   * splitting partition 2 into partitions 6,7,8 (id 2 is not reused)
+   * means sourcePartitionIndex is 2 and newPartitionNumbers is 3
+   * @param sourcePartitionIndex
+   * @param newPartitionNumbers
+   */
+  public void splitPartition(int sourcePartitionIndex, int 
newPartitionNumbers) {
+    partitionIds.remove(sourcePartitionIndex);
+    for (int i = 0; i < newPartitionNumbers; i++) {
+      partitionIds.add(sourcePartitionIndex + i, ++MAX_PARTITION);
+    }
+    numPartitions = numPartitions - 1 + newPartitionNumbers;
+  }
+
   public List<ColumnSchema> getColumnSchemaList() {
     return columnSchemaList;
   }
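
To make the id bookkeeping of addPartition/splitPartition concrete, a small
Scala walkthrough mirroring the javadoc example; the starting ids and the
buffer are illustrative assumptions, not code from this patch:

    import scala.collection.mutable.ListBuffer

    var maxPartition = 5
    val ids = ListBuffer(0, 1, 2, 3, 4, 5)

    // splitPartition(sourcePartitionIndex = 2, newPartitionNumbers = 3):
    // id 2 is removed and three fresh ids take its place; ids are never reused
    ids.remove(2)
    for (i <- 0 until 3) { maxPartition += 1; ids.insert(2 + i, maxPartition) }
    // ids == ListBuffer(0, 1, 6, 7, 8, 3, 4, 5)

    // addPartition(2): two fresh ids are appended at the end
    for (_ <- 0 until 2) { maxPartition += 1; ids += maxPartition }
    // ids == ListBuffer(0, 1, 6, 7, 8, 3, 4, 5, 9, 10)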

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PartitionFilterUtil.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PartitionFilterUtil.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PartitionFilterUtil.java
index d040c1b..9671199 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PartitionFilterUtil.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PartitionFilterUtil.java
@@ -107,7 +107,7 @@ public class PartitionFilterUtil {
           }
         }
       } else {
-        // LessThanEqualTo(<)
+        // LessThan(<)
         outer4:
         for (int i = 0; i < partitions; i++) {
           for (String value : listInfo.get(i)) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/PartitionSpliterRawResultIterator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/PartitionSpliterRawResultIterator.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/PartitionSpliterRawResultIterator.java
new file mode 100644
index 0000000..553f85e
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/PartitionSpliterRawResultIterator.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.result.iterator;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.scan.result.BatchResult;
+
+public class PartitionSpliterRawResultIterator extends 
CarbonIterator<Object[]> {
+
+  private CarbonIterator<BatchResult> iterator;
+  private BatchResult batch;
+  private int counter;
+
+  private static final LogService LOGGER =
+      
LogServiceFactory.getLogService(PartitionSpliterRawResultIterator.class.getName());
+
+  public PartitionSpliterRawResultIterator(CarbonIterator<BatchResult> 
iterator) {
+    this.iterator = iterator;
+  }
+
+  @Override public boolean hasNext() {
+    if (null == batch || checkBatchEnd(batch)) {
+      if (iterator.hasNext()) {
+        batch = iterator.next();
+        counter = 0;
+      } else {
+        return false;
+      }
+    }
+
+    return !checkBatchEnd(batch);
+  }
+
+  @Override public Object[] next() {
+    if (batch == null) {
+      batch = iterator.next();
+    }
+    if (!checkBatchEnd(batch)) {
+      return batch.getRawRow(counter++);
+    } else {
+      batch = iterator.next();
+      counter = 0;
+    }
+    return batch.getRawRow(counter++);
+  }
+
+  /**
+   * To check if the batch is processed completely
+   * @param batch
+   * @return
+   */
+  private boolean checkBatchEnd(BatchResult batch) {
+    return !(counter < batch.getSize());
+  }
+
+}
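
A small hedged usage sketch, draining the new raw-result iterator into a row
buffer; the helper name is illustrative:

    import scala.collection.mutable.ArrayBuffer

    import org.apache.carbondata.core.scan.result.iterator.PartitionSpliterRawResultIterator

    def collectRows(iterator: PartitionSpliterRawResultIterator): Seq[Array[AnyRef]] = {
      val rows = new ArrayBuffer[Array[AnyRef]]()
      // hasNext moves on to the next BatchResult once the current batch is consumed
      while (iterator.hasNext) {
        rows += iterator.next()
      }
      rows
    }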

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
index 93bf8eb..2f981b5 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
@@ -16,13 +16,15 @@
  */
 package org.apache.carbondata.core.scan.wrappers;
 
+import java.io.Serializable;
+
 import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
 
 /**
  * This class will store the dimension column data when query is executed
  * This can be used as a key for aggregation
  */
-public class ByteArrayWrapper implements Comparable<ByteArrayWrapper> {
+public class ByteArrayWrapper implements Comparable<ByteArrayWrapper>, 
Serializable {
 
   /**
    * to store key which is generated using

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java 
b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 7b8904c..fb9b783 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -303,6 +303,20 @@ public class CarbonTablePath extends Path {
     }
   }
 
+  public String getCarbonIndexFilePath(String taskId, String partitionId, 
String segmentId,
+      int batchNo, String bucketNumber, String timeStamp,
+      ColumnarFormatVersion columnarFormatVersion) {
+    switch (columnarFormatVersion) {
+      case V1:
+      case V2:
+        return getCarbonIndexFilePath(taskId, partitionId, segmentId, 
bucketNumber);
+      default:
+        String segmentDir = getSegmentDir(partitionId, segmentId);
+        return segmentDir + File.separator + 
getCarbonIndexFileName(Integer.parseInt(taskId),
+            Integer.parseInt(bucketNumber), batchNo, timeStamp);
+    }
+  }
+
   private static String getCarbonIndexFileName(String taskNo, int bucketNumber,
       String factUpdatedtimeStamp) {
     return taskNo + "-" + bucketNumber + "-" + factUpdatedtimeStamp + 
INDEX_FILE_EXT;
@@ -537,6 +551,10 @@ public class CarbonTablePath extends Path {
       return Integer.parseInt(taskNo.split(BATCH_PREFIX)[0]);
     }
 
+    public static int getBatchNoFromTaskNo(String taskNo) {
+      return Integer.parseInt(taskNo.split(BATCH_PREFIX)[1]);
+    }
+
     /**
      * Gets the file name from file path
      */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/examples/spark2/src/main/resources/partition_data.csv
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/resources/partition_data.csv 
b/examples/spark2/src/main/resources/partition_data.csv
deleted file mode 100644
index 2d521b7..0000000
--- a/examples/spark2/src/main/resources/partition_data.csv
+++ /dev/null
@@ -1,25 +0,0 @@
-vin,logdate,phonenumber,country,area
-A42158424831,2016/02/12,125371341,China,Asia
-A42158473831,2016/01/12,125371342,China,Asia
-A42152474832,2016/02/12,125371343,US,America
-A42151477823,2016/12/12,125371344,China,Asia
-A42158474135,2016/02/15,125371345,Japan,Asia
-A42258434831,2016/12/12,125371346,China,Asia
-A42158475831,2016/05/12,125371347,UK,Europe
-A41158494830,2015/07/12,225371348,China,Asia
-A42158424831,2015/02/12,225371349,China,Asia
-A42158473830,2014/01/12,225371310,China,Asia
-A42152474830,2013/02/12,325371311,US,America
-A42151477823,2012/12/12,425371312,China,Asia
-A42158474133,2012/02/15,325371313,Japan,Asia
-A42258434835,2013/12/12,525371314,China,Asia
-A42158475836,2014/05/12,625371315,UK,Europe
-A41158494838,2015/07/12,525371316,China,Asia
-A42158424833,2016/02/12,425371317,China,Asia
-A42158473832,2017/01/12,325371318,China,Asia
-A42152474834,2011/02/12,225371319,US,America
-A42151477824,2012/12/12,225371320,China,Asia
-A42158474137,2013/02/15,325371321,Japan,Asia
-A42258434837,2014/12/12,25371322,China,Asia
-A42158475838,2014/05/12,425371323,UK,Europe
-A41158494839,2016/07/12,625371324,China,Asia

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
----------------------------------------------------------------------
diff --git 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
index 8a01ba1..ca0501c 100644
--- 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
+++ 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
@@ -35,7 +35,7 @@ object CarbonPartitionExample {
     val storeLocation = s"$rootPath/examples/spark2/target/store"
     val warehouse = s"$rootPath/examples/spark2/target/warehouse"
     val metastoredb = s"$rootPath/examples/spark2/target"
-    val testData = 
s"$rootPath/examples/spark2/src/main/resources/partition_data.csv"
+    val testData = 
s"$rootPath/integration/spark-common-test/src/test/resources/partition_data.csv"
 
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
@@ -51,34 +51,57 @@ object CarbonPartitionExample {
 
     spark.sparkContext.setLogLevel("WARN")
 
-    // none partition table
+    // range partition with bucket defined
     spark.sql("DROP TABLE IF EXISTS t0")
     spark.sql("""
                 | CREATE TABLE IF NOT EXISTS t0
                 | (
+                | id Int,
                 | vin String,
-                | logdate Timestamp,
                 | phonenumber Long,
                 | country String,
-                | area String
+                | area String,
+                | salary Int
                 | )
+                | PARTITIONED BY (logdate Timestamp)
                 | STORED BY 'carbondata'
+                | TBLPROPERTIES('PARTITION_TYPE'='RANGE',
+                | 'RANGE_INFO'='2014/01/01, 2015/01/01, 2016/01/01',
+                | 'BUCKETNUMBER'='3',
+                | 'BUCKETCOLUMNS'='vin')
               """.stripMargin)
 
-    // range partition
+    // non-partitioned table
     spark.sql("DROP TABLE IF EXISTS t1")
     spark.sql("""
                 | CREATE TABLE IF NOT EXISTS t1
                 | (
+                | id Int,
                 | vin String,
+                | logdate Timestamp,
                 | phonenumber Long,
                 | country String,
                 | area String
                 | )
-                | PARTITIONED BY (logdate Timestamp)
                 | STORED BY 'carbondata'
-                | TBLPROPERTIES('PARTITION_TYPE'='RANGE',
-                | 'RANGE_INFO'='2014/01/01, 2015/01/01, 2016/01/01')
+              """.stripMargin)
+
+    // list partition
+    spark.sql("DROP TABLE IF EXISTS t2")
+    spark.sql("""
+                | CREATE TABLE IF NOT EXISTS t2
+                | (
+                | id Int,
+                | vin String,
+                | logdate Timestamp,
+                | phonenumber Long,
+                | country String,
+                | salary Int
+                | )
+                | PARTITIONED BY (area String)
+                | STORED BY 'carbondata'
+                | TBLPROPERTIES('PARTITION_TYPE'='LIST',
+                | 'LIST_INFO'='Asia, America, Europe', 'DICTIONARY_EXCLUDE' 
='area')
               """.stripMargin)
 
     // hash partition
@@ -86,10 +109,12 @@ object CarbonPartitionExample {
     spark.sql("""
                 | CREATE TABLE IF NOT EXISTS t3
                 | (
+                | id Int,
                 | logdate Timestamp,
                 | phonenumber Long,
                 | country String,
-                | area String
+                | area String,
+                | salary Int
                 | )
                 | PARTITIONED BY (vin String)
                 | STORED BY 'carbondata'
@@ -101,17 +126,40 @@ object CarbonPartitionExample {
     spark.sql("""
        | CREATE TABLE IF NOT EXISTS t5
        | (
+       | id Int,
        | vin String,
        | logdate Timestamp,
        | phonenumber Long,
-       | area String
+       | area String,
+       | salary Int
        |)
        | PARTITIONED BY (country String)
        | STORED BY 'carbondata'
        | TBLPROPERTIES('PARTITION_TYPE'='LIST',
-       | 'LIST_INFO'='(China,United States),UK ,japan,(Canada,Russia), South 
Korea ')
+       | 'LIST_INFO'='(China, US),UK ,Japan,(Canada,Russia, Good, NotGood), 
Korea ')
        """.stripMargin)
 
+    // load data into partition table
+    spark.sql(s"""
+       LOAD DATA LOCAL INPATH '$testData' into table t0 
options('BAD_RECORDS_ACTION'='FORCE')
+       """)
+    spark.sql(s"""
+       LOAD DATA LOCAL INPATH '$testData' into table t5 
options('BAD_RECORDS_ACTION'='FORCE')
+       """)
+
+    // alter list partition table t5 to add a partition
+    spark.sql(s"""Alter table t5 add partition ('OutSpace')""".stripMargin)
+    // alter list partition table t5 to split partition 4 into 3 independent partitions
+    spark.sql(
+      s"""
+         Alter table t5 split partition(4) into ('Canada', 'Russia', '(Good, 
NotGood)')
+       """.stripMargin)
+
+    spark.sql("""select * from t5 where country = 'Good' """).show(100, false)
+
+    spark.sql("select * from t0 order by salary ").show(100, false)
+    spark.sql("select * from t5 order by salary ").show(100, false)
+
     // hive partition table
     spark.sql("DROP TABLE IF EXISTS t7")
     spark.sql("""
@@ -130,6 +178,7 @@ object CarbonPartitionExample {
     spark.sql(s"CREATE DATABASE partitionDB")
     spark.sql(s"""
                 | CREATE TABLE IF NOT EXISTS partitionDB.t9(
+                | id Int,
                 | logdate Timestamp,
                 | phonenumber Int,
                 | country String,
@@ -145,11 +194,11 @@ object CarbonPartitionExample {
 
     // show partitions
     try {
-      spark.sql("""SHOW PARTITIONS t0""").show(100, false)
+      spark.sql("""SHOW PARTITIONS t1""").show(100, false)
     } catch {
       case ex: AnalysisException => LOGGER.error(ex.getMessage())
     }
-    spark.sql("""SHOW PARTITIONS t1""").show(100, false)
+    spark.sql("""SHOW PARTITIONS t0""").show(100, false)
     spark.sql("""SHOW PARTITIONS t3""").show(100, false)
     spark.sql("""SHOW PARTITIONS t5""").show(100, false)
     spark.sql("""SHOW PARTITIONS t7""").show(100, false)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 19e264b..dc4e76a 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -51,8 +51,6 @@ import 
org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
 import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.scan.partition.PartitionUtil;
-import org.apache.carbondata.core.scan.partition.Partitioner;
 import org.apache.carbondata.core.stats.QueryStatistic;
 import org.apache.carbondata.core.stats.QueryStatisticsConstants;
 import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
@@ -64,6 +62,7 @@ import org.apache.carbondata.core.util.DataTypeConverter;
 import org.apache.carbondata.core.util.DataTypeConverterImpl;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
 import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
 import org.apache.carbondata.hadoop.CarbonProjection;
 import org.apache.carbondata.hadoop.CarbonRecordReader;
@@ -101,6 +100,7 @@ public class CarbonTableInputFormat<T> extends 
FileInputFormat<Void, T> {
       "mapreduce.input.carboninputformat.segmentnumbers";
   // comma separated list of input files
   public static final String INPUT_FILES = 
"mapreduce.input.carboninputformat.files";
+  public static final String ALTER_PARTITION_ID = 
"mapreduce.input.carboninputformat.partitionid";
   private static final Log LOG = 
LogFactory.getLog(CarbonTableInputFormat.class);
   private static final String FILTER_PREDICATE =
       "mapreduce.input.carboninputformat.filter.predicate";
@@ -164,6 +164,11 @@ public class CarbonTableInputFormat<T> extends 
FileInputFormat<Void, T> {
       throws IOException {
     configuration.set(FileInputFormat.INPUT_DIR, tablePath);
   }
+
+  public static void setPartitionIdList(Configuration configuration, 
List<String> partitionIds) {
+    configuration.set(ALTER_PARTITION_ID, partitionIds.toString());
+  }
+
   /**
    * It sets unresolved filter expression.
    *
@@ -285,23 +290,17 @@ public class CarbonTableInputFormat<T> extends 
FileInputFormat<Void, T> {
     if (null == carbonTable) {
       throw new IOException("Missing/Corrupt schema file for table.");
     }
-
+    PartitionInfo partitionInfo = 
carbonTable.getPartitionInfo(carbonTable.getFactTableName());
     CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
 
     // prune partitions for filter query on partition table
     BitSet matchedPartitions = null;
-    if (null != filter) {
-      PartitionInfo partitionInfo = 
carbonTable.getPartitionInfo(carbonTable.getFactTableName());
-      if (null != partitionInfo) {
-        Partitioner partitioner = PartitionUtil.getPartitioner(partitionInfo);
-        matchedPartitions = new FilterExpressionProcessor()
-            .getFilteredPartitions(filter, partitionInfo);
+    if (partitionInfo != null) {
+      matchedPartitions = setMatchedPartitions(null, filter, partitionInfo);
+      if (matchedPartitions != null) {
         if (matchedPartitions.cardinality() == 0) {
-          // no partition is required
           return new ArrayList<InputSplit>();
-        }
-        if (matchedPartitions.cardinality() == partitioner.numPartitions()) {
-          // all partitions are required, no need to prune partitions
+        } else if (matchedPartitions.cardinality() == 
partitionInfo.getNumPartitions()) {
           matchedPartitions = null;
         }
       }
@@ -310,7 +309,8 @@ public class CarbonTableInputFormat<T> extends 
FileInputFormat<Void, T> {
     FilterResolverIntf filterInterface = 
CarbonInputFormatUtil.resolveFilter(filter, identifier);
 
     // do block filtering and get split
-    List<InputSplit> splits = getSplits(job, filterInterface, validSegments, 
matchedPartitions);
+    List<InputSplit> splits =
+        getSplits(job, filterInterface, validSegments, matchedPartitions, 
partitionInfo, null);
     // pass the invalid segment to task side in order to remove index entry in 
task side
     if (invalidSegments.size() > 0) {
       for (InputSplit split : splits) {
@@ -323,6 +323,85 @@ public class CarbonTableInputFormat<T> extends 
FileInputFormat<Void, T> {
   }
 
   /**
+   * Read data in one segment, for the alter table partition statement.
+   * @param job
+   * @param targetSegment
+   * @param oldPartitionIdList  partition ids as they were before partitionInfo was changed
+   * @return
+   * @throws IOException
+   */
+  public List<InputSplit> getSplitsOfOneSegment(JobContext job, String 
targetSegment,
+      List<Integer> oldPartitionIdList, PartitionInfo partitionInfo)
+      throws IOException {
+    AbsoluteTableIdentifier identifier = 
getAbsoluteTableIdentifier(job.getConfiguration());
+    List<String> invalidSegments = new ArrayList<>();
+    List<UpdateVO> invalidTimestampsList = new ArrayList<>();
+
+    List<String> segmentList = new ArrayList<>();
+    segmentList.add(targetSegment);
+    setSegmentsToAccess(job.getConfiguration(), segmentList);
+    try {
+
+      // process and resolve the expression
+      Expression filter = getFilterPredicates(job.getConfiguration());
+      CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
+      // this will be null in case of corrupt schema file.
+      if (null == carbonTable) {
+        throw new IOException("Missing/Corrupt schema file for table.");
+      }
+
+      CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
+
+      // prune partitions for filter query on partition table
+      String partitionIds = job.getConfiguration().get(ALTER_PARTITION_ID);
+      BitSet matchedPartitions = null;
+      if (partitionInfo != null) {
+        matchedPartitions = setMatchedPartitions(partitionIds, filter, 
partitionInfo);
+        if (matchedPartitions != null) {
+          if (matchedPartitions.cardinality() == 0) {
+            return new ArrayList<InputSplit>();
+          } else if (matchedPartitions.cardinality() == 
partitionInfo.getNumPartitions()) {
+            matchedPartitions = null;
+          }
+        }
+      }
+
+      FilterResolverIntf filterInterface = 
CarbonInputFormatUtil.resolveFilter(filter, identifier);
+      // do block filtering and get split
+      List<InputSplit> splits = getSplits(job, filterInterface, segmentList, 
matchedPartitions,
+          partitionInfo, oldPartitionIdList);
+      // pass the invalid segment to task side in order to remove index entry 
in task side
+      if (invalidSegments.size() > 0) {
+        for (InputSplit split : splits) {
+          ((CarbonInputSplit) split).setInvalidSegments(invalidSegments);
+          ((CarbonInputSplit) 
split).setInvalidTimestampRange(invalidTimestampsList);
+        }
+      }
+      return splits;
+    } catch (IOException e) {
+      throw new RuntimeException("Can't get splits of the target segment ", e);
+    }
+  }
+
+  private BitSet setMatchedPartitions(String partitionIds, Expression filter,
+      PartitionInfo partitionInfo) {
+    BitSet matchedPartitions = null;
+    if (null != partitionIds) {
+      String[] partList = partitionIds.replace("[", "").replace("]", 
"").split(",");
+      // only one partitionId in current alter table statement
+      matchedPartitions = new BitSet(Integer.parseInt(partList[0]));
+      for (String partitionId : partList) {
+        matchedPartitions.set(Integer.parseInt(partitionId));
+      }
+    } else {
+      if (null != filter) {
+        matchedPartitions =
+            new FilterExpressionProcessor().getFilteredPartitions(filter, 
partitionInfo);
+      }
+    }
+    return matchedPartitions;
+  }
+  /**
    * {@inheritDoc}
    * Configurations FileInputFormat.INPUT_DIR, 
CarbonInputFormat.INPUT_SEGMENT_NUMBERS
    * are used to get table path to read.
@@ -331,7 +410,8 @@ public class CarbonTableInputFormat<T> extends 
FileInputFormat<Void, T> {
    * @throws IOException
    */
   private List<InputSplit> getSplits(JobContext job, FilterResolverIntf 
filterResolver,
-      List<String> validSegments, BitSet matchedPartitions) throws IOException 
{
+      List<String> validSegments, BitSet matchedPartitions, PartitionInfo 
partitionInfo,
+      List<Integer> oldPartitionIdList) throws IOException {
 
     List<InputSplit> result = new LinkedList<InputSplit>();
     UpdateVO invalidBlockVOForSegmentId = null;
@@ -344,10 +424,10 @@ public class CarbonTableInputFormat<T> extends 
FileInputFormat<Void, T> {
 
     isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0);
 
-    //for each segment fetch blocks matching filter in Driver BTree
+    // for each segment fetch blocks matching filter in Driver BTree
     List<org.apache.carbondata.hadoop.CarbonInputSplit> dataBlocksOfSegment =
         getDataBlocksOfSegment(job, absoluteTableIdentifier, filterResolver, 
matchedPartitions,
-            validSegments);
+            validSegments, partitionInfo, oldPartitionIdList);
     for (org.apache.carbondata.hadoop.CarbonInputSplit inputSplit : 
dataBlocksOfSegment) {
 
       // Get the UpdateVO for those tables on which IUD operations being 
performed.
@@ -395,7 +475,8 @@ public class CarbonTableInputFormat<T> extends 
FileInputFormat<Void, T> {
    */
   private List<org.apache.carbondata.hadoop.CarbonInputSplit> 
getDataBlocksOfSegment(JobContext job,
       AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf 
resolver,
-      BitSet matchedPartitions, List<String> segmentIds) throws IOException {
+      BitSet matchedPartitions, List<String> segmentIds, PartitionInfo 
partitionInfo,
+      List<Integer> oldPartitionIdList) throws IOException {
 
     QueryStatisticsRecorder recorder = 
CarbonTimeStatisticsFactory.createDriverRecorder();
     QueryStatistic statistic = new QueryStatistic();
@@ -409,17 +490,34 @@ public class CarbonTableInputFormat<T> extends 
FileInputFormat<Void, T> {
     List<Blocklet> prunedBlocklets = blockletMap.prune(segmentIds, resolver);
 
     List<org.apache.carbondata.hadoop.CarbonInputSplit> resultFilterredBlocks 
= new ArrayList<>();
+    int partitionIndex = 0;
+    List<Integer> partitionIdList = new ArrayList<>();
+    if (partitionInfo != null) {
+      partitionIdList = partitionInfo.getPartitionIds();
+    }
     for (Blocklet blocklet : prunedBlocklets) {
-      int taskId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(
+      int partitionId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(
           
CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath().toString()));
 
-      // matchedPartitions variable will be null in two cases as follows
-      // 1. the table is not a partition table
-      // 2. the table is a partition table, and all partitions are matched by 
query
-      // for partition table, the task id of carbaondata file name is the 
partition id.
-      // if this partition is not required, here will skip it.
-      if (matchedPartitions == null || matchedPartitions.get(taskId)) {
-        resultFilterredBlocks.add(convertToCarbonInputSplit(blocklet));
+      // oldPartitionIdList is only used in the alter table partition command, because
+      // it changes the partition info first and then reads the data.
+      // All other queries should use the newest partitionIdList.
+      if (partitionInfo != null) {
+        if (oldPartitionIdList != null) {
+          partitionIndex = oldPartitionIdList.indexOf(partitionId);
+        } else {
+          partitionIndex = partitionIdList.indexOf(partitionId);
+        }
+      }
+      if (partitionIndex != -1) {
+        // the matchedPartitions variable will be null in two cases:
+        // 1. the table is not a partition table
+        // 2. the table is a partition table, and all partitions are matched by the query
+        // for a partition table, the task id in the carbondata file name is the partition id.
+        // if this partition is not required, it is skipped here.
+        if (matchedPartitions == null || 
matchedPartitions.get(partitionIndex)) {
+          resultFilterredBlocks.add(convertToCarbonInputSplit(blocklet));
+        }
       }
     }
     statistic
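
A hedged sketch of the block-level pruning decision above; the helper and its
parameters are illustrative, not code from this patch:

    import java.util.BitSet

    def keepBlock(
        filePartitionId: Int,
        partitionIdList: Seq[Int],            // newest ids from PartitionInfo
        oldPartitionIdList: Option[Seq[Int]], // set only by alter table partition
        matchedPartitions: Option[BitSet]): Boolean = {
      // alter partition changes PartitionInfo before reading, so the data files
      // still carry the old ids and must be looked up in the old list
      val partitionIndex =
        oldPartitionIdList.getOrElse(partitionIdList).indexOf(filePartitionId)
      // -1 means the block belongs to no live partition; otherwise keep it when
      // no pruning applies (matchedPartitions is empty) or its bit is set
      partitionIndex != -1 && matchedPartitions.forall(_.get(partitionIndex))
    }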

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index 8269757..0dc79fa 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -85,6 +85,14 @@ public class CarbonInputFormatUtil {
     return carbonInputFormat;
   }
 
+  public static <V> CarbonTableInputFormat<V> createCarbonTableInputFormat(
+      AbsoluteTableIdentifier identifier, List<String> partitionId, Job job) 
throws IOException {
+    CarbonTableInputFormat<V> carbonTableInputFormat = new 
CarbonTableInputFormat<>();
+    carbonTableInputFormat.setPartitionIdList(job.getConfiguration(), 
partitionId);
+    FileInputFormat.addInputPath(job, new Path(identifier.getTablePath()));
+    return carbonTableInputFormat;
+  }
+
   private static void addQueryMeasure(CarbonQueryPlan plan, int order, 
CarbonMeasure measure) {
     QueryMeasure queryMeasure = new QueryMeasure(measure.getColName());
     queryMeasure.setQueryOrder(order);
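
A hedged sketch of wiring the partition id list into the input format for an
alter-partition scan; the surrounding driver code is an assumption:

    import org.apache.hadoop.mapreduce.Job

    import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
    import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
    import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil

    def inputFormatForAlterScan(
        identifier: AbsoluteTableIdentifier,
        partitionIds: java.util.List[String]): CarbonTableInputFormat[Array[AnyRef]] = {
      val job = Job.getInstance()
      // stores the ids under ALTER_PARTITION_ID so getSplitsOfOneSegment can prune by them
      CarbonInputFormatUtil.createCarbonTableInputFormat[Array[AnyRef]](identifier, partitionIds, job)
    }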

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark-common-test/src/test/resources/partition_data.csv
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/resources/partition_data.csv 
b/integration/spark-common-test/src/test/resources/partition_data.csv
new file mode 100644
index 0000000..ec20d92
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/partition_data.csv
@@ -0,0 +1,28 @@
+id,vin,logdate,phonenumber,country,area,salary
+1,A42158424831,2016/02/12,125371341,China,Asia,10000
+2,A42158473831,2016/01/12,125371342,China,Asia,10001
+3,A42152474832,2016/02/12,125371343,US,America,10002
+4,A42151477823,2016/12/12,125371344,China,OutSpace,10003
+5,A42158474135,2016/02/15,125371345,Japan,OutSpace,10004
+6,A42258434831,2016/12/12,125371346,China,Asia,10005
+7,A42158475831,2016/05/12,125371347,UK,OutSpace,10006
+8,A41158494830,2015/07/12,225371348,China,Asia,10007
+9,A42158424831,2015/02/12,225371349,China,OutSpace,10008
+10,A42158473830,2014/01/12,225371310,China,Asia,10009
+11,A42152474830,2013/02/12,325371311,US,America,10010
+12,A42151477823,2012/12/12,425371312,China,Asia,10011
+13,A42158474133,2012/02/15,325371313,Japan,Asia,10012
+14,A42258434835,2013/12/12,525371314,China,Asia,10013
+15,A42158475836,2014/05/12,625371315,UK,OutSpace,10014
+16,A41158494838,2015/07/12,525371316,China,Asia,10015
+17,A42158424833,2016/02/12,425371317,China,Asia,10016
+18,A42158473832,2017/01/12,325371318,China,Asia,10017
+19,A42152474834,2011/02/12,225371319,US,America,10018
+20,A42151477824,2012/12/12,225371320,China,Asia,10019
+21,A42158474137,2013/02/15,325371321,Japan,Asia,10020
+22,A42258434837,2014/12/12,25371322,China,Asia,10021
+23,A42158475838,2014/05/12,425371323,UK,OutSpace,10022
+24,A41158494839,2016/07/12,625371324,China,Asia,10023
+25,A41158494840,2016/07/12,626381324,Good,OutSpace,10024
+26,A41158494843,2016/07/12,625378824,NotGood,OutSpace,10025
+27,A41158494999,2016/07/12,625378824,Other,,10026

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
index f7ac87c..39777de 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
@@ -59,11 +59,11 @@ class TestDataLoadingForPartitionTable extends QueryTest 
with BeforeAndAfterAll
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
originMultiLoads OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
   }
 
-  def validateDataFiles(tableUniqueName: String, sgementId: String, 
partitions: Seq[Int]): Unit = {
+  def validateDataFiles(tableUniqueName: String, segmentId: String, 
partitions: Seq[Int]): Unit = {
     val carbonTable = 
CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
     val tablePath = new CarbonTablePath(carbonTable.getStorePath, 
carbonTable.getDatabaseName,
       carbonTable.getFactTableName)
-    val segmentDir = tablePath.getCarbonDataDirectoryPath("0", sgementId)
+    val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId)
     val carbonFile = FileFactory.getCarbonFile(segmentDir, 
FileFactory.getFileType(segmentDir))
     val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
       override def accept(file: CarbonFile): Boolean = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
 
b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index 905f977..24c09ca 100644
--- 
a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ 
b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -209,12 +209,12 @@ public final class CarbonLoaderUtil {
    * @param loadModel
    */
   public static void deleteLocalDataLoadFolderLocation(CarbonLoadModel 
loadModel,
-      boolean isCompactionFlow) {
+      boolean isCompactionFlow, boolean isAltPartitionFlow) {
     String databaseName = loadModel.getDatabaseName();
     String tableName = loadModel.getTableName();
     String tempLocationKey = CarbonDataProcessorUtil
         .getTempStoreLocationKey(databaseName, tableName, 
loadModel.getSegmentId(),
-            loadModel.getTaskNo(), isCompactionFlow);
+            loadModel.getTaskNo(), isCompactionFlow, isAltPartitionFlow);
     // form local store location
     final String localStoreLocations = 
CarbonProperties.getInstance().getProperty(tempLocationKey);
     if (localStoreLocations == null) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/SplitPartitionCallable.java
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/SplitPartitionCallable.java
 
b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/SplitPartitionCallable.java
new file mode 100644
index 0000000..3a37a9d
--- /dev/null
+++ 
b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/SplitPartitionCallable.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.partition;
+
+import java.util.concurrent.Callable;
+
+import org.apache.carbondata.spark.rdd.PartitionSplitter;
+
+import org.apache.spark.sql.execution.command.SplitPartitionCallableModel;
+
+/**
+ * Callable that triggers a partition split, so the split can run on a separate thread.
+ */
+public class SplitPartitionCallable implements Callable<Void> {
+
+  private final SplitPartitionCallableModel splitPartitionCallableModel;
+
+  public SplitPartitionCallable(SplitPartitionCallableModel 
splitPartitionCallableModel) {
+    this.splitPartitionCallableModel = splitPartitionCallableModel;
+  }
+
+  @Override public Void call() {
+    PartitionSplitter.triggerPartitionSplit(splitPartitionCallableModel);
+    return null;
+  }
+}
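
A hedged usage sketch, running the callable on a small DDL thread pool; the
pool size and the driver wiring are assumptions, not code from this patch:

    import java.util.concurrent.Executors

    import org.apache.spark.sql.execution.command.SplitPartitionCallableModel

    import org.apache.carbondata.spark.partition.SplitPartitionCallable

    def runSplit(model: SplitPartitionCallableModel, cores: Int): Unit = {
      val executor = Executors.newFixedThreadPool(cores)
      try {
        // get() blocks until the split finishes and rethrows any failure
        executor.submit(new SplitPartitionCallable(model)).get()
      } finally {
        executor.shutdown()
      }
    }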

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
index 31dd4e6..181f6e4 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
@@ -107,6 +107,14 @@ class MergeResultImpl extends MergeResult[String, Boolean] 
{
   override def getKey(key: String, value: Boolean): (String, Boolean) = (key, 
value)
 }
 
+trait SplitResult[K, V] extends Serializable {
+  def getKey(key: String, value: Boolean): (K, V)
+}
+
+class SplitResultImpl extends SplitResult[String, Boolean] {
+  override def getKey(key: String, value: Boolean): (String, Boolean) = (key, 
value)
+}
+
 trait DeletedLoadResult[K, V] extends Serializable {
   def getKey(key: String, value: String): (K, V)
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index 4d1267a..e43d204 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -204,7 +204,7 @@ object DataLoadProcessorStepOnSpark {
         dataWriter.close()
       }
       // clean up the folders and files created locally for data load operation
-      CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false)
+      CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false, false)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableSplitPartitionRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableSplitPartitionRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableSplitPartitionRDD.scala
new file mode 100644
index 0000000..e481fc4
--- /dev/null
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableSplitPartitionRDD.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.PartitionUtils
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema.PartitionInfo
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.processing.spliter.RowResultSpliterProcessor
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
+import org.apache.carbondata.spark.SplitResult
+import org.apache.carbondata.spark.load.CarbonLoaderUtil
+
+class AlterTableSplitPartitionRDD[K, V](
+    sc: SparkContext,
+    result: SplitResult[K, V],
+    partitionIds: Seq[String],
+    segmentId: String,
+    bucketId: Int,
+    carbonLoadModel: CarbonLoadModel,
+    identifier: AbsoluteTableIdentifier,
+    storePath: String,
+    oldPartitionIdList: List[Int],
+    prev: RDD[Array[AnyRef]]) extends RDD[(K, V)](prev) {
+
+    sc.setLocalProperty("spark.scheduler.pool", "DDL")
+    sc.setLocalProperty("spark.job.interruptOnCancel", "true")
+
+    var storeLocation: String = null
+    var splitResult: String = null
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    val databaseName = carbonTable.getDatabaseName
+    val factTableName = carbonTable.getFactTableName
+    val partitionInfo = carbonTable.getPartitionInfo(factTableName)
+
+    override protected def getPartitions: Array[Partition] = 
firstParent[Array[AnyRef]].partitions
+
+    override def compute(split: Partition, context: TaskContext): Iterator[(K, 
V)] = {
+        val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+        val rows = firstParent[Array[AnyRef]].iterator(split, 
context).toList.asJava
+        val iter = new Iterator[(K, V)] {
+            val partitionId = partitionInfo.getPartitionId(split.index)
+            carbonLoadModel.setTaskNo(String.valueOf(partitionId))
+            carbonLoadModel.setSegmentId(segmentId)
+            carbonLoadModel.setPartitionId("0")
+            val tempLocationKey = CarbonDataProcessorUtil
+              .getTempStoreLocationKey(carbonLoadModel.getDatabaseName,
+                  carbonLoadModel.getTableName,
+                  segmentId,
+                  carbonLoadModel.getTaskNo,
+                  false,
+                  true)
+            // this property determines whether the temp location for carbon is inside
+            // the container temp dir or the yarn application directory.
+            val carbonUseLocalDir = CarbonProperties.getInstance()
+              .getProperty("carbon.use.local.dir", "false")
+
+            if (carbonUseLocalDir.equalsIgnoreCase("true")) {
+
+                val storeLocations = 
CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
+                if (null != storeLocations && storeLocations.nonEmpty) {
+                    storeLocation = 
storeLocations(Random.nextInt(storeLocations.length))
+                }
+                if (storeLocation == null) {
+                    storeLocation = System.getProperty("java.io.tmpdir")
+                }
+            } else {
+                storeLocation = System.getProperty("java.io.tmpdir")
+            }
+            storeLocation = storeLocation + '/' + System.nanoTime() + '/' + 
split.index
+            CarbonProperties.getInstance().addProperty(tempLocationKey, 
storeLocation)
+            LOGGER.info(s"Temp storeLocation taken is $storeLocation")
+
+            val tempStoreLoc = 
CarbonDataProcessorUtil.getLocalDataFolderLocation(databaseName,
+                factTableName,
+                carbonLoadModel.getTaskNo,
+                "0",
+                segmentId,
+                false,
+                true
+            )
+
+            val splitStatus = if (rows.isEmpty) {
+                LOGGER.info("After repartitioning this split, there are no target rows to write back.")
+                true
+            } else {
+                try {
+                    val segmentProperties = 
PartitionUtils.getSegmentProperties(identifier,
+                        segmentId, partitionIds.toList, oldPartitionIdList, 
partitionInfo)
+                    val processor = new RowResultSpliterProcessor(
+                        carbonTable,
+                        carbonLoadModel,
+                        segmentProperties,
+                        tempStoreLoc,
+                        bucketId
+                    )
+                    processor.execute(rows)
+                } catch {
+                    case e: Exception =>
+                        sys.error(s"Exception when executing Row result 
processor ${e.getMessage}")
+                } finally {
+                    CarbonLoaderUtil
+                      .deleteLocalDataLoadFolderLocation(carbonLoadModel, 
false, true)
+                }
+
+            }
+
+            val splitResult = segmentId
+            var finished = false
+
+            override def hasNext: Boolean = {
+                !finished
+            }
+
+            override def next(): (K, V) = {
+                finished = true
+                result.getKey(splitResult, splitStatus)
+            }
+        }
+        iter
+    }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index add0578..07264b5 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -153,7 +153,8 @@ class CarbonMergerRDD[K, V](
             carbonLoadModel.getTableName,
             carbonLoadModel.getSegmentId,
             carbonLoadModel.getTaskNo,
-            true)
+            true,
+            false)
         CarbonProperties.getInstance().addProperty(tempLocationKey, 
storeLocation)
         LOGGER.info(s"Temp storeLocation taken is $storeLocation")
         // get destination segment properties as sent from driver which is of 
last segment.
@@ -196,7 +197,8 @@ class CarbonMergerRDD[K, V](
           carbonLoadModel.getTaskNo,
           "0",
           mergeNumber,
-          true
+          true,
+          false
         )
 
         carbonLoadModel.setPartitionId("0")
@@ -231,7 +233,7 @@ class CarbonMergerRDD[K, V](
         try {
           val isCompactionFlow = true
           CarbonLoaderUtil
-            .deleteLocalDataLoadFolderLocation(carbonLoadModel, 
isCompactionFlow)
+            .deleteLocalDataLoadFolderLocation(carbonLoadModel, 
isCompactionFlow, false)
         } catch {
           case e: Exception =>
             LOGGER.error(e)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
new file mode 100644
index 0000000..2a39db5
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.util.ArrayList
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.Job
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.hive.DistributionUtil
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.PartitionUtils
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datastore.block.{Distributable, SegmentProperties, TaskBlockInfo}
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.encoder.Encoding
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType
+import org.apache.carbondata.core.metadata.schema.table.column.{CarbonDimension, CarbonMeasure}
+import org.apache.carbondata.core.scan.result.iterator.PartitionSpliterRawResultIterator
+import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper
+import org.apache.carbondata.core.util.{ByteUtil, DataTypeUtil}
+import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
+import org.apache.carbondata.processing.merger.CarbonCompactionUtil
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.processing.spliter.CarbonSplitExecutor
+import org.apache.carbondata.spark.load.CarbonLoaderUtil
+
+
+/**
+ * This RDD is used in alter table partition statement to get data of target partitions,
+ * then repartition data according to new partitionInfo
+ * @param sc
+ * @param partitionIds  the ids of target partition to be scanned
+ * @param storePath
+ * @param segmentId
+ * @param bucketId
+ * @param oldPartitionIdList  the taskId in partition order before partitionInfo is modified
+ * @param carbonTableIdentifier
+ * @param carbonLoadModel
+ */
+class CarbonScanPartitionRDD(
+    sc: SparkContext,
+    partitionIds: Seq[String],
+    storePath: String,
+    segmentId: String,
+    bucketId: Int,
+    oldPartitionIdList: List[Int],
+    carbonTableIdentifier: CarbonTableIdentifier,
+    carbonLoadModel: CarbonLoadModel)
+  extends RDD[(AnyRef, Array[AnyRef])](sc, Nil) {
+
+  private val queryId = sc.getConf.get("queryId", System.nanoTime() + "")
+  val identifier = new AbsoluteTableIdentifier(storePath, carbonTableIdentifier)
+  var storeLocation: String = null
+  var splitStatus: Boolean = false
+  var blockId: String = null
+  val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+  val dimensions = carbonTable.getAllDimensions.asScala
+  val measures = carbonTable.getAllMeasures.asScala
+  val partitionInfo = carbonTable.getPartitionInfo(carbonTableIdentifier.getTableName)
+  val partitionColumn = partitionInfo.getColumnSchemaList().get(0)
+  val partitionDataType = partitionColumn.getDataType
+  val partitionColumnName = partitionColumn.getColumnName
+  var isDimension: Boolean = false
+  val encodingList = partitionColumn.getEncodingList
+  var dimension: CarbonDimension = null
+  var measure: CarbonMeasure = null
+  val noDictionaryIndexGroup: ArrayBuffer[Int] = new ArrayBuffer[Int]()
+  val dictionaryIndexGroup: ArrayBuffer[Int] = new ArrayBuffer[Int]()
+  val measureIndexGroup: ArrayBuffer[Int] = new ArrayBuffer[Int]()
+
+  override def getPartitions: Array[Partition] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    val parallelism = sparkContext.defaultParallelism
+    val jobConf = new JobConf(new Configuration)
+    val job = new Job(jobConf)
+    val format = CarbonInputFormatUtil.createCarbonTableInputFormat(identifier,
+      partitionIds.toList.asJava, job)
+    job.getConfiguration.set("query.id", queryId)
+
+    val splits = format.getSplitsOfOneSegment(job, segmentId,
+      oldPartitionIdList.map(_.asInstanceOf[Integer]).asJava, partitionInfo)
+    var partition_num = 0
+    val result = new ArrayList[Partition](parallelism)
+    val blockList = splits.asScala
+      .filter(_.asInstanceOf[CarbonInputSplit].getBucketId.toInt == bucketId)
+      .map(_.asInstanceOf[Distributable])
+    if (!blockList.isEmpty) {
+      val activeNodes = DistributionUtil.ensureExecutorsAndGetNodeList(blockList, sparkContext)
+      val nodeBlockMapping = CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1,
+        parallelism, activeNodes.toList.asJava)
+      nodeBlockMapping.asScala.foreach { case (node, blockList) =>
+        blockList.asScala.foreach { blocksPerTask =>
+          val splits = blocksPerTask.asScala.map(_.asInstanceOf[CarbonInputSplit])
+          if (blocksPerTask.size() != 0) {
+            val multiBlockSplit =
+              new CarbonMultiBlockSplit(identifier, splits.asJava, Array(node))
+            val partition = new CarbonSparkPartition(id, partition_num, multiBlockSplit)
+            result.add(partition)
+            partition_num += 1
+          }
+        }
+      }
+    }
+    result.toArray(new Array[Partition](result.size()))
+  }
+
+  override def compute(split: Partition, context: TaskContext):
+    Iterator[(AnyRef, Array[AnyRef])] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+      var exec : CarbonSplitExecutor = null
+      val rows : java.util.List[(AnyRef, Array[AnyRef])] = new ArrayList[(AnyRef, Array[AnyRef])]()
+      try {
+        val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
+        val splits = inputSplit.getAllSplits.asScala
+        val tableBlockInfoList = CarbonInputSplit.createBlocks(splits.asJava)
+        val segmentMapping: java.util.Map[String, TaskBlockInfo] =
+          CarbonCompactionUtil.createMappingForSegments(tableBlockInfoList)
+        val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+        var result : java.util.List[PartitionSpliterRawResultIterator] = null
+        try {
+          exec = new CarbonSplitExecutor(segmentMapping, carbonTable)
+          result = exec.processDataBlocks(segmentId)
+        } catch {
+          case e: Throwable =>
+            LOGGER.error(e)
+            if (null != e.getMessage) {
+            sys.error(s"Exception occurred in query execution :: ${ e.getMessage }")
+            } else {
+            sys.error("Exception occurred in query execution. Please check logs.")
+            }
+        }
+        val segmentProperties = PartitionUtils.getSegmentProperties(identifier, segmentId,
+          partitionIds.toList, oldPartitionIdList, partitionInfo)
+        val partColIdx = getPartitionColumnIndex(partitionColumnName, segmentProperties)
+        indexInitialise()
+        for (iterator <- result.asScala) {
+          while (iterator.hasNext) {
+            val row = iterator.next()
+            val partitionColumnValue = getPartitionColumnValue(row, partColIdx,
+              segmentProperties)
+            rows.add((partitionColumnValue, row))
+          }
+        }
+      } catch {
+        case e: Exception =>
+          LOGGER.error(e)
+          throw e
+      } finally {
+        if (null != exec) {
+          exec.finish
+        }
+      }
+    val iter = rows.iterator().asScala
+    iter
+  }
+
+  def getPartitionColumnValue(row: Array[AnyRef], partColIdx: Int,
+      segmentProperties: SegmentProperties): AnyRef = {
+    val dims: Array[Byte] = row(0).asInstanceOf[ByteArrayWrapper].getDictionaryKey
+    val keyGen = segmentProperties.getDimensionKeyGenerator
+    val keyArray: Array[Long] = keyGen.getKeyArray(dims)
+    val encodings = partitionColumn.getEncodingList
+    val partitionType = partitionInfo.getPartitionType
+    var partitionValue: AnyRef = null
+    val factor = 1000L
+    if (isDimension) {
+      // direct dictionary
+      if (encodings.contains(Encoding.DIRECT_DICTIONARY)) {
+        val directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+          .getDirectDictionaryGenerator(partitionDataType)
+        val dictionaryIndex = dictionaryIndexGroup.indexOf(partColIdx)
+        val surrogateValue = (keyArray(dictionaryIndex) / factor).toInt
+        partitionValue = directDictionaryGenerator.getValueFromSurrogate(surrogateValue)
+      } else if (!encodings.contains(Encoding.DICTIONARY)) {
+        // no dictionary
+        val byteArray = row(0).asInstanceOf[ByteArrayWrapper].getNoDictionaryKeys
+        val index = noDictionaryIndexGroup.indexOf(partColIdx)
+        partitionValue = DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(byteArray(index)
+          , partitionDataType)
+        if (partitionValue.isInstanceOf[UTF8String]) {
+          partitionValue = partitionValue.toString
+        }
+      } else {  // normal dictionary
+        val dict = CarbonLoaderUtil.getDictionary(carbonTableIdentifier,
+          dimension.getColumnIdentifier, storePath, partitionDataType)
+        if (partitionDataType == DataType.STRING) {
+          if (partitionType == PartitionType.RANGE) {
+            partitionValue = ByteUtil.
+              toBytes(dict.getDictionaryValueForKey(keyArray(partColIdx).toInt))
+          } else {
+            partitionValue = dict.getDictionaryValueForKey(keyArray(partColIdx).toInt)
+          }
+        } else {
+          partitionValue = dict.getDictionaryValueForKey(keyArray(partColIdx).toInt)
+        }
+
+      }
+    } else {
+      partitionValue = row(measureIndexGroup(partColIdx))
+    }
+    partitionValue
+  }
+
+  def indexInitialise(): Unit = {
+      for (dim: CarbonDimension <- dimensions) {
+        if (!dim.getEncoder.contains(Encoding.DICTIONARY)) {
+          noDictionaryIndexGroup.append(dimensions.indexOf(dim))
+        } else {
+          dictionaryIndexGroup.append(dimensions.indexOf(dim))
+        }
+      }
+      for (msr: CarbonMeasure <- measures) {
+        // index of measure in row
+        measureIndexGroup.append(measures.indexOf(msr) + 1)
+      }
+  }
+
+  /**
+   * get the index of partition column in dimension/measure
+   * @param partitionColumnName
+   * @param segmentProperties
+   * @return
+   */
+  def getPartitionColumnIndex(partitionColumnName: String,
+      segmentProperties: SegmentProperties): Int = {
+    val dimensions = segmentProperties.getDimensions
+    val measures = segmentProperties.getMeasures
+    val columns = dimensions.asScala.map(_.getColName) ++ measures.asScala.map(_.getColName)
+    var index = 0
+    for (i <- 0 until columns.size) {
+      if (columns(i) == partitionColumnName) {
+        index = i
+      }
+    }
+    if (index < dimensions.size()) {
+      isDimension = true
+      dimension = dimensions.get(index)
+    } else {
+      index = index - dimensions.size()
+      measure = measures.get(index)
+    }
+    index
+  }
+}
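
Note: getPartitionColumnIndex above resolves the partition column against the concatenated dimension-then-measure name list and, when the match falls into the measure range, rebases the index into that range. A simplified, self-contained sketch of the same resolution using plain name sequences in place of SegmentProperties (illustrative, not code from the patch):

// Returns (indexWithinGroup, isDimension) for a partition column name.
// Mirrors the lookup above; like it, an unknown name falls back to index 0.
def resolvePartitionColumn(dimNames: Seq[String], msrNames: Seq[String],
    partitionColumn: String): (Int, Boolean) = {
  val all = dimNames ++ msrNames
  val idx = math.max(all.indexOf(partitionColumn), 0)
  if (idx < dimNames.size) (idx, true)        // partition column is a dimension
  else (idx - dimNames.size, false)           // otherwise rebase into the measure list
}

// e.g. resolvePartitionColumn(Seq("country", "city"), Seq("sales"), "sales") == (0, false)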

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
index a6a8835..bca119e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.execution.command.{CompactionCallableModel, CompactionModel}
+import org.apache.spark.sql.execution.command.{CompactionCallableModel, CompactionModel, SplitPartitionCallableModel}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -39,6 +39,7 @@ import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadM
 import org.apache.carbondata.spark._
 import org.apache.carbondata.spark.compaction.CompactionCallable
 import org.apache.carbondata.spark.load._
+import org.apache.carbondata.spark.partition.SplitPartitionCallable
 import org.apache.carbondata.spark.util.{CommonUtil, LoadMetadataUtil}
 
 /**
@@ -260,6 +261,50 @@ object DataManagementFunc {
     futureList.add(future)
   }
 
+  def executePartitionSplit( sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      executor: ExecutorService,
+      storePath: String,
+      segment: String,
+      partitionId: String,
+      oldPartitionIdList: List[Int]): Unit = {
+    val futureList: util.List[Future[Void]] = new util.ArrayList[Future[Void]](
+      CarbonCommonConstants.DEFAULT_COLLECTION_SIZE
+    )
+    scanSegmentsForSplitPartition(futureList, executor, storePath, segment, partitionId,
+      sqlContext, carbonLoadModel, oldPartitionIdList)
+    try {
+        futureList.asScala.foreach(future => {
+          future.get
+        }
+      )
+    } catch {
+      case e: Exception =>
+        LOGGER.error(e, s"Exception in partition split thread ${ e.getMessage }")
+        throw e
+    }
+  }
+
+  private def scanSegmentsForSplitPartition(futureList: util.List[Future[Void]],
+      executor: ExecutorService,
+      storePath: String,
+      segmentId: String,
+      partitionId: String,
+      sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      oldPartitionIdList: List[Int]): Unit = {
+
+    val splitModel = SplitPartitionCallableModel(storePath,
+      carbonLoadModel,
+      segmentId,
+      partitionId,
+      oldPartitionIdList,
+      sqlContext)
+
+    val future: Future[Void] = executor.submit(new SplitPartitionCallable(splitModel))
+    futureList.add(future)
+  }
+
   def prepareCarbonLoadModel(storePath: String,
       table: CarbonTable,
       newCarbonLoadModel: CarbonLoadModel): Unit = {
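
Note: executePartitionSplit above wraps one SplitPartitionCallable per (segment, partition) pair and blocks on the returned futures. A hedged driver-side sketch of calling it over several segments; the thread-pool size, partition id and segment ids are illustrative, not values from the patch:

import java.util.concurrent.{ExecutorService, Executors}

import org.apache.spark.sql.SQLContext

import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.spark.rdd.DataManagementFunc

// Illustrative helper: split partition "2" of every given segment.
def splitPartitionOfSegments(sqlContext: SQLContext, loadModel: CarbonLoadModel,
    storePath: String, segments: Seq[String]): Unit = {
  val executor: ExecutorService = Executors.newFixedThreadPool(4)
  try {
    segments.foreach { segmentId =>
      DataManagementFunc.executePartitionSplit(
        sqlContext, loadModel, executor, storePath,
        segmentId, "2", List(0, 1, 2))
    }
  } finally {
    executor.shutdown()
  }
}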

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index b51800e..37b2d02 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -267,7 +267,7 @@ class NewCarbonDataLoadRDD[K, V](
           throw e
       } finally {
         // clean up the folders and files created locally for data load operation
-        CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false)
+        CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false, false)
         // in case of failure the same operation will be re-tried several times.
         // So print the data load statistics only in case of non failure case
         if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
@@ -453,7 +453,7 @@ class NewDataFrameLoaderRDD[K, V](
           throw e
       } finally {
         // clean up the folders and files created locally for data load operation
-        CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false)
+        CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false, false)
         // in case of failure the same operation will be re-tried several times.
         // So print the data load statistics only in case of non failure case
         if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
@@ -638,7 +638,7 @@ class PartitionTableDataLoaderRDD[K, V](
           throw e
       } finally {
         // clean up the folders and files created locally for data load operation
-        CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false)
+        CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false, false)
         // in case of failure the same operation will be re-tried several times.
         // So print the data load statistics only in case of non failure case
         if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
new file mode 100644
index 0000000..48e1bee
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.io.IOException
+
+import org.apache.spark.sql.execution.command.SplitPartitionCallableModel
+import org.apache.spark.util.PartitionUtils
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.spark.{PartitionFactory, SplitResultImpl}
+
+object PartitionSplitter {
+
+  val logger = LogServiceFactory.getLogService(PartitionSplitter.getClass.getName)
+
+  def triggerPartitionSplit(splitPartitionCallableModel: SplitPartitionCallableModel): Unit = {
+     val sc = splitPartitionCallableModel.sqlContext.sparkContext
+     val partitionId = splitPartitionCallableModel.partitionId
+     val storePath = splitPartitionCallableModel.storePath
+     val segmentId = splitPartitionCallableModel.segmentId
+     val oldPartitionIdList = splitPartitionCallableModel.oldPartitionIdList
+     val carbonLoadModel = splitPartitionCallableModel.carbonLoadModel
+     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+     val identifier = carbonTable.getAbsoluteTableIdentifier
+     val carbonTableIdentifier = identifier.getCarbonTableIdentifier
+     val tableName = carbonTable.getFactTableName
+     val databaseName = carbonTable.getDatabaseName
+     val bucketInfo = carbonTable.getBucketingInfo(tableName)
+     var finalSplitStatus = false
+     val bucketNumber = bucketInfo match {
+       case null => 1
+       case _ => bucketInfo.getNumberOfBuckets
+     }
+     val partitionInfo = carbonTable.getPartitionInfo(tableName)
+     val partitioner = PartitionFactory.getPartitioner(partitionInfo)
+
+     for (i <- 0 until bucketNumber) {
+       val bucketId = i
+       val rdd = new CarbonScanPartitionRDD(
+         sc,
+         Seq(partitionId),
+         storePath,
+         segmentId,
+         bucketId,
+         oldPartitionIdList,
+         carbonTableIdentifier,
+         carbonLoadModel
+       ).partitionBy(partitioner).map(_._2)
+
+       val splitStatus = new AlterTableSplitPartitionRDD(sc,
+         new SplitResultImpl(),
+         Seq(partitionId),
+         segmentId,
+         bucketId,
+         carbonLoadModel,
+         identifier,
+         storePath,
+         oldPartitionIdList,
+         rdd).collect()
+
+       if (splitStatus.length == 0) {
+         finalSplitStatus = false
+       } else {
+         finalSplitStatus = splitStatus.forall(_._2)
+       }
+       if (!finalSplitStatus) {
+         logger.audit(s"Add/Split Partition request failed for table " +
+                      s"${ databaseName }.${ tableName }")
+         logger.error(s"Add/Split Partition request failed for table " +
+                      s"${ databaseName }.${ tableName }")
+       }
+     }
+     if (finalSplitStatus) {
+       try {
+         PartitionUtils.
+           deleteOriginalCarbonFile(identifier, segmentId, Seq(partitionId).toList,
+             oldPartitionIdList, storePath, databaseName, tableName, partitionInfo, carbonLoadModel)
+       } catch {
+         case e: IOException => sys.error(s"Exception while deleting original carbon files " +
+         e.getMessage)
+       }
+       logger.audit(s"Add/Split Partition request completed for table " +
+                    s"${ databaseName }.${ tableName }")
+       logger.info(s"Add/Split Partition request completed for table " +
+                   s"${ databaseName }.${ tableName }")
+     }
+  }
+}
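
Note: triggerPartitionSplit is driven by a SplitPartitionCallableModel (added to carbonTableSchema.scala further down). A hedged sketch of invoking it directly; the segment and partition ids are placeholders, and in the real flow SplitPartitionCallable submits this from DataManagementFunc.executePartitionSplit:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.command.SplitPartitionCallableModel

import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.spark.rdd.PartitionSplitter

// Illustrative direct invocation for a single segment.
def splitOneSegment(sqlContext: SQLContext, loadModel: CarbonLoadModel,
    storePath: String): Unit = {
  val model = SplitPartitionCallableModel(
    storePath,
    loadModel,
    segmentId = "0",
    partitionId = "2",
    oldPartitionIdList = List(0, 1, 2),
    sqlContext = sqlContext)
  PartitionSplitter.triggerPartitionSplit(model)
}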

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 705daea..4d781a1 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -292,11 +292,13 @@ object CommonUtil {
         value.matches(pattern)
       case "timestamp" =>
         val timeStampFormat = new SimpleDateFormat(CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT))
+          .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+          CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
         scala.util.Try(timeStampFormat.parse(value)).isSuccess
       case "date" =>
         val dateFormat = new SimpleDateFormat(CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT))
+          .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+            CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
         scala.util.Try(dateFormat.parse(value)).isSuccess
       case _ =>
         validateTypeConvertForSpark2(partitionerField, value)
@@ -333,6 +335,40 @@ object CommonUtil {
     }
   }
 
+  def validateSplitListInfo(originListInfo: List[String], newListInfo: List[String],
+      originList: List[List[String]]): Unit = {
+    if (originListInfo.size == 1) {
+      sys.error("The target list partition cannot be split, please check again!")
+    }
+    if (newListInfo.size == 1) {
+      sys.error("Can't split list to one partition, please check again!")
+    }
+    if (!(newListInfo.size < originListInfo.size)) {
+      sys.error("The size of new list must be smaller than original list, please check again!")
+    }
+    val tempList = newListInfo.mkString(",").split(",")
+      .map(_.trim.replace("(", "").replace(")", ""))
+    if (tempList.length != originListInfo.size) {
+      sys.error("The total number of elements in the new list must equal that of the original list!")
+    }
+    if (!originListInfo.sameElements(tempList)) {
+      sys.error("The elements in new list must exist in original list")
+    }
+  }
+
+  def validateAddListInfo(newListInfo: List[String], originList: List[List[String]]): Unit = {
+    if (newListInfo.size < 1) {
+      sys.error("Please add at least one new partition")
+    }
+    for (originElementGroup <- originList) {
+      for (newElement <- newListInfo ) {
+        if (originElementGroup.contains(newElement)) {
+          sys.error(s"The partition $newElement already exists! Please check again!")
+        }
+      }
+    }
+  }
+
   def validateFields(key: String, fields: Seq[Field]): Boolean = {
     var isValid: Boolean = false
     fields.foreach { field =>
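
Note: the two validators above enforce that a list-partition split only regroups the existing values into more than one new partition, and that added list values do not already belong to an existing partition. A hedged usage sketch with illustrative values:

import org.apache.carbondata.spark.util.CommonUtil

// SPLIT: the original list partition holds (A, B, C); regrouping it into
// (A) and (B, C) keeps every original element, so this validation passes.
CommonUtil.validateSplitListInfo(
  List("A", "B", "C"),
  List("(A)", "(B, C)"),
  List(List("A", "B", "C")))

// ADD: "A" already belongs to an existing list partition, so this raises sys.error.
CommonUtil.validateAddListInfo(
  List("A"),
  List(List("A", "B"), List("C")))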

http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 500e18e..70a2498 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -126,6 +126,13 @@ case class CompactionCallableModel(storePath: String,
     sqlContext: SQLContext,
     compactionType: CompactionType)
 
+case class SplitPartitionCallableModel(storePath: String,
+    carbonLoadModel: CarbonLoadModel,
+    segmentId: String,
+    partitionId: String,
+    oldPartitionIdList: List[Int],
+    sqlContext: SQLContext)
+
 case class DataTypeInfo(dataType: String, precision: Int = 0, scale: Int = 0)
 
 case class AlterTableDataTypeChangeModel(dataTypeInfo: DataTypeInfo,
@@ -151,6 +158,15 @@ case class AlterTableDropColumnModel(databaseName: Option[String],
     tableName: String,
     columns: List[String])
 
+case class AlterTableDropPartitionModel(databaseName: Option[String],
+    tableName: String,
+    partitionId: String)
+
+case class AlterTableSplitPartitionModel(databaseName: Option[String],
+    tableName: String,
+    partitionId: String,
+    splitInfo: List[String])
+
 class AlterTableColumnSchemaGenerator(
     alterTableModel: AlterTableAddColumnsModel,
     dbName: String,
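
Note: the new models above carry the parsed ALTER TABLE partition information from the SQL layer into the execution commands. A hedged construction example; the database, table, partition id and list values are placeholders (in the real flow CarbonSpark2SqlParser builds these from the DDL):

// Illustrative model instances.
val splitModel = AlterTableSplitPartitionModel(
  databaseName = Some("default"),
  tableName = "sales_part",
  partitionId = "2",
  splitInfo = List("(Asia)", "(Europe, America)"))

val dropModel = AlterTableDropPartitionModel(
  databaseName = Some("default"),
  tableName = "sales_part",
  partitionId = "3")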
