Repository: carbondata
Updated Branches:
  refs/heads/master b944bd8a1 -> 6488bc018


[CARBONDATA-1250] Change default partition id & Add PartitionIdList in partitionInfo

This closes #1125


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6488bc01
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6488bc01
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6488bc01

Branch: refs/heads/master
Commit: 6488bc018a2ec715b31407d12290680d388a43b3
Parents: b944bd8
Author: lionelcao <whuca...@gmail.com>
Authored: Fri Jun 30 00:00:32 2017 +0800
Committer: chenliang613 <chenliang...@apache.org>
Committed: Fri Jul 14 10:11:10 2017 +0800

----------------------------------------------------------------------
 .../ThriftWrapperSchemaConverterImpl.java       |   6 +
 .../core/metadata/schema/PartitionInfo.java     |  59 ++++++++--
 .../schema/partition/AbstractPartition.java     |  42 -------
 .../schema/partition/HashPartition.java         |  34 ------
 .../schema/partition/ListPartition.java         |  36 ------
 .../schema/partition/RangePartition.java        |  40 -------
 .../core/metadata/schema/table/CarbonTable.java |   9 --
 .../scan/filter/FilterExpressionProcessor.java  |   5 +-
 .../core/scan/filter/FilterProcessor.java       |   4 +-
 .../filter/partition/PartitionFilterUtil.java   | 112 ++++---------------
 .../core/scan/partition/ListPartitioner.java    |   8 +-
 .../core/scan/partition/RangePartitioner.java   |  76 ++-----------
 .../core/stats/PartitionStatistic.java          |  36 ------
 .../core/util/comparator/Comparator.java        | 105 +++++++++++++++++
 .../util/comparator/SerializableComparator.java |  24 ++++
 format/src/main/thrift/schema.thrift            |   8 +-
 .../carbondata/hadoop/CarbonInputFormat.java    |  66 +++++++----
 .../TestDataLoadingForPartitionTable.scala      |  12 +-
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |   4 +-
 .../carbondata/spark/util/CommonUtil.scala      |  44 ++++++--
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |  47 ++++----
 .../org/apache/spark/util/PartitionUtils.scala  |  52 +++++++++
 .../processing/model/CarbonLoadModel.java       |   2 +-
 23 files changed, 390 insertions(+), 441 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index 2d5f395..235a7ba 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -219,6 +219,9 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
     externalPartitionInfo.setList_info(wrapperPartitionInfo.getListInfo());
     externalPartitionInfo.setRange_info(wrapperPartitionInfo.getRangeInfo());
     externalPartitionInfo.setNum_partitions(wrapperPartitionInfo.getNumPartitions());
+    externalPartitionInfo.setMax_partition(wrapperPartitionInfo.getMAX_PARTITION());
+    externalPartitionInfo.setPartition_ids(wrapperPartitionInfo
+        .getPartitionIds());
     return externalPartitionInfo;
   }
 
@@ -453,6 +456,9 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
     wrapperPartitionInfo.setListInfo(externalPartitionInfo.getList_info());
     wrapperPartitionInfo.setRangeInfo(externalPartitionInfo.getRange_info());
     wrapperPartitionInfo.setNumPartitions(externalPartitionInfo.getNum_partitions());
+    wrapperPartitionInfo.setPartitionIds(externalPartitionInfo
+        .getPartition_ids());
+    wrapperPartitionInfo.setMAX_PARTITION(externalPartitionInfo.getMax_partition());
     return wrapperPartitionInfo;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java
index 2b08536..0d03998 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.core.metadata.schema;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
@@ -43,13 +44,25 @@ public class PartitionInfo implements Serializable {
   private List<List<String>> listInfo;
 
   /**
-   * number of partitions
+   * total count of partitions
    */
   private int numPartitions;
 
+  /**
+   * current max partition id; increases only and is used in alter table partition operations
+   */
+  private int MAX_PARTITION;
+
+  /**
+   * records the partition ids in logical ascending order;
+   * initialized when the table is created and updated on alter table
+   */
+  private List<Integer> partitionIds;
+
   public PartitionInfo(List<ColumnSchema> columnSchemaList, PartitionType partitionType) {
     this.columnSchemaList = columnSchemaList;
     this.partitionType = partitionType;
+    this.partitionIds = new ArrayList<>();
   }
 
   public List<ColumnSchema> getColumnSchemaList() {
@@ -64,14 +77,6 @@ public class PartitionInfo implements Serializable {
     return partitionType;
   }
 
-  public void setNumPartitions(int numPartitions) {
-    this.numPartitions = numPartitions;
-  }
-
-  public int getNumPartitions() {
-    return numPartitions;
-  }
-
   public void setRangeInfo(List<String> rangeInfo) {
     this.rangeInfo = rangeInfo;
   }
@@ -88,4 +93,40 @@ public class PartitionInfo implements Serializable {
     return listInfo;
   }
 
+  public void initialize(int partitionNum) {
+    for (int i = 0; i < partitionNum; i++) {
+      partitionIds.add(i);
+    }
+    MAX_PARTITION = partitionNum - 1;
+    numPartitions = partitionNum;
+  }
+
+  public void setNumPartitions(int numPartitions) {
+    this.numPartitions = numPartitions;
+  }
+
+  public int getNumPartitions() {
+    return numPartitions;
+  }
+
+  public int getMAX_PARTITION() {
+    return MAX_PARTITION;
+  }
+
+  public void setMAX_PARTITION(int max_partition) {
+    this.MAX_PARTITION = max_partition;
+  }
+
+  public List<Integer> getPartitionIds() {
+    return partitionIds;
+  }
+
+  public void setPartitionIds(List<Integer> partitionIdList) {
+    this.partitionIds = partitionIdList;
+  }
+
+  public int getPartitionId(int index) {
+    return partitionIds.get(index);
+  }
+
 }
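
For reference, a minimal sketch (not part of the patch) of the new id
bookkeeping, assuming a table created with three user-defined partitions
plus the default partition:

    import java.util.ArrayList;
    import org.apache.carbondata.core.metadata.schema.PartitionInfo;
    import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
    import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;

    public class PartitionIdDemo {
      public static void main(String[] args) {
        // 3 user-defined partitions + 1 default partition = 4 in total
        PartitionInfo info =
            new PartitionInfo(new ArrayList<ColumnSchema>(), PartitionType.RANGE);
        info.initialize(4);
        System.out.println(info.getPartitionIds());  // [0, 1, 2, 3]
        System.out.println(info.getMAX_PARTITION()); // 3
        System.out.println(info.getNumPartitions()); // 4
        // PartitionTableDataLoaderRDD derives the task number of a split from
        // getPartitionId(index); right after creation this is the identity map.
        System.out.println(info.getPartitionId(2));  // 2
      }
    }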

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/AbstractPartition.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/AbstractPartition.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/AbstractPartition.java
deleted file mode 100644
index 1d6337c..0000000
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/AbstractPartition.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.metadata.schema.partition;
-
-public abstract class AbstractPartition {
-
-  /**
-   * Partition unique identifier
-   */
-  protected int partitionId;
-
-  /**
-   * Total row count of this partition
-   */
-  protected int rowCount;
-
-  public int getPartitionId() {
-    return partitionId;
-  }
-
-  public void setRowCount(int count) {
-    this.rowCount = count;
-  }
-
-  public int getRowCount() {
-    return rowCount;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/HashPartition.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/HashPartition.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/HashPartition.java
deleted file mode 100644
index 6b43525..0000000
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/HashPartition.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.metadata.schema.partition;
-
-public class HashPartition extends AbstractPartition {
-
-  /**
-   * hash value for hash partition table
-   */
-  private int hashValue;
-
-  public HashPartition(int id, int hashValue) {
-    this.partitionId = id;
-    this.hashValue = hashValue;
-  }
-
-  public int getHashValue() {
-    return hashValue;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/ListPartition.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/ListPartition.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/ListPartition.java
deleted file mode 100644
index 11a396f..0000000
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/ListPartition.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.metadata.schema.partition;
-
-import java.util.List;
-
-public class ListPartition extends AbstractPartition {
-
-  /**
-   * value list for list partition table
-   */
-  private List<String> listInfo;
-
-  public ListPartition(int id, List<String> listInfo) {
-    this.partitionId = id;
-    this.listInfo = listInfo;
-  }
-
-  public List<String> getListInfo() {
-    return listInfo;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/RangePartition.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/RangePartition.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/RangePartition.java
deleted file mode 100644
index af2b05d..0000000
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/RangePartition.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.metadata.schema.partition;
-
-public class RangePartition extends AbstractPartition {
-  /**
-   * boundary value for range partition table
-   */
-  private String lowerBoundary;
-
-  private String upperBoundary;
-
-  public RangePartition(int id, String lowerBoundary, String upperBoundary) {
-    this.partitionId = id;
-    this.lowerBoundary = lowerBoundary;
-    this.upperBoundary = upperBoundary;
-  }
-
-  public String getLowerBoundary() {
-    return lowerBoundary;
-  }
-
-  public String getUpperBoundary() {
-    return upperBoundary;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 16ded57..cec8007 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -33,7 +33,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonImplicitDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.stats.PartitionStatistic;
 
 /**
  * Mapping class for Carbon actual table
@@ -99,10 +98,6 @@ public class CarbonTable implements Serializable {
   private Map<String, PartitionInfo> tablePartitionMap;
 
   /**
-   * statistic information of partition table
-   */
-  private PartitionStatistic partitionStatistic;
-  /**
    * tableUniqueName
    */
   private String tableUniqueName;
@@ -597,10 +592,6 @@ public class CarbonTable implements Serializable {
     return null != tablePartitionMap.get(getFactTableName());
   }
 
-  public PartitionStatistic getPartitionStatistic() {
-    return partitionStatistic;
-  }
-
   /**
    * @return absolute table identifier
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
index ce31283..21c7bf6 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
@@ -63,6 +63,7 @@ import org.apache.carbondata.core.scan.filter.resolver.LogicalFilterResolverImpl
 import org.apache.carbondata.core.scan.filter.resolver.RowLevelFilterResolverImpl;
 import org.apache.carbondata.core.scan.filter.resolver.RowLevelRangeFilterResolverImpl;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.TrueConditionalResolverImpl;
+import org.apache.carbondata.core.scan.partition.PartitionUtil;
 import org.apache.carbondata.core.scan.partition.Partitioner;
 
 public class FilterExpressionProcessor implements FilterProcessor {
@@ -163,11 +164,11 @@ public class FilterExpressionProcessor implements FilterProcessor {
    * The value of "1" in BitSet represent the required partition
    * @param expressionTree
    * @param partitionInfo
-   * @param partitioner
    * @return
    */
   @Override public BitSet getFilteredPartitions(Expression expressionTree,
-      PartitionInfo partitionInfo, Partitioner partitioner) {
+      PartitionInfo partitionInfo) {
+    Partitioner partitioner = PartitionUtil.getPartitioner(partitionInfo);
     return createPartitionFilterTree(expressionTree, partitionInfo).applyFilter(partitioner);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterProcessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterProcessor.java
index 9f5f7f1..246166d 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterProcessor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterProcessor.java
@@ -28,7 +28,6 @@ import org.apache.carbondata.core.metadata.schema.PartitionInfo;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.scan.partition.Partitioner;
 
 public interface FilterProcessor {
 
@@ -59,6 +58,5 @@ public interface FilterProcessor {
    * This API will get the map of required partitions.
    * @return BitSet the value "1" represent the required partition.
    */
-  BitSet getFilteredPartitions(Expression expressionTree, PartitionInfo partitionInfo,
-      Partitioner partitioner);
+  BitSet getFilteredPartitions(Expression expressionTree, PartitionInfo partitionInfo);
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PartitionFilterUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PartitionFilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PartitionFilterUtil.java
index 1ab2ae6..efb8bdb 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PartitionFilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PartitionFilterUtil.java
@@ -17,10 +17,8 @@
 
 package org.apache.carbondata.core.scan.filter.partition;
 
-import java.math.BigDecimal;
 import java.text.DateFormat;
 import java.util.BitSet;
-import java.util.Comparator;
 import java.util.List;
 
 import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -28,86 +26,12 @@ import org.apache.carbondata.core.metadata.schema.PartitionInfo;
 import org.apache.carbondata.core.scan.partition.ListPartitioner;
 import org.apache.carbondata.core.scan.partition.PartitionUtil;
 import org.apache.carbondata.core.scan.partition.RangePartitioner;
-import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.comparator.Comparator;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
 
 public class PartitionFilterUtil {
 
   /**
-   * create Comparator for range filter
-   * @param dataType
-   * @return
-   */
-  public static Comparator getComparatorByDataType(DataType dataType) {
-    switch (dataType) {
-      case INT:
-        return new IntComparator();
-      case SHORT:
-        return new ShortComparator();
-      case DOUBLE:
-        return new DoubleComparator();
-      case LONG:
-      case DATE:
-      case TIMESTAMP:
-        return new LongComparator();
-      case DECIMAL:
-        return new BigDecimalComparator();
-      default:
-        return new ByteArrayComparator();
-    }
-  }
-
-  static class ByteArrayComparator implements Comparator<Object> {
-    @Override public int compare(Object key1, Object key2) {
-      return ByteUtil.compare((byte[]) key1, (byte[]) key2);
-    }
-  }
-
-  static class IntComparator implements Comparator<Object> {
-    @Override public int compare(Object key1, Object key2) {
-      return (int) key1 - (int) key2;
-    }
-  }
-
-  static class ShortComparator implements Comparator<Object> {
-    @Override public int compare(Object key1, Object key2) {
-      return (short) key1 - (short) key2;
-    }
-  }
-
-  static class DoubleComparator implements Comparator<Object> {
-    @Override public int compare(Object key1, Object key2) {
-      double result = (double) key1 - (double) key2;
-      if (result < 0) {
-        return -1;
-      } else if (result > 0) {
-        return 1;
-      } else {
-        return 0;
-      }
-
-    }
-  }
-
-  static class LongComparator implements Comparator<Object> {
-    @Override public int compare(Object key1, Object key2) {
-      long result = (long) key1 - (long) key2;
-      if (result < 0) {
-        return -1;
-      } else if (result > 0) {
-        return 1;
-      } else {
-        return 0;
-      }
-    }
-  }
-
-  static class BigDecimalComparator implements Comparator<Object> {
-    @Override public int compare(Object key1, Object key2) {
-      return ((BigDecimal) key1).compareTo((BigDecimal) key2);
-    }
-  }
-
-  /**
    * get partition map of range filter on list partition table
    * @param partitionInfo
    * @param partitioner
@@ -123,12 +47,12 @@ public class PartitionFilterUtil {
     List<List<String>> values = partitionInfo.getListInfo();
     DataType partitionColumnDataType = partitionInfo.getColumnSchemaList().get(0).getDataType();
 
-    Comparator comparator =
-        PartitionFilterUtil.getComparatorByDataType(partitionColumnDataType);
+    SerializableComparator comparator =
+        Comparator.getComparator(partitionColumnDataType);
 
     BitSet partitionMap = PartitionUtil.generateBitSetBySize(partitioner.numPartitions(), false);
     // add default partition
-    partitionMap.set(partitioner.numPartitions() - 1);
+    partitionMap.set(0);
 
     int partitions = values.size();
     if (isGreaterThan) {
@@ -140,7 +64,7 @@ public class PartitionFilterUtil {
             Object listValue = PartitionUtil.getDataBasedOnDataType(value, partitionColumnDataType,
                 timestampFormatter, dateFormatter);
             if (comparator.compare(listValue, filterValue) >= 0) {
-              partitionMap.set(i);
+              partitionMap.set(i + 1);
               continue outer1;
             }
           }
@@ -153,7 +77,7 @@ public class PartitionFilterUtil {
             Object listValue = PartitionUtil.getDataBasedOnDataType(value, partitionColumnDataType,
                 timestampFormatter, dateFormatter);
             if (comparator.compare(listValue, filterValue) > 0) {
-              partitionMap.set(i);
+              partitionMap.set(i + 1);
               continue outer2;
             }
           }
@@ -168,7 +92,7 @@ public class PartitionFilterUtil {
             Object listValue = PartitionUtil.getDataBasedOnDataType(value, partitionColumnDataType,
                 timestampFormatter, dateFormatter);
             if (comparator.compare(listValue, filterValue) <= 0) {
-              partitionMap.set(i);
+              partitionMap.set(i + 1);
               continue outer3;
             }
           }
@@ -181,7 +105,7 @@ public class PartitionFilterUtil {
             Object listValue = PartitionUtil.getDataBasedOnDataType(value, partitionColumnDataType,
                 timestampFormatter, dateFormatter);
             if (comparator.compare(listValue, filterValue) < 0) {
-              partitionMap.set(i);
+              partitionMap.set(i + 1);
               continue outer4;
             }
           }
@@ -208,8 +132,8 @@ public class PartitionFilterUtil {
     List<String> values = partitionInfo.getRangeInfo();
     DataType partitionColumnDataType = partitionInfo.getColumnSchemaList().get(0).getDataType();
 
-    Comparator comparator =
-        PartitionFilterUtil.getComparatorByDataType(partitionColumnDataType);
+    SerializableComparator comparator =
+        Comparator.getComparator(partitionColumnDataType);
 
     BitSet partitionMap = PartitionUtil.generateBitSetBySize(partitioner.numPartitions(), false);
 
@@ -229,7 +153,7 @@ public class PartitionFilterUtil {
       // filter value is in default partition
       if (isGreaterThan) {
         // GreaterThan(>), GreaterThanEqualTo(>=)
-        partitionMap.set(numPartitions);
+        partitionMap.set(0);
       } else {
         // LessThan(<), LessThanEqualTo(<=)
         partitionMap.set(0, partitioner.numPartitions());
@@ -240,24 +164,26 @@ public class PartitionFilterUtil {
         // if result is 0, the filter value is a bound value of range partition.
         if (isGreaterThan) {
           // GreaterThan(>), GreaterThanEqualTo(>=)
-          partitionMap.set(partitionIndex + 1, partitioner.numPartitions());
+          partitionMap.set(partitionIndex + 2, partitioner.numPartitions());
+          partitionMap.set(0);
         } else {
           if (isEqualTo) {
             // LessThanEqualTo(<=)
-            partitionMap.set(0, partitionIndex + 2);
+            partitionMap.set(1, partitionIndex + 3);
           } else {
             // LessThan(<)
-            partitionMap.set(0, partitionIndex + 1);
+            partitionMap.set(1, partitionIndex + 2);
           }
         }
       } else {
         // the filter value is not a bound value of range partition
         if (isGreaterThan) {
           // GreaterThan(>), GreaterThanEqualTo(>=)
-          partitionMap.set(partitionIndex, partitioner.numPartitions());
+          partitionMap.set(partitionIndex + 1, partitioner.numPartitions());
+          partitionMap.set(0);
         } else {
           // LessThan(<), LessThanEqualTo(<=)
-          partitionMap.set(0, partitionIndex + 1);
+          partitionMap.set(1, partitionIndex + 2);
         }
       }
     }
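
A minimal sketch of the new bit layout used above (assuming a range table with
three user-defined ranges): bit 0 is now the default partition and bits 1..3
are the user-defined partitions, so every index into range_info is shifted by
one when it is set in the BitSet.

    import java.util.BitSet;

    public class PartitionBitSetDemo {
      public static void main(String[] args) {
        int numPartitions = 4;          // 3 ranges + default partition
        BitSet partitionMap = new BitSet(numPartitions);
        // e.g. a GreaterThan filter whose value lies inside the second range
        int partitionIndex = 1;         // 0-based index into range_info
        partitionMap.set(partitionIndex + 1, numPartitions); // ranges 2 and 3
        partitionMap.set(0);            // default partition always matches '>'
        System.out.println(partitionMap); // {0, 2, 3}
      }
    }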

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/core/src/main/java/org/apache/carbondata/core/scan/partition/ListPartitioner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/partition/ListPartitioner.java b/core/src/main/java/org/apache/carbondata/core/scan/partition/ListPartitioner.java
index a2004bf..f6cc6b2 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/partition/ListPartitioner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/partition/ListPartitioner.java
@@ -40,7 +40,7 @@ public class ListPartitioner implements Partitioner {
           CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT));
 
   /**
-   * map the value of ListPartition to partition id.
+   * Map the value of ListPartition to partition id.
    */
   private Map<Object, Integer> map = new java.util.HashMap<Object, Integer>();
 
@@ -53,13 +53,13 @@ public class ListPartitioner implements Partitioner {
     for (int i = 0; i < numPartitions; i++) {
       for (String value : values.get(i)) {
         map.put(PartitionUtil.getDataBasedOnDataType(value, partitionColumnDataType,
-            timestampFormatter, dateFormatter), i);
+            timestampFormatter, dateFormatter), i + 1);
       }
     }
   }
 
   /**
-   * number of partitions
+   * Number of partitions
    * add extra default partition
    * @return
    */
@@ -70,7 +70,7 @@ public class ListPartitioner implements Partitioner {
   @Override public int getPartition(Object key) {
     Integer partition = map.get(key);
     if (partition == null) {
-      return numPartitions;
+      return 0;
     }
     return partition;
   }
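
A standalone sketch mirroring the new ListPartitioner behavior (an
illustrative class, not the Carbon one): user-defined list values map to ids
1..n, and any key that matches no list falls into the default partition, id 0.

    import java.util.HashMap;
    import java.util.Map;

    public class ListPartitionSketch {
      private final Map<Object, Integer> map = new HashMap<>();

      ListPartitionSketch(String[][] listInfo) {
        for (int i = 0; i < listInfo.length; i++) {
          for (String value : listInfo[i]) {
            map.put(value, i + 1);      // partition ids now start at 1
          }
        }
      }

      int getPartition(Object key) {
        Integer partition = map.get(key);
        return partition == null ? 0 : partition;  // 0 = default partition
      }

      public static void main(String[] args) {
        ListPartitionSketch p =
            new ListPartitionSketch(new String[][] {{"CN", "US"}, {"UK"}});
        System.out.println(p.getPartition("US")); // 1
        System.out.println(p.getPartition("UK")); // 2
        System.out.println(p.getPartition("JP")); // 0 (default)
      }
    }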

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/core/src/main/java/org/apache/carbondata/core/scan/partition/RangePartitioner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/partition/RangePartitioner.java b/core/src/main/java/org/apache/carbondata/core/scan/partition/RangePartitioner.java
index 3c02736..50f609a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/partition/RangePartitioner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/partition/RangePartitioner.java
@@ -17,8 +17,6 @@
 
 package org.apache.carbondata.core.scan.partition;
 
-import java.io.Serializable;
-import java.math.BigDecimal;
 import java.text.SimpleDateFormat;
 import java.util.List;
 
@@ -27,6 +25,8 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.PartitionInfo;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.comparator.Comparator;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
 
 /**
  * Range Partitioner
@@ -60,28 +60,7 @@ public class RangePartitioner implements Partitioner {
             timestampFormatter, dateFormatter);
       }
     }
-
-    switch (partitionColumnDataType) {
-      case INT:
-        comparator = new IntSerializableComparator();
-        break;
-      case SHORT:
-        comparator = new ShortSerializableComparator();
-        break;
-      case DOUBLE:
-        comparator = new DoubleSerializableComparator();
-        break;
-      case LONG:
-      case DATE:
-      case TIMESTAMP:
-        comparator = new LongSerializableComparator();
-        break;
-      case DECIMAL:
-        comparator = new BigDecimalSerializableComparator();
-        break;
-      default:
-        comparator = new ByteArraySerializableComparator();
-    }
+    comparator = Comparator.getComparator(partitionColumnDataType);
   }
 
   /**
@@ -96,55 +75,14 @@ public class RangePartitioner implements Partitioner {
 
   @Override public int getPartition(Object key) {
     if (key == null) {
-      return numPartitions;
+      return 0;
     } else {
       for (int i = 0; i < numPartitions; i++) {
-        if (comparator.compareTo(key, bounds[i])) {
-          return i;
+        if (comparator.compare(key, bounds[i]) < 0) {
+          return i + 1;
         }
       }
-      return numPartitions;
-    }
-  }
-
-  interface SerializableComparator extends Serializable {
-    boolean compareTo(Object key1, Object key2);
-  }
-
-  class ByteArraySerializableComparator implements SerializableComparator {
-    @Override public boolean compareTo(Object key1, Object key2) {
-      return ByteUtil.compare((byte[]) key1, (byte[]) key2) < 0;
-    }
-  }
-
-  class IntSerializableComparator implements SerializableComparator {
-    @Override public boolean compareTo(Object key1, Object key2) {
-      return (int) key1 - (int) key2 < 0;
+      return 0;
     }
   }
-
-  class ShortSerializableComparator implements SerializableComparator {
-    @Override public boolean compareTo(Object key1, Object key2) {
-      return (short) key1 - (short) key2 < 0;
-    }
-  }
-
-  class DoubleSerializableComparator implements SerializableComparator {
-    @Override public boolean compareTo(Object key1, Object key2) {
-      return (double) key1 - (double) key2 < 0;
-    }
-  }
-
-  class LongSerializableComparator implements SerializableComparator {
-    @Override public boolean compareTo(Object key1, Object key2) {
-      return (long) key1 - (long) key2 < 0;
-    }
-  }
-
-  class BigDecimalSerializableComparator implements SerializableComparator {
-    @Override public boolean compareTo(Object key1, Object key2) {
-      return ((BigDecimal) key1).compareTo((BigDecimal) key2) < 0;
-    }
-  }
-
 }
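
A standalone sketch mirroring the new RangePartitioner.getPartition (again
illustrative, with hypothetical long bounds): a key below bounds[i] goes to
partition i + 1, while null keys and keys beyond the last bound fall into the
default partition, id 0.

    public class RangePartitionSketch {
      public static void main(String[] args) {
        long[] bounds = {10L, 20L, 30L};
        for (long key : new long[] {5L, 15L, 25L, 99L}) {
          int partition = 0;                      // default partition
          for (int i = 0; i < bounds.length; i++) {
            if (key < bounds[i]) {                // first bound above the key
              partition = i + 1;
              break;
            }
          }
          System.out.println(key + " -> partition " + partition);
          // 5 -> 1, 15 -> 2, 25 -> 3, 99 -> 0 (default)
        }
      }
    }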

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/core/src/main/java/org/apache/carbondata/core/stats/PartitionStatistic.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/stats/PartitionStatistic.java b/core/src/main/java/org/apache/carbondata/core/stats/PartitionStatistic.java
index 44f62b1..8a37d01 100644
--- a/core/src/main/java/org/apache/carbondata/core/stats/PartitionStatistic.java
+++ b/core/src/main/java/org/apache/carbondata/core/stats/PartitionStatistic.java
@@ -17,43 +17,7 @@
 package org.apache.carbondata.core.stats;
 
 import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.carbondata.core.metadata.schema.partition.AbstractPartition;
 
 public class PartitionStatistic implements Serializable {
 
-  /**
-   * total count of partitions
-   */
-  private int numberOfPartitions;
-
-  /**
-   * partition id, increase only
-   */
-  private int partitionIndex;
-
-  private Map<Integer, AbstractPartition> partitionMap;
-
-  public PartitionStatistic() {
-    this.partitionIndex = 0;
-    this.numberOfPartitions = 0;
-    this.partitionMap = new HashMap<>();
-  }
-
-  public void addNewPartition(int id, AbstractPartition partition) {
-    partitionMap.put(id, partition);
-    partitionIndex ++;
-    numberOfPartitions ++;
-  }
-
-  public void deletePartition(int id) {
-    partitionMap.remove(id);
-    numberOfPartitions --;
-  }
-
-  public int getNumberOfPartitions() {
-    return numberOfPartitions;
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/core/src/main/java/org/apache/carbondata/core/util/comparator/Comparator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/comparator/Comparator.java b/core/src/main/java/org/apache/carbondata/core/util/comparator/Comparator.java
new file mode 100644
index 0000000..adce04f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/comparator/Comparator.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util.comparator;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.ByteUtil;
+
+public final class Comparator {
+
+  public static SerializableComparator getComparator(DataType dataType) {
+    switch (dataType) {
+      case INT:
+        return new IntSerializableComparator();
+      case SHORT:
+        return new ShortSerializableComparator();
+      case DOUBLE:
+        return new DoubleSerializableComparator();
+      case LONG:
+      case DATE:
+      case TIMESTAMP:
+        return new LongSerializableComparator();
+      case DECIMAL:
+        return new BigDecimalSerializableComparator();
+      default:
+        return new ByteArraySerializableComparator();
+    }
+  }
+}
+
+class ByteArraySerializableComparator implements SerializableComparator {
+  @Override public int compare(Object key1, Object key2) {
+    return ByteUtil.compare((byte[]) key1, (byte[]) key2);
+  }
+}
+
+class IntSerializableComparator implements SerializableComparator {
+  @Override public int compare(Object key1, Object key2) {
+    if ((int) key1 < (int) key2) {
+      return -1;
+    } else if ((int) key1 > (int) key2) {
+      return 1;
+    } else {
+      return 0;
+    }
+  }
+}
+
+class ShortSerializableComparator implements SerializableComparator {
+  @Override public int compare(Object key1, Object key2) {
+    if ((short) key1 < (short) key2) {
+      return -1;
+    } else if ((short) key1 > (short) key2) {
+      return 1;
+    } else {
+      return 0;
+    }
+  }
+}
+
+class DoubleSerializableComparator implements SerializableComparator {
+  @Override public int compare(Object key1, Object key2) {
+    if ((double) key1 < (double) key2) {
+      return -1;
+    } else if ((double) key1 > (double) key2) {
+      return 1;
+    } else {
+      return 0;
+    }
+  }
+}
+
+class LongSerializableComparator implements SerializableComparator {
+  @Override public int compare(Object key1, Object key2) {
+    if ((long) key1 < (long) key2) {
+      return -1;
+    } else if ((long) key1 > (long) key2) {
+      return 1;
+    } else {
+      return 0;
+    }
+  }
+}
+
+class BigDecimalSerializableComparator implements SerializableComparator {
+  @Override public int compare(Object key1, Object key2) {
+    return ((BigDecimal) key1).compareTo((BigDecimal) key2);
+  }
+}
\ No newline at end of file
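
A minimal usage sketch of the extracted comparator factory:

    import org.apache.carbondata.core.metadata.datatype.DataType;
    import org.apache.carbondata.core.util.comparator.Comparator;
    import org.apache.carbondata.core.util.comparator.SerializableComparator;

    public class ComparatorDemo {
      public static void main(String[] args) {
        SerializableComparator c = Comparator.getComparator(DataType.INT);
        System.out.println(c.compare(3, 7));  // negative: 3 < 7
        System.out.println(c.compare(7, 7));  // 0
        // Unlike the removed subtraction-based comparators, the branch-based
        // ones cannot overflow on extreme values.
        System.out.println(c.compare(Integer.MIN_VALUE, 1)); // negative
      }
    }

Note the design change: the new comparators return an int (the Comparator
contract) instead of the old boolean compareTo, and they avoid the integer
overflow that subtraction-based comparisons can produce.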

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/core/src/main/java/org/apache/carbondata/core/util/comparator/SerializableComparator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/comparator/SerializableComparator.java b/core/src/main/java/org/apache/carbondata/core/util/comparator/SerializableComparator.java
new file mode 100644
index 0000000..df0d3e2
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/comparator/SerializableComparator.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util.comparator;
+
+import java.io.Serializable;
+
+public interface SerializableComparator extends Serializable {
+  int compare(Object key1, Object key2);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/format/src/main/thrift/schema.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/schema.thrift b/format/src/main/thrift/schema.thrift
index 3385245..2aabf36 100644
--- a/format/src/main/thrift/schema.thrift
+++ b/format/src/main/thrift/schema.thrift
@@ -132,9 +132,11 @@ struct SchemaEvolution{
 struct PartitionInfo{
     1: required list<ColumnSchema> partition_columns;
     2: required PartitionType partition_type;
-    3: optional i32 num_partitions;  // number of partitions defined in hash partition table
-    4: optional list<list<string>> list_info; // value list of list partition table
-    5: optional list<string> range_info;  // range value list of range partition table
+    3: optional list<list<string>> list_info; // value list of list partition table
+    4: optional list<string> range_info;  // range value list of range partition table
+    5: optional list<i32> partition_ids; // partition id list, in logical ascending order
+    6: optional i32 num_partitions;  // total partition count
+    7: optional i32 max_partition;  // current max partition id
 }
 
 /**
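
For illustration, a sketch of how the converter above fills the new thrift
fields; the no-arg constructor and the exact generated package
(org.apache.carbondata.format) are assumptions based on the format module:

    import java.util.Arrays;

    public class ThriftPartitionInfoDemo {
      public static void main(String[] args) {
        org.apache.carbondata.format.PartitionInfo external =
            new org.apache.carbondata.format.PartitionInfo();
        external.setPartition_ids(Arrays.asList(0, 1, 2, 3)); // logical order
        external.setNum_partitions(4);                        // total count
        external.setMax_partition(3);                         // current max id
        System.out.println(external);  // thrift-generated toString dump
      }
    }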

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 16b5d69..00e420c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -47,8 +47,6 @@ import org.apache.carbondata.core.scan.filter.FilterUtil;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
 import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.scan.partition.PartitionUtil;
-import org.apache.carbondata.core.scan.partition.Partitioner;
 import org.apache.carbondata.core.stats.QueryStatistic;
 import org.apache.carbondata.core.stats.QueryStatisticsConstants;
 import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
@@ -272,20 +270,16 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       }
 
       CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
-
-      // prune partitions for filter query on partition table
       BitSet matchedPartitions = null;
-      if (null != filter) {
-        PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName());
-        if (null != partitionInfo) {
-          Partitioner partitioner = PartitionUtil.getPartitioner(partitionInfo);
-          matchedPartitions = new FilterExpressionProcessor()
-              .getFilteredPartitions(filter, partitionInfo, partitioner);
+      PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName());
+      if (partitionInfo != null) {
+        // prune partitions for filter query on partition table
+        matchedPartitions = setMatchedPartitions(null, carbonTable, filter, partitionInfo);
+        if (matchedPartitions != null) {
           if (matchedPartitions.cardinality() == 0) {
             // no partition is required
             return new ArrayList<InputSplit>();
-          }
-          if (matchedPartitions.cardinality() == partitioner.numPartitions()) {
+          } else if (matchedPartitions.cardinality() == partitionInfo.getNumPartitions()) {
             // all partitions are required, no need to prune partitions
             matchedPartitions = null;
           }
@@ -295,7 +289,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       FilterResolverIntf filterInterface = CarbonInputFormatUtil.resolveFilter(filter, identifier);
 
       // do block filtering and get split
-      List<InputSplit> splits = getSplits(job, filterInterface, matchedPartitions, cacheClient);
+      List<InputSplit> splits = getSplits(job, filterInterface, matchedPartitions, cacheClient,
+          partitionInfo);
       // pass the invalid segment to task side in order to remove index entry in task side
       if (invalidSegments.size() > 0) {
         for (InputSplit split : splits) {
@@ -327,6 +322,24 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     return carbonSplits;
   }
 
+  private BitSet setMatchedPartitions(String partitionIds, CarbonTable carbonTable,
+      Expression filter, PartitionInfo partitionInfo) {
+    BitSet matchedPartitions = null;
+    if (null != partitionIds) {
+      String[] partList = partitionIds.replace("[","").replace("]","").split(",");
+      matchedPartitions = new BitSet(Integer.parseInt(partList[0]));
+      for (String partitionId : partList) {
+        matchedPartitions.set(Integer.parseInt(partitionId));
+      }
+    } else {
+      if (null != filter) {
+        matchedPartitions = new FilterExpressionProcessor()
+            .getFilteredPartitions(filter, partitionInfo);
+      }
+    }
+    return matchedPartitions;
+  }
+
   /**
    * {@inheritDoc}
   * Configurations FileInputFormat.INPUT_DIR, CarbonInputFormat.INPUT_SEGMENT_NUMBERS
@@ -336,7 +349,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   * @throws IOException
   */
  private List<InputSplit> getSplits(JobContext job, FilterResolverIntf filterResolver,
-      BitSet matchedPartitions, CacheClient cacheClient) throws IOException {
+      BitSet matchedPartitions, CacheClient cacheClient, PartitionInfo partitionInfo)
+      throws IOException {
 
     List<InputSplit> result = new LinkedList<InputSplit>();
     FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor();
@@ -352,10 +366,9 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
 
     //for each segment fetch blocks matching filter in Driver BTree
     for (String segmentNo : getSegmentsToAccess(job)) {
-      List<DataRefNode> dataRefNodes =
-          getDataBlocksOfSegment(job, filterExpressionProcessor, absoluteTableIdentifier,
-              filterResolver, matchedPartitions, segmentNo, cacheClient, updateStatusManager);
-
+      List<DataRefNode> dataRefNodes = getDataBlocksOfSegment(job, filterExpressionProcessor,
+          absoluteTableIdentifier, filterResolver, matchedPartitions, segmentNo,
+          cacheClient, updateStatusManager, partitionInfo);
       // Get the UpdateVO for those tables on which IUD operations being performed.
       if (isIUDTable) {
         invalidBlockVOForSegmentId =
@@ -408,7 +421,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       FilterExpressionProcessor filterExpressionProcessor,
       AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver,
       BitSet matchedPartitions, String segmentId, CacheClient cacheClient,
-      SegmentUpdateStatusManager updateStatusManager) throws IOException {
+      SegmentUpdateStatusManager updateStatusManager, PartitionInfo partitionInfo)
+      throws IOException {
     Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = null;
     try {
       QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
@@ -417,18 +431,26 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
           getSegmentAbstractIndexs(job, absoluteTableIdentifier, segmentId, cacheClient,
               updateStatusManager);
       List<DataRefNode> resultFilterredBlocks = new LinkedList<DataRefNode>();
+      int partitionIndex = -1;
+      List<Integer> partitionIdList = new ArrayList<>();
+      if (partitionInfo != null) {
+        partitionIdList = partitionInfo.getPartitionIds();
+      }
       if (null != segmentIndexMap) {
         for (Map.Entry<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> entry :
             segmentIndexMap.entrySet()) {
           SegmentTaskIndexStore.TaskBucketHolder taskHolder = entry.getKey();
           int taskId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(taskHolder.taskNo);
-
+          if (partitionInfo != null) {
+            partitionIndex = partitionIdList.indexOf(taskId);
+          }
           // matchedPartitions variable will be null in two cases as follows
           // 1. the table is not a partition table
           // 2. the table is a partition table, and all partitions are matched by query
-          // for partition table, the task id of carbaondata file name is the partition id.
+          // for a partition table, the task id maps to a partition id.
           // if this partition is not required, here will skip it.
-          if (matchedPartitions == null || matchedPartitions.get(taskId)) {
+
+          if (matchedPartitions == null || matchedPartitions.get(partitionIndex)) {
             AbstractIndex abstractIndex = entry.getValue();
             List<DataRefNode> filterredBlocks;
             // if no filter is given get all blocks from Btree Index
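
A sketch of how setMatchedPartitions above turns a partition id list into a
BitSet; the string form "[0,2,3]" is a hypothetical input for illustration
(the initial BitSet size comes from the first id, but a BitSet grows as
needed, so only the set() calls matter):

    import java.util.BitSet;

    public class MatchedPartitionsDemo {
      public static void main(String[] args) {
        String partitionIds = "[0,2,3]";
        String[] partList =
            partitionIds.replace("[", "").replace("]", "").split(",");
        BitSet matchedPartitions = new BitSet(Integer.parseInt(partList[0]));
        for (String partitionId : partList) {
          matchedPartitions.set(Integer.parseInt(partitionId));
        }
        System.out.println(matchedPartitions); // {0, 2, 3}
      }
    }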

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
index a84eceb..4bfc4ef 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
@@ -113,7 +113,7 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
       """.stripMargin)
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE rangeTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
 
-    validateDataFiles("default_rangeTable", "0", Seq(0, 1, 3, 4))
+    validateDataFiles("default_rangeTable", "0", Seq(0, 1, 2, 4))
 
     checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from rangeTable order by empno"),
       sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
@@ -134,7 +134,7 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
       """.stripMargin)
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE listTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
 
-    validateDataFiles("default_listTable", "0", Seq(1, 2))
+    validateDataFiles("default_listTable", "0", Seq(2, 3))
 
     checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from listTable order by empno"),
       sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
@@ -174,7 +174,7 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
       """.stripMargin)
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE rangeTableSinglePass OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 'SINGLE_PASS'='TRUE')""")
 
-    validateDataFiles("default_rangeTableSinglePass", "0", Seq(0, 1, 3, 4))
+    validateDataFiles("default_rangeTableSinglePass", "0", Seq(0, 1, 2, 4))
 
     checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from rangeTableSinglePass order by empno"),
       sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
@@ -195,7 +195,7 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
       """.stripMargin)
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE listTableSinglePass OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 'SINGLE_PASS'='TRUE')""")
 
-    validateDataFiles("default_listTableSinglePass", "0", Seq(1, 2))
+    validateDataFiles("default_listTableSinglePass", "0", Seq(2, 3))
 
     checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from listTableSinglePass order by empno"),
       sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
@@ -235,7 +235,7 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
       """.stripMargin)
     sql("insert into rangeTableForInsert select empno, empname, designation, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary, doj from originTable")
 
-    validateDataFiles("default_rangeTableForInsert", "0", Seq(0, 1, 3, 4))
+    validateDataFiles("default_rangeTableForInsert", "0", Seq(0, 1, 2, 4))
 
     checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from rangeTableForInsert order by empno"),
       sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
@@ -256,7 +256,7 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
       """.stripMargin)
     sql("insert into listTableForInsert select empno, empname, designation, doj, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary, workgroupcategory from originTable")
 
-    validateDataFiles("default_listTableForInsert", "0", Seq(1, 2))
+    validateDataFiles("default_listTableForInsert", "0", Seq(2, 3))
 
     checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from listTableForInsert order by empno"),
       sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 129c642..d325f71 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -594,6 +594,8 @@ class PartitionTableDataLoaderRDD[K, V](
       val loadMetadataDetails = new LoadMetadataDetails()
       val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
       val model: CarbonLoadModel = carbonLoadModel
+      val carbonTable = model.getCarbonDataLoadSchema.getCarbonTable
+      val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
       val uniqueLoadStatusId =
        carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index
       try {
@@ -602,7 +604,7 @@ class PartitionTableDataLoaderRDD[K, V](
         loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
         carbonLoadModel.setPartitionId(partitionID)
         carbonLoadModel.setSegmentId(String.valueOf(loadCount))
-        carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+        carbonLoadModel.setTaskNo(String.valueOf(partitionInfo.getPartitionId(theSplit.index)))
         carbonLoadModel.setPreFetch(false)
 
         val recordReaders = Array[CarbonIterator[Array[AnyRef]]] {
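
Deriving taskNo from the partition id rather than the raw split index keeps data file names tied to logical partitions even if ids are later reassigned. A minimal sketch of the lookup, assuming getPartitionId(index) reads the id list PartitionInfo now carries (the partition_ids field added to the thrift schema); the class and values below are illustrative, not the real API:

    // Illustrative stand-in for the id lookup used above.
    class PartitionIdList(ids: Seq[Int]) {
      def getPartitionId(index: Int): Int = ids(index)
    }
    val info = new PartitionIdList(Seq(0, 1, 5, 3))  // hypothetical ids after a split
    println(info.getPartitionId(2))                  // prints 5, written as the taskNo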

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index ac2e311..bb8c5a6 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -27,22 +27,21 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.spark.SparkContext
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.{Row, RowFactory}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.command.{ColumnProperty, Field, PartitionerField}
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.RowFactory
-import org.apache.spark.sql.types.MetadataBuilder
-import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.types.{MetadataBuilder, StringType}
 import org.apache.spark.util.FileUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.datatype.DataType
-import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.PartitionInfo
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType
+import org.apache.carbondata.core.scan.partition.PartitionUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
+import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.comparator.Comparator
 import org.apache.carbondata.processing.csvload.CSVInputFormat
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
@@ -295,6 +294,35 @@ object CommonUtil {
     }
     result
   }
+  /**
+   * To verify that the range info is in ascending order
+   * @param rangeInfo range boundary values as strings
+   * @param columnDataType data type of the partition column
+   * @param timestampFormatter formatter for TIMESTAMP values
+   * @param dateFormatter formatter for DATE values
+   */
+  def validateRangeInfo(rangeInfo: List[String], columnDataType: DataType,
+      timestampFormatter: SimpleDateFormat, dateFormatter: SimpleDateFormat): Unit = {
+    val comparator = Comparator.getComparator(columnDataType)
+    var head = columnDataType match {
+      case DataType.STRING => ByteUtil.toBytes(rangeInfo.head)
+      case _ => PartitionUtil.getDataBasedOnDataType(rangeInfo.head, columnDataType,
+        timestampFormatter, dateFormatter)
+    }
+    val iterator = rangeInfo.tail.toIterator
+    while(iterator.hasNext) {
+      val next = columnDataType match {
+        case DataType.STRING => ByteUtil.toBytes(iterator.next())
+        case _ => PartitionUtil.getDataBasedOnDataType(iterator.next(), columnDataType,
+          timestampFormatter, dateFormatter)
+      }
+      if (comparator.compare(head, next) < 0) {
+        head = next
+      } else {
+        sys.error("Range info must be in ascending order, please check again!")
+      }
+    }
+  }
 
   def validateFields(key: String, fields: Seq[Field]): Boolean = {
     var isValid: Boolean = false
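
A quick usage sketch of the new check, assuming the Carbon default formatter patterns; the commented call would abort via sys.error because the bounds are not ascending:

    import java.text.SimpleDateFormat
    import org.apache.carbondata.core.metadata.datatype.DataType
    import org.apache.carbondata.spark.util.CommonUtil

    val ts = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")  // assumed timestamp default
    val dt = new SimpleDateFormat("yyyy-MM-dd")           // assumed date default
    CommonUtil.validateRangeInfo(List("10", "20", "30"), DataType.INT, ts, dt)  // ascending, passes
    // CommonUtil.validateRangeInfo(List("30", "10"), DataType.INT, ts, dt)     // fails via sys.error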

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 4dbdc8d..474af08 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -17,11 +17,12 @@
 
 package org.apache.spark.sql.catalyst
 
+import java.text.SimpleDateFormat
 import java.util.regex.{Matcher, Pattern}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.collection.mutable.{ArrayBuffer, LinkedHashSet, ListBuffer, Map}
+import scala.collection.mutable.{ArrayBuffer, LinkedHashSet, Map}
 import scala.language.implicitConversions
 import scala.util.matching.Regex
 
@@ -29,6 +30,7 @@ import org.apache.hadoop.hive.ql.lib.Node
 import org.apache.hadoop.hive.ql.parse._
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
 import org.apache.spark.sql.execution.command._
+import org.apache.spark.util.PartitionUtils
 
 import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -37,7 +39,7 @@ import org.apache.carbondata.core.metadata.datatype.DataType
 import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
-import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
 import org.apache.carbondata.processing.newflow.sort.SortScopeOptions
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.{CommonUtil, DataTypeConverterUtil}
@@ -103,12 +105,14 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   protected val LOCAL = carbonKeyWord("LOCAL")
   protected val MAPPED = carbonKeyWord("MAPPED")
   protected val MEASURES = carbonKeyWord("MEASURES")
+  protected val MERGE = carbonKeyWord("MERGE")
   protected val MULTILINE = carbonKeyWord("MULTILINE")
   protected val COMPLEX_DELIMITER_LEVEL_1 = carbonKeyWord("COMPLEX_DELIMITER_LEVEL_1")
   protected val COMPLEX_DELIMITER_LEVEL_2 = carbonKeyWord("COMPLEX_DELIMITER_LEVEL_2")
   protected val OPTIONS = carbonKeyWord("OPTIONS")
   protected val OUTPATH = carbonKeyWord("OUTPATH")
   protected val OVERWRITE = carbonKeyWord("OVERWRITE")
+  protected val PARTITION = carbonKeyWord("PARTITION")
   protected val PARTITION_COUNT = carbonKeyWord("PARTITION_COUNT")
   protected val PARTITIONDATA = carbonKeyWord("PARTITIONDATA")
   protected val PARTITIONER = carbonKeyWord("PARTITIONER")
@@ -119,6 +123,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   protected val SCHEMAS = carbonKeyWord("SCHEMAS")
   protected val SET = Keyword("SET")
   protected val SHOW = carbonKeyWord("SHOW")
+  protected val SPLIT = carbonKeyWord("SPLIT")
   protected val TABLES = carbonKeyWord("TABLES")
   protected val TABLE = carbonKeyWord("TABLE")
   protected val TERMINATED = carbonKeyWord("TERMINATED")
@@ -365,14 +370,22 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
    */
   protected def getPartitionInfo(partitionCols: Seq[PartitionerField],
       tableProperties: Map[String, String]): Option[PartitionInfo] = {
+    val timestampFormatter = new SimpleDateFormat(CarbonProperties.getInstance
+      .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
+    val dateFormatter = new SimpleDateFormat(CarbonProperties.getInstance
+      .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+        CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
     if (partitionCols.isEmpty) {
       None
     } else {
       var partitionType: String = ""
       var numPartitions = 0
       var rangeInfo = List[String]()
-      var listInfo = ListBuffer[List[String]]()
-      var templist = ListBuffer[String]()
+      var listInfo = List[List[String]]()
+
+      val columnDataType = DataTypeConverterUtil.
+        convertToCarbonType(partitionCols.head.dataType.get)
      if (tableProperties.get(CarbonCommonConstants.PARTITION_TYPE).isDefined) {
        partitionType = tableProperties.get(CarbonCommonConstants.PARTITION_TYPE).get
       }
@@ -383,25 +396,11 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       if (tableProperties.get(CarbonCommonConstants.RANGE_INFO).isDefined) {
        rangeInfo = tableProperties.get(CarbonCommonConstants.RANGE_INFO).get.split(",")
          .map(_.trim()).toList
+        CommonUtil.validateRangeInfo(rangeInfo, columnDataType, timestampFormatter, dateFormatter)
       }
       if (tableProperties.get(CarbonCommonConstants.LIST_INFO).isDefined) {
-        val arr = tableProperties.get(CarbonCommonConstants.LIST_INFO).get.split(",")
-          .map(_.trim())
-        val iter = arr.iterator
-        while (iter.hasNext) {
-          val value = iter.next()
-          if (value.startsWith("(")) {
-            templist += value.replace("(", "").trim()
-          } else if (value.endsWith(")")) {
-            templist += value.replace(")", "").trim()
-            listInfo += templist.toList
-            templist.clear()
-          } else {
-            templist += value
-            listInfo += templist.toList
-            templist.clear()
-          }
-        }
+        val originListInfo = tableProperties.get(CarbonCommonConstants.LIST_INFO).get
+        listInfo = PartitionUtils.getListInfo(originListInfo)
       }
       val cols : ArrayBuffer[ColumnSchema] = new ArrayBuffer[ColumnSchema]()
       partitionCols.foreach(partition_col => {
@@ -415,11 +414,13 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       var partitionInfo : PartitionInfo = null
       partitionType.toUpperCase() match {
         case "HASH" => partitionInfo = new PartitionInfo(cols.asJava, 
PartitionType.HASH)
-          partitionInfo.setNumPartitions(numPartitions)
+          partitionInfo.initialize(numPartitions)
         case "RANGE" => partitionInfo = new PartitionInfo(cols.asJava, 
PartitionType.RANGE)
           partitionInfo.setRangeInfo(rangeInfo.asJava)
+          partitionInfo.initialize(rangeInfo.size + 1)
         case "LIST" => partitionInfo = new PartitionInfo(cols.asJava, 
PartitionType.LIST)
-          partitionInfo.setListInfo(listInfo.map(_.asJava).toList.asJava)
+          partitionInfo.setListInfo(listInfo.map(_.asJava).asJava)
+          partitionInfo.initialize(listInfo.size + 1)
       }
       Some(partitionInfo)
     }
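
Note the sizing convention established here: HASH allocates exactly numPartitions ids, while RANGE and LIST reserve one extra id for the default partition. A small sketch, assuming initialize(n) allocates n partition ids; values are illustrative:

    val rangeInfo = List("2017-01-01", "2017-07-01", "2018-01-01")
    val listInfo  = List(List("0"), List("1", "2"))
    println(rangeInfo.size + 1)  // 4 ids: the default partition plus one per range bound
    println(listInfo.size + 1)   // 3 ids: the default partition plus one per list group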

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
new file mode 100644
index 0000000..3949404
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+import scala.collection.mutable.ListBuffer
+
+object PartitionUtils {
+
+  def getListInfo(originListInfo: String): List[List[String]] = {
+    var listInfo = ListBuffer[List[String]]()
+    var templist = ListBuffer[String]()
+    val arr = originListInfo.split(",")
+      .map(_.trim())
+    var groupEnd = true
+    val iter = arr.iterator
+    while (iter.hasNext) {
+      val value = iter.next()
+      if (value.startsWith("(")) {
+        templist += value.replace("(", "").trim()
+        groupEnd = false
+      } else if (value.endsWith(")")) {
+        templist += value.replace(")", "").trim()
+        listInfo += templist.toList
+        templist.clear()
+        groupEnd = true
+      } else {
+        if (groupEnd) {
+          templist += value
+          listInfo += templist.toList
+          templist.clear()
+        } else {
+          templist += value
+        }
+      }
+    }
+    listInfo.toList
+  }
+}
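
Besides making the LIST_INFO parsing reusable, the groupEnd flag fixes the old inline loop, which flushed every bare value inside an open "(...)" group as its own partition. A usage sketch:

    import org.apache.spark.util.PartitionUtils

    val parsed = PartitionUtils.getListInfo("(0, 1, 2), 3, (4, 5)")
    println(parsed)  // List(List(0, 1, 2), List(3), List(4, 5))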

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
index bfc1be9..21ef6c4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
@@ -327,7 +327,7 @@ public class CarbonLoadModel implements Serializable {
   }
 
   /**
-   * get copy with parition
+   * get copy with partition
    *
    * @param uniqueId
    * @return
