Repository: carbondata Updated Branches: refs/heads/master b944bd8a1 -> 6488bc018
[CARBONDATA-1250] Change default partition id & Add PartitionIdList in partitionInfo This closes #1125 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6488bc01 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6488bc01 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6488bc01 Branch: refs/heads/master Commit: 6488bc018a2ec715b31407d12290680d388a43b3 Parents: b944bd8 Author: lionelcao <whuca...@gmail.com> Authored: Fri Jun 30 00:00:32 2017 +0800 Committer: chenliang613 <chenliang...@apache.org> Committed: Fri Jul 14 10:11:10 2017 +0800 ---------------------------------------------------------------------- .../ThriftWrapperSchemaConverterImpl.java | 6 + .../core/metadata/schema/PartitionInfo.java | 59 ++++++++-- .../schema/partition/AbstractPartition.java | 42 ------- .../schema/partition/HashPartition.java | 34 ------ .../schema/partition/ListPartition.java | 36 ------ .../schema/partition/RangePartition.java | 40 ------- .../core/metadata/schema/table/CarbonTable.java | 9 -- .../scan/filter/FilterExpressionProcessor.java | 5 +- .../core/scan/filter/FilterProcessor.java | 4 +- .../filter/partition/PartitionFilterUtil.java | 112 ++++--------------- .../core/scan/partition/ListPartitioner.java | 8 +- .../core/scan/partition/RangePartitioner.java | 76 ++----------- .../core/stats/PartitionStatistic.java | 36 ------ .../core/util/comparator/Comparator.java | 105 +++++++++++++++++ .../util/comparator/SerializableComparator.java | 24 ++++ format/src/main/thrift/schema.thrift | 8 +- .../carbondata/hadoop/CarbonInputFormat.java | 66 +++++++---- .../TestDataLoadingForPartitionTable.scala | 12 +- .../spark/rdd/NewCarbonDataLoadRDD.scala | 4 +- .../carbondata/spark/util/CommonUtil.scala | 44 ++++++-- .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 47 ++++---- .../org/apache/spark/util/PartitionUtils.scala | 52 +++++++++ .../processing/model/CarbonLoadModel.java | 2 +- 23 files changed, 390 insertions(+), 441 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java index 2d5f395..235a7ba 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java @@ -219,6 +219,9 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter { externalPartitionInfo.setList_info(wrapperPartitionInfo.getListInfo()); externalPartitionInfo.setRange_info(wrapperPartitionInfo.getRangeInfo()); externalPartitionInfo.setNum_partitions(wrapperPartitionInfo.getNumPartitions()); + externalPartitionInfo.setMax_partition(wrapperPartitionInfo.getMAX_PARTITION()); + externalPartitionInfo.setPartition_ids(wrapperPartitionInfo + .getPartitionIds()); return externalPartitionInfo; } @@ -453,6 +456,9 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter { wrapperPartitionInfo.setListInfo(externalPartitionInfo.getList_info()); wrapperPartitionInfo.setRangeInfo(externalPartitionInfo.getRange_info()); wrapperPartitionInfo.setNumPartitions(externalPartitionInfo.getNum_partitions()); + wrapperPartitionInfo.setPartitionIds(externalPartitionInfo + .getPartition_ids()); + wrapperPartitionInfo.setMAX_PARTITION(externalPartitionInfo.getMax_partition()); return wrapperPartitionInfo; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java index 2b08536..0d03998 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java @@ -18,6 +18,7 @@ package org.apache.carbondata.core.metadata.schema; import java.io.Serializable; +import java.util.ArrayList; import java.util.List; import org.apache.carbondata.core.metadata.schema.partition.PartitionType; @@ -43,13 +44,25 @@ public class PartitionInfo implements Serializable { private List<List<String>> listInfo; /** - * number of partitions + * total count of partitions */ private int numPartitions; + /** + * current max partition id, increase only, will be used in alter table partition operation + */ + private int MAX_PARTITION; + + /** + * record the partitionId in the logical ascending order + * initiate when table created and changed when alter table + */ + private List<Integer> partitionIds; + public PartitionInfo(List<ColumnSchema> columnSchemaList, PartitionType partitionType) { this.columnSchemaList = columnSchemaList; this.partitionType = partitionType; + this.partitionIds = new ArrayList<>(); } public List<ColumnSchema> getColumnSchemaList() { @@ -64,14 +77,6 @@ public class PartitionInfo implements Serializable { return partitionType; } - public void setNumPartitions(int numPartitions) { - this.numPartitions = numPartitions; - } - - public int getNumPartitions() { - return numPartitions; - } - public void setRangeInfo(List<String> rangeInfo) { this.rangeInfo = rangeInfo; } @@ -88,4 +93,40 @@ public class PartitionInfo implements Serializable { return listInfo; } + public void initialize(int partitionNum) { + for (int i = 0; i < partitionNum; i++) { + partitionIds.add(i); + } + MAX_PARTITION = partitionNum - 1; + numPartitions = partitionNum; + } + + public void setNumPartitions(int numPartitions) { + this.numPartitions = numPartitions; + } + + public int getNumPartitions() { + return numPartitions; + } + + public int getMAX_PARTITION() { + return MAX_PARTITION; + } + + public void setMAX_PARTITION(int max_partition) { + this.MAX_PARTITION = max_partition; + } + + public List<Integer> getPartitionIds() { + return partitionIds; + } + + public void setPartitionIds(List<Integer> partitionIdList) { + this.partitionIds = partitionIdList; + } + + public int getPartitionId(int index) { + return partitionIds.get(index); + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/AbstractPartition.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/AbstractPartition.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/AbstractPartition.java deleted file mode 100644 index 1d6337c..0000000 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/AbstractPartition.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.core.metadata.schema.partition; - -public abstract class AbstractPartition { - - /** - * Partition unique identifier - */ - protected int partitionId; - - /** - * Total row count of this partition - */ - protected int rowCount; - - public int getPartitionId() { - return partitionId; - } - - public void setRowCount(int count) { - this.rowCount = count; - } - - public int getRowCount() { - return rowCount; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/HashPartition.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/HashPartition.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/HashPartition.java deleted file mode 100644 index 6b43525..0000000 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/HashPartition.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.core.metadata.schema.partition; - -public class HashPartition extends AbstractPartition { - - /** - * hash value for hash partition table - */ - private int hashValue; - - public HashPartition(int id, int hashValue) { - this.partitionId = id; - this.hashValue = hashValue; - } - - public int getHashValue() { - return hashValue; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/ListPartition.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/ListPartition.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/ListPartition.java deleted file mode 100644 index 11a396f..0000000 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/ListPartition.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.core.metadata.schema.partition; - -import java.util.List; - -public class ListPartition extends AbstractPartition { - - /** - * value list for list partition table - */ - private List<String> listInfo; - - public ListPartition(int id, List<String> listInfo) { - this.partitionId = id; - this.listInfo = listInfo; - } - - public List<String> getListInfo() { - return listInfo; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/RangePartition.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/RangePartition.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/RangePartition.java deleted file mode 100644 index af2b05d..0000000 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/partition/RangePartition.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.core.metadata.schema.partition; - -public class RangePartition extends AbstractPartition { - /** - * boundary value for range partition table - */ - private String lowerBoundary; - - private String upperBoundary; - - public RangePartition(int id, String lowerBoundary, String upperBoundary) { - this.partitionId = id; - this.lowerBoundary = lowerBoundary; - this.upperBoundary = upperBoundary; - } - - public String getLowerBoundary() { - return lowerBoundary; - } - - public String getUpperBoundary() { - return upperBoundary; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java index 16ded57..cec8007 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java @@ -33,7 +33,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.metadata.schema.table.column.CarbonImplicitDimension; import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; -import org.apache.carbondata.core.stats.PartitionStatistic; /** * Mapping class for Carbon actual table @@ -99,10 +98,6 @@ public class CarbonTable implements Serializable { private Map<String, PartitionInfo> tablePartitionMap; /** - * statistic information of partition table - */ - private PartitionStatistic partitionStatistic; - /** * tableUniqueName */ private String tableUniqueName; @@ -597,10 +592,6 @@ public class CarbonTable implements Serializable { return null != tablePartitionMap.get(getFactTableName()); } - public PartitionStatistic getPartitionStatistic() { - return partitionStatistic; - } - /** * @return absolute table identifier */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java index ce31283..21c7bf6 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java @@ -63,6 +63,7 @@ import org.apache.carbondata.core.scan.filter.resolver.LogicalFilterResolverImpl import org.apache.carbondata.core.scan.filter.resolver.RowLevelFilterResolverImpl; import org.apache.carbondata.core.scan.filter.resolver.RowLevelRangeFilterResolverImpl; import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.TrueConditionalResolverImpl; +import org.apache.carbondata.core.scan.partition.PartitionUtil; import org.apache.carbondata.core.scan.partition.Partitioner; public class FilterExpressionProcessor implements FilterProcessor { @@ -163,11 +164,11 @@ public class FilterExpressionProcessor implements FilterProcessor { * The value of "1" in BitSet represent the required partition * @param expressionTree * @param partitionInfo - * @param partitioner * @return */ @Override public BitSet getFilteredPartitions(Expression expressionTree, - PartitionInfo partitionInfo, Partitioner partitioner) { + PartitionInfo partitionInfo) { + Partitioner partitioner = PartitionUtil.getPartitioner(partitionInfo); return createPartitionFilterTree(expressionTree, partitionInfo).applyFilter(partitioner); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterProcessor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterProcessor.java index 9f5f7f1..246166d 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterProcessor.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterProcessor.java @@ -28,7 +28,6 @@ import org.apache.carbondata.core.metadata.schema.PartitionInfo; import org.apache.carbondata.core.scan.expression.Expression; import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; -import org.apache.carbondata.core.scan.partition.Partitioner; public interface FilterProcessor { @@ -59,6 +58,5 @@ public interface FilterProcessor { * This API will get the map of required partitions. * @return BitSet the value "1" represent the required partition. */ - BitSet getFilteredPartitions(Expression expressionTree, PartitionInfo partitionInfo, - Partitioner partitioner); + BitSet getFilteredPartitions(Expression expressionTree, PartitionInfo partitionInfo); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PartitionFilterUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PartitionFilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PartitionFilterUtil.java index 1ab2ae6..efb8bdb 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PartitionFilterUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PartitionFilterUtil.java @@ -17,10 +17,8 @@ package org.apache.carbondata.core.scan.filter.partition; -import java.math.BigDecimal; import java.text.DateFormat; import java.util.BitSet; -import java.util.Comparator; import java.util.List; import org.apache.carbondata.core.metadata.datatype.DataType; @@ -28,86 +26,12 @@ import org.apache.carbondata.core.metadata.schema.PartitionInfo; import org.apache.carbondata.core.scan.partition.ListPartitioner; import org.apache.carbondata.core.scan.partition.PartitionUtil; import org.apache.carbondata.core.scan.partition.RangePartitioner; -import org.apache.carbondata.core.util.ByteUtil; +import org.apache.carbondata.core.util.comparator.Comparator; +import org.apache.carbondata.core.util.comparator.SerializableComparator; public class PartitionFilterUtil { /** - * create Comparator for range filter - * @param dataType - * @return - */ - public static Comparator getComparatorByDataType(DataType dataType) { - switch (dataType) { - case INT: - return new IntComparator(); - case SHORT: - return new ShortComparator(); - case DOUBLE: - return new DoubleComparator(); - case LONG: - case DATE: - case TIMESTAMP: - return new LongComparator(); - case DECIMAL: - return new BigDecimalComparator(); - default: - return new ByteArrayComparator(); - } - } - - static class ByteArrayComparator implements Comparator<Object> { - @Override public int compare(Object key1, Object key2) { - return ByteUtil.compare((byte[]) key1, (byte[]) key2); - } - } - - static class IntComparator implements Comparator<Object> { - @Override public int compare(Object key1, Object key2) { - return (int) key1 - (int) key2; - } - } - - static class ShortComparator implements Comparator<Object> { - @Override public int compare(Object key1, Object key2) { - return (short) key1 - (short) key2; - } - } - - static class DoubleComparator implements Comparator<Object> { - @Override public int compare(Object key1, Object key2) { - double result = (double) key1 - (double) key2; - if (result < 0) { - return -1; - } else if (result > 0) { - return 1; - } else { - return 0; - } - - } - } - - static class LongComparator implements Comparator<Object> { - @Override public int compare(Object key1, Object key2) { - long result = (long) key1 - (long) key2; - if (result < 0) { - return -1; - } else if (result > 0) { - return 1; - } else { - return 0; - } - } - } - - static class BigDecimalComparator implements Comparator<Object> { - @Override public int compare(Object key1, Object key2) { - return ((BigDecimal) key1).compareTo((BigDecimal) key2); - } - } - - /** * get partition map of range filter on list partition table * @param partitionInfo * @param partitioner @@ -123,12 +47,12 @@ public class PartitionFilterUtil { List<List<String>> values = partitionInfo.getListInfo(); DataType partitionColumnDataType = partitionInfo.getColumnSchemaList().get(0).getDataType(); - Comparator comparator = - PartitionFilterUtil.getComparatorByDataType(partitionColumnDataType); + SerializableComparator comparator = + Comparator.getComparator(partitionColumnDataType); BitSet partitionMap = PartitionUtil.generateBitSetBySize(partitioner.numPartitions(), false); // add default partition - partitionMap.set(partitioner.numPartitions() - 1); + partitionMap.set(0); int partitions = values.size(); if (isGreaterThan) { @@ -140,7 +64,7 @@ public class PartitionFilterUtil { Object listValue = PartitionUtil.getDataBasedOnDataType(value, partitionColumnDataType, timestampFormatter, dateFormatter); if (comparator.compare(listValue, filterValue) >= 0) { - partitionMap.set(i); + partitionMap.set(i + 1); continue outer1; } } @@ -153,7 +77,7 @@ public class PartitionFilterUtil { Object listValue = PartitionUtil.getDataBasedOnDataType(value, partitionColumnDataType, timestampFormatter, dateFormatter); if (comparator.compare(listValue, filterValue) > 0) { - partitionMap.set(i); + partitionMap.set(i + 1); continue outer2; } } @@ -168,7 +92,7 @@ public class PartitionFilterUtil { Object listValue = PartitionUtil.getDataBasedOnDataType(value, partitionColumnDataType, timestampFormatter, dateFormatter); if (comparator.compare(listValue, filterValue) <= 0) { - partitionMap.set(i); + partitionMap.set(i + 1); continue outer3; } } @@ -181,7 +105,7 @@ public class PartitionFilterUtil { Object listValue = PartitionUtil.getDataBasedOnDataType(value, partitionColumnDataType, timestampFormatter, dateFormatter); if (comparator.compare(listValue, filterValue) < 0) { - partitionMap.set(i); + partitionMap.set(i + 1); continue outer4; } } @@ -208,8 +132,8 @@ public class PartitionFilterUtil { List<String> values = partitionInfo.getRangeInfo(); DataType partitionColumnDataType = partitionInfo.getColumnSchemaList().get(0).getDataType(); - Comparator comparator = - PartitionFilterUtil.getComparatorByDataType(partitionColumnDataType); + SerializableComparator comparator = + Comparator.getComparator(partitionColumnDataType); BitSet partitionMap = PartitionUtil.generateBitSetBySize(partitioner.numPartitions(), false); @@ -229,7 +153,7 @@ public class PartitionFilterUtil { // filter value is in default partition if (isGreaterThan) { // GreaterThan(>), GreaterThanEqualTo(>=) - partitionMap.set(numPartitions); + partitionMap.set(0); } else { // LessThan(<), LessThanEqualTo(<=) partitionMap.set(0, partitioner.numPartitions()); @@ -240,24 +164,26 @@ public class PartitionFilterUtil { // if result is 0, the filter value is a bound value of range partition. if (isGreaterThan) { // GreaterThan(>), GreaterThanEqualTo(>=) - partitionMap.set(partitionIndex + 1, partitioner.numPartitions()); + partitionMap.set(partitionIndex + 2, partitioner.numPartitions()); + partitionMap.set(0); } else { if (isEqualTo) { // LessThanEqualTo(<=) - partitionMap.set(0, partitionIndex + 2); + partitionMap.set(1, partitionIndex + 3); } else { // LessThan(<) - partitionMap.set(0, partitionIndex + 1); + partitionMap.set(1, partitionIndex + 2); } } } else { // the filter value is not a bound value of range partition if (isGreaterThan) { // GreaterThan(>), GreaterThanEqualTo(>=) - partitionMap.set(partitionIndex, partitioner.numPartitions()); + partitionMap.set(partitionIndex + 1, partitioner.numPartitions()); + partitionMap.set(0); } else { // LessThan(<), LessThanEqualTo(<=) - partitionMap.set(0, partitionIndex + 1); + partitionMap.set(1, partitionIndex + 2); } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/core/src/main/java/org/apache/carbondata/core/scan/partition/ListPartitioner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/partition/ListPartitioner.java b/core/src/main/java/org/apache/carbondata/core/scan/partition/ListPartitioner.java index a2004bf..f6cc6b2 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/partition/ListPartitioner.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/partition/ListPartitioner.java @@ -40,7 +40,7 @@ public class ListPartitioner implements Partitioner { CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)); /** - * map the value of ListPartition to partition id. + * Map the value of ListPartition to partition id. */ private Map<Object, Integer> map = new java.util.HashMap<Object, Integer>(); @@ -53,13 +53,13 @@ public class ListPartitioner implements Partitioner { for (int i = 0; i < numPartitions; i++) { for (String value : values.get(i)) { map.put(PartitionUtil.getDataBasedOnDataType(value, partitionColumnDataType, - timestampFormatter, dateFormatter), i); + timestampFormatter, dateFormatter), i + 1); } } } /** - * number of partitions + * Number of partitions * add extra default partition * @return */ @@ -70,7 +70,7 @@ public class ListPartitioner implements Partitioner { @Override public int getPartition(Object key) { Integer partition = map.get(key); if (partition == null) { - return numPartitions; + return 0; } return partition; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/core/src/main/java/org/apache/carbondata/core/scan/partition/RangePartitioner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/partition/RangePartitioner.java b/core/src/main/java/org/apache/carbondata/core/scan/partition/RangePartitioner.java index 3c02736..50f609a 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/partition/RangePartitioner.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/partition/RangePartitioner.java @@ -17,8 +17,6 @@ package org.apache.carbondata.core.scan.partition; -import java.io.Serializable; -import java.math.BigDecimal; import java.text.SimpleDateFormat; import java.util.List; @@ -27,6 +25,8 @@ import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.schema.PartitionInfo; import org.apache.carbondata.core.util.ByteUtil; import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.comparator.Comparator; +import org.apache.carbondata.core.util.comparator.SerializableComparator; /** * Range Partitioner @@ -60,28 +60,7 @@ public class RangePartitioner implements Partitioner { timestampFormatter, dateFormatter); } } - - switch (partitionColumnDataType) { - case INT: - comparator = new IntSerializableComparator(); - break; - case SHORT: - comparator = new ShortSerializableComparator(); - break; - case DOUBLE: - comparator = new DoubleSerializableComparator(); - break; - case LONG: - case DATE: - case TIMESTAMP: - comparator = new LongSerializableComparator(); - break; - case DECIMAL: - comparator = new BigDecimalSerializableComparator(); - break; - default: - comparator = new ByteArraySerializableComparator(); - } + comparator = Comparator.getComparator(partitionColumnDataType); } /** @@ -96,55 +75,14 @@ public class RangePartitioner implements Partitioner { @Override public int getPartition(Object key) { if (key == null) { - return numPartitions; + return 0; } else { for (int i = 0; i < numPartitions; i++) { - if (comparator.compareTo(key, bounds[i])) { - return i; + if (comparator.compare(key, bounds[i]) < 0) { + return i + 1; } } - return numPartitions; - } - } - - interface SerializableComparator extends Serializable { - boolean compareTo(Object key1, Object key2); - } - - class ByteArraySerializableComparator implements SerializableComparator { - @Override public boolean compareTo(Object key1, Object key2) { - return ByteUtil.compare((byte[]) key1, (byte[]) key2) < 0; - } - } - - class IntSerializableComparator implements SerializableComparator { - @Override public boolean compareTo(Object key1, Object key2) { - return (int) key1 - (int) key2 < 0; + return 0; } } - - class ShortSerializableComparator implements SerializableComparator { - @Override public boolean compareTo(Object key1, Object key2) { - return (short) key1 - (short) key2 < 0; - } - } - - class DoubleSerializableComparator implements SerializableComparator { - @Override public boolean compareTo(Object key1, Object key2) { - return (double) key1 - (double) key2 < 0; - } - } - - class LongSerializableComparator implements SerializableComparator { - @Override public boolean compareTo(Object key1, Object key2) { - return (long) key1 - (long) key2 < 0; - } - } - - class BigDecimalSerializableComparator implements SerializableComparator { - @Override public boolean compareTo(Object key1, Object key2) { - return ((BigDecimal) key1).compareTo((BigDecimal) key2) < 0; - } - } - } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/core/src/main/java/org/apache/carbondata/core/stats/PartitionStatistic.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/stats/PartitionStatistic.java b/core/src/main/java/org/apache/carbondata/core/stats/PartitionStatistic.java index 44f62b1..8a37d01 100644 --- a/core/src/main/java/org/apache/carbondata/core/stats/PartitionStatistic.java +++ b/core/src/main/java/org/apache/carbondata/core/stats/PartitionStatistic.java @@ -17,43 +17,7 @@ package org.apache.carbondata.core.stats; import java.io.Serializable; -import java.util.HashMap; -import java.util.Map; - -import org.apache.carbondata.core.metadata.schema.partition.AbstractPartition; public class PartitionStatistic implements Serializable { - /** - * total count of partitions - */ - private int numberOfPartitions; - - /** - * partition id, increase only - */ - private int partitionIndex; - - private Map<Integer, AbstractPartition> partitionMap; - - public PartitionStatistic() { - this.partitionIndex = 0; - this.numberOfPartitions = 0; - this.partitionMap = new HashMap<>(); - } - - public void addNewPartition(int id, AbstractPartition partition) { - partitionMap.put(id, partition); - partitionIndex ++; - numberOfPartitions ++; - } - - public void deletePartition(int id) { - partitionMap.remove(id); - numberOfPartitions --; - } - - public int getNumberOfPartitions() { - return numberOfPartitions; - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/core/src/main/java/org/apache/carbondata/core/util/comparator/Comparator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/comparator/Comparator.java b/core/src/main/java/org/apache/carbondata/core/util/comparator/Comparator.java new file mode 100644 index 0000000..adce04f --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/util/comparator/Comparator.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.util.comparator; + +import java.math.BigDecimal; + +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.util.ByteUtil; + +public final class Comparator { + + public static SerializableComparator getComparator(DataType dataType) { + switch (dataType) { + case INT: + return new IntSerializableComparator(); + case SHORT: + return new ShortSerializableComparator(); + case DOUBLE: + return new DoubleSerializableComparator(); + case LONG: + case DATE: + case TIMESTAMP: + return new LongSerializableComparator(); + case DECIMAL: + return new BigDecimalSerializableComparator(); + default: + return new ByteArraySerializableComparator(); + } + } +} + +class ByteArraySerializableComparator implements SerializableComparator { + @Override public int compare(Object key1, Object key2) { + return ByteUtil.compare((byte[]) key1, (byte[]) key2); + } +} + +class IntSerializableComparator implements SerializableComparator { + @Override public int compare(Object key1, Object key2) { + if ((int) key1 < (int) key2) { + return -1; + } else if ((int) key1 > (int) key2) { + return 1; + } else { + return 0; + } + } +} + +class ShortSerializableComparator implements SerializableComparator { + @Override public int compare(Object key1, Object key2) { + if ((short) key1 < (short) key2) { + return -1; + } else if ((short) key1 > (short) key2) { + return 1; + } else { + return 0; + } + } +} + +class DoubleSerializableComparator implements SerializableComparator { + @Override public int compare(Object key1, Object key2) { + if ((double) key1 < (double) key2) { + return -1; + } else if ((double) key1 > (double) key2) { + return 1; + } else { + return 0; + } + } +} + +class LongSerializableComparator implements SerializableComparator { + @Override public int compare(Object key1, Object key2) { + if ((long) key1 < (long) key2) { + return -1; + } else if ((long) key1 > (long) key2) { + return 1; + } else { + return 0; + } + } +} + +class BigDecimalSerializableComparator implements SerializableComparator { + @Override public int compare(Object key1, Object key2) { + return ((BigDecimal) key1).compareTo((BigDecimal) key2); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/core/src/main/java/org/apache/carbondata/core/util/comparator/SerializableComparator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/comparator/SerializableComparator.java b/core/src/main/java/org/apache/carbondata/core/util/comparator/SerializableComparator.java new file mode 100644 index 0000000..df0d3e2 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/util/comparator/SerializableComparator.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.util.comparator; + +import java.io.Serializable; + +public interface SerializableComparator extends Serializable { + int compare(Object key1, Object key2); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/format/src/main/thrift/schema.thrift ---------------------------------------------------------------------- diff --git a/format/src/main/thrift/schema.thrift b/format/src/main/thrift/schema.thrift index 3385245..2aabf36 100644 --- a/format/src/main/thrift/schema.thrift +++ b/format/src/main/thrift/schema.thrift @@ -132,9 +132,11 @@ struct SchemaEvolution{ struct PartitionInfo{ 1: required list<ColumnSchema> partition_columns; 2: required PartitionType partition_type; - 3: optional i32 num_partitions; // number of partitions defined in hash partition table - 4: optional list<list<string>> list_info; // value list of list partition table - 5: optional list<string> range_info; // range value list of range partition table + 3: optional list<list<string>> list_info; // value list of list partition table + 4: optional list<string> range_info; // range value list of range partition table + 5: optional list<i32> partition_ids; // partition id list + 6: optional i32 num_partitions; // total partition count + 7: optional i32 max_partition; // max partition id for now } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java index 16b5d69..00e420c 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java @@ -47,8 +47,6 @@ import org.apache.carbondata.core.scan.filter.FilterUtil; import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; import org.apache.carbondata.core.scan.model.CarbonQueryPlan; import org.apache.carbondata.core.scan.model.QueryModel; -import org.apache.carbondata.core.scan.partition.PartitionUtil; -import org.apache.carbondata.core.scan.partition.Partitioner; import org.apache.carbondata.core.stats.QueryStatistic; import org.apache.carbondata.core.stats.QueryStatisticsConstants; import org.apache.carbondata.core.stats.QueryStatisticsRecorder; @@ -272,20 +270,16 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { } CarbonInputFormatUtil.processFilterExpression(filter, carbonTable); - - // prune partitions for filter query on partition table BitSet matchedPartitions = null; - if (null != filter) { - PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName()); - if (null != partitionInfo) { - Partitioner partitioner = PartitionUtil.getPartitioner(partitionInfo); - matchedPartitions = new FilterExpressionProcessor() - .getFilteredPartitions(filter, partitionInfo, partitioner); + PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName()); + if (partitionInfo != null) { + // prune partitions for filter query on partition table + matchedPartitions = setMatchedPartitions(null, carbonTable, filter, partitionInfo); + if (matchedPartitions != null) { if (matchedPartitions.cardinality() == 0) { // no partition is required return new ArrayList<InputSplit>(); - } - if (matchedPartitions.cardinality() == partitioner.numPartitions()) { + } else if (matchedPartitions.cardinality() == partitionInfo.getNumPartitions()) { // all partitions are required, no need to prune partitions matchedPartitions = null; } @@ -295,7 +289,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { FilterResolverIntf filterInterface = CarbonInputFormatUtil.resolveFilter(filter, identifier); // do block filtering and get split - List<InputSplit> splits = getSplits(job, filterInterface, matchedPartitions, cacheClient); + List<InputSplit> splits = getSplits(job, filterInterface, matchedPartitions, cacheClient, + partitionInfo); // pass the invalid segment to task side in order to remove index entry in task side if (invalidSegments.size() > 0) { for (InputSplit split : splits) { @@ -327,6 +322,24 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { return carbonSplits; } + private BitSet setMatchedPartitions(String partitionIds, CarbonTable carbonTable, + Expression filter, PartitionInfo partitionInfo) { + BitSet matchedPartitions = null; + if (null != partitionIds) { + String[] partList = partitionIds.replace("[","").replace("]","").split(","); + matchedPartitions = new BitSet(Integer.parseInt(partList[0])); + for (String partitionId : partList) { + matchedPartitions.set(Integer.parseInt(partitionId)); + } + } else { + if (null != filter) { + matchedPartitions = new FilterExpressionProcessor() + .getFilteredPartitions(filter, partitionInfo); + } + } + return matchedPartitions; + } + /** * {@inheritDoc} * Configurations FileInputFormat.INPUT_DIR, CarbonInputFormat.INPUT_SEGMENT_NUMBERS @@ -336,7 +349,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { * @throws IOException */ private List<InputSplit> getSplits(JobContext job, FilterResolverIntf filterResolver, - BitSet matchedPartitions, CacheClient cacheClient) throws IOException { + BitSet matchedPartitions, CacheClient cacheClient, PartitionInfo partitionInfo) + throws IOException { List<InputSplit> result = new LinkedList<InputSplit>(); FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor(); @@ -352,10 +366,9 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { //for each segment fetch blocks matching filter in Driver BTree for (String segmentNo : getSegmentsToAccess(job)) { - List<DataRefNode> dataRefNodes = - getDataBlocksOfSegment(job, filterExpressionProcessor, absoluteTableIdentifier, - filterResolver, matchedPartitions, segmentNo, cacheClient, updateStatusManager); - + List<DataRefNode> dataRefNodes = getDataBlocksOfSegment(job, filterExpressionProcessor, + absoluteTableIdentifier, filterResolver, matchedPartitions, segmentNo, + cacheClient, updateStatusManager, partitionInfo); // Get the UpdateVO for those tables on which IUD operations being performed. if (isIUDTable) { invalidBlockVOForSegmentId = @@ -408,7 +421,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { FilterExpressionProcessor filterExpressionProcessor, AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver, BitSet matchedPartitions, String segmentId, CacheClient cacheClient, - SegmentUpdateStatusManager updateStatusManager) throws IOException { + SegmentUpdateStatusManager updateStatusManager, PartitionInfo partitionInfo) + throws IOException { Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = null; try { QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder(); @@ -417,18 +431,26 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> { getSegmentAbstractIndexs(job, absoluteTableIdentifier, segmentId, cacheClient, updateStatusManager); List<DataRefNode> resultFilterredBlocks = new LinkedList<DataRefNode>(); + int partitionIndex = -1; + List<Integer> partitionIdList = new ArrayList<>(); + if (partitionInfo != null) { + partitionIdList = partitionInfo.getPartitionIds(); + } if (null != segmentIndexMap) { for (Map.Entry<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> entry : segmentIndexMap.entrySet()) { SegmentTaskIndexStore.TaskBucketHolder taskHolder = entry.getKey(); int taskId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(taskHolder.taskNo); - + if (partitionInfo != null) { + partitionIndex = partitionIdList.indexOf(taskId); + } // matchedPartitions variable will be null in two cases as follows // 1. the table is not a partition table // 2. the table is a partition table, and all partitions are matched by query - // for partition table, the task id of carbaondata file name is the partition id. + // for partition table, the task id could map to partition id. // if this partition is not required, here will skip it. - if (matchedPartitions == null || matchedPartitions.get(taskId)) { + + if (matchedPartitions == null || matchedPartitions.get(partitionIndex)) { AbstractIndex abstractIndex = entry.getValue(); List<DataRefNode> filterredBlocks; // if no filter is given get all blocks from Btree Index http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala index a84eceb..4bfc4ef 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala @@ -113,7 +113,7 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll """.stripMargin) sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE rangeTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") - validateDataFiles("default_rangeTable", "0", Seq(0, 1, 3, 4)) + validateDataFiles("default_rangeTable", "0", Seq(0, 1, 2, 4)) checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from rangeTable order by empno"), sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno")) @@ -134,7 +134,7 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll """.stripMargin) sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE listTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") - validateDataFiles("default_listTable", "0", Seq(1, 2)) + validateDataFiles("default_listTable", "0", Seq(2, 3)) checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from listTable order by empno"), sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno")) @@ -174,7 +174,7 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll """.stripMargin) sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE rangeTableSinglePass OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 'SINGLE_PASS'='TRUE')""") - validateDataFiles("default_rangeTableSinglePass", "0", Seq(0, 1, 3, 4)) + validateDataFiles("default_rangeTableSinglePass", "0", Seq(0, 1, 2, 4)) checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from rangeTableSinglePass order by empno"), sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno")) @@ -195,7 +195,7 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll """.stripMargin) sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE listTableSinglePass OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 'SINGLE_PASS'='TRUE')""") - validateDataFiles("default_listTableSinglePass", "0", Seq(1, 2)) + validateDataFiles("default_listTableSinglePass", "0", Seq(2, 3)) checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from listTableSinglePass order by empno"), sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno")) @@ -235,7 +235,7 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll """.stripMargin) sql("insert into rangeTableForInsert select empno, empname, designation, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary, doj from originTable") - validateDataFiles("default_rangeTableForInsert", "0", Seq(0, 1, 3, 4)) + validateDataFiles("default_rangeTableForInsert", "0", Seq(0, 1, 2, 4)) checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from rangeTableForInsert order by empno"), sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno")) @@ -256,7 +256,7 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll """.stripMargin) sql("insert into listTableForInsert select empno, empname, designation, doj, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary, workgroupcategory from originTable") - validateDataFiles("default_listTableForInsert", "0", Seq(1, 2)) + validateDataFiles("default_listTableForInsert", "0", Seq(2, 3)) checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from listTableForInsert order by empno"), sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno")) http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala index 129c642..d325f71 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala @@ -594,6 +594,8 @@ class PartitionTableDataLoaderRDD[K, V]( val loadMetadataDetails = new LoadMetadataDetails() val executionErrors = new ExecutionErrors(FailureCauses.NONE, "") val model: CarbonLoadModel = carbonLoadModel + val carbonTable = model.getCarbonDataLoadSchema.getCarbonTable + val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName) val uniqueLoadStatusId = carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index try { @@ -602,7 +604,7 @@ class PartitionTableDataLoaderRDD[K, V]( loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS) carbonLoadModel.setPartitionId(partitionID) carbonLoadModel.setSegmentId(String.valueOf(loadCount)) - carbonLoadModel.setTaskNo(String.valueOf(theSplit.index)) + carbonLoadModel.setTaskNo(String.valueOf(partitionInfo.getPartitionId(theSplit.index))) carbonLoadModel.setPreFetch(false) val recordReaders = Array[CarbonIterator[Array[AnyRef]]] { http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala index ac2e311..bb8c5a6 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala @@ -27,22 +27,21 @@ import org.apache.commons.lang3.StringUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.spark.SparkContext -import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.{Row, RowFactory} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.execution.command.{ColumnProperty, Field, PartitionerField} -import org.apache.spark.sql.Row -import org.apache.spark.sql.RowFactory -import org.apache.spark.sql.types.MetadataBuilder -import org.apache.spark.sql.types.StringType +import org.apache.spark.sql.types.{MetadataBuilder, StringType} import org.apache.spark.util.FileUtils import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.datatype.DataType -import org.apache.carbondata.core.metadata.schema.partition.PartitionType import org.apache.carbondata.core.metadata.schema.PartitionInfo +import org.apache.carbondata.core.metadata.schema.partition.PartitionType +import org.apache.carbondata.core.scan.partition.PartitionUtil import org.apache.carbondata.core.statusmanager.SegmentStatusManager -import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil} +import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil} +import org.apache.carbondata.core.util.comparator.Comparator import org.apache.carbondata.processing.csvload.CSVInputFormat import org.apache.carbondata.processing.model.CarbonLoadModel import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException @@ -295,6 +294,35 @@ object CommonUtil { } result } + /** + * To verify the range info is in correct order + * @param rangeInfo + * @param columnDataType + * @param timestampFormatter + * @param dateFormatter + */ + def validateRangeInfo(rangeInfo: List[String], columnDataType: DataType, + timestampFormatter: SimpleDateFormat, dateFormatter: SimpleDateFormat): Unit = { + val comparator = Comparator.getComparator(columnDataType) + var head = columnDataType match { + case DataType.STRING => ByteUtil.toBytes(rangeInfo.head) + case _ => PartitionUtil.getDataBasedOnDataType(rangeInfo.head, columnDataType, + timestampFormatter, dateFormatter) + } + val iterator = rangeInfo.tail.toIterator + while(iterator.hasNext) { + val next = columnDataType match { + case DataType.STRING => ByteUtil.toBytes(iterator.next()) + case _ => PartitionUtil.getDataBasedOnDataType(iterator.next(), columnDataType, + timestampFormatter, dateFormatter) + } + if (comparator.compare(head, next) < 0) { + head = next + } else { + sys.error("Range info must be in ascending order, please check again!") + } + } + } def validateFields(key: String, fields: Seq[Field]): Boolean = { var isValid: Boolean = false http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala index 4dbdc8d..474af08 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala @@ -17,11 +17,12 @@ package org.apache.spark.sql.catalyst +import java.text.SimpleDateFormat import java.util.regex.{Matcher, Pattern} import scala.collection.JavaConverters._ import scala.collection.mutable -import scala.collection.mutable.{ArrayBuffer, LinkedHashSet, ListBuffer, Map} +import scala.collection.mutable.{ArrayBuffer, LinkedHashSet, Map} import scala.language.implicitConversions import scala.util.matching.Regex @@ -29,6 +30,7 @@ import org.apache.hadoop.hive.ql.lib.Node import org.apache.hadoop.hive.ql.parse._ import org.apache.spark.sql.catalyst.trees.CurrentOrigin import org.apache.spark.sql.execution.command._ +import org.apache.spark.util.PartitionUtils import org.apache.carbondata.common.constants.LoggerAction import org.apache.carbondata.common.logging.LogServiceFactory @@ -37,7 +39,7 @@ import org.apache.carbondata.core.metadata.datatype.DataType import org.apache.carbondata.core.metadata.schema.PartitionInfo import org.apache.carbondata.core.metadata.schema.partition.PartitionType import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema -import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil} +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil} import org.apache.carbondata.processing.newflow.sort.SortScopeOptions import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.carbondata.spark.util.{CommonUtil, DataTypeConverterUtil} @@ -103,12 +105,14 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { protected val LOCAL = carbonKeyWord("LOCAL") protected val MAPPED = carbonKeyWord("MAPPED") protected val MEASURES = carbonKeyWord("MEASURES") + protected val MERGE = carbonKeyWord("MERGE") protected val MULTILINE = carbonKeyWord("MULTILINE") protected val COMPLEX_DELIMITER_LEVEL_1 = carbonKeyWord("COMPLEX_DELIMITER_LEVEL_1") protected val COMPLEX_DELIMITER_LEVEL_2 = carbonKeyWord("COMPLEX_DELIMITER_LEVEL_2") protected val OPTIONS = carbonKeyWord("OPTIONS") protected val OUTPATH = carbonKeyWord("OUTPATH") protected val OVERWRITE = carbonKeyWord("OVERWRITE") + protected val PARTITION = carbonKeyWord("PARTITION") protected val PARTITION_COUNT = carbonKeyWord("PARTITION_COUNT") protected val PARTITIONDATA = carbonKeyWord("PARTITIONDATA") protected val PARTITIONER = carbonKeyWord("PARTITIONER") @@ -119,6 +123,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { protected val SCHEMAS = carbonKeyWord("SCHEMAS") protected val SET = Keyword("SET") protected val SHOW = carbonKeyWord("SHOW") + protected val SPLIT = carbonKeyWord("SPLIT") protected val TABLES = carbonKeyWord("TABLES") protected val TABLE = carbonKeyWord("TABLE") protected val TERMINATED = carbonKeyWord("TERMINATED") @@ -365,14 +370,22 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { */ protected def getPartitionInfo(partitionCols: Seq[PartitionerField], tableProperties: Map[String, String]): Option[PartitionInfo] = { + val timestampFormatter = new SimpleDateFormat(CarbonProperties.getInstance + .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)) + val dateFormatter = new SimpleDateFormat(CarbonProperties.getInstance + .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, + CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)) if (partitionCols.isEmpty) { None } else { var partitionType: String = "" var numPartitions = 0 var rangeInfo = List[String]() - var listInfo = ListBuffer[List[String]]() - var templist = ListBuffer[String]() + var listInfo = List[List[String]]() + + val columnDataType = DataTypeConverterUtil. + convertToCarbonType(partitionCols.head.dataType.get) if (tableProperties.get(CarbonCommonConstants.PARTITION_TYPE).isDefined) { partitionType = tableProperties.get(CarbonCommonConstants.PARTITION_TYPE).get } @@ -383,25 +396,11 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { if (tableProperties.get(CarbonCommonConstants.RANGE_INFO).isDefined) { rangeInfo = tableProperties.get(CarbonCommonConstants.RANGE_INFO).get.split(",") .map(_.trim()).toList + CommonUtil.validateRangeInfo(rangeInfo, columnDataType, timestampFormatter, dateFormatter) } if (tableProperties.get(CarbonCommonConstants.LIST_INFO).isDefined) { - val arr = tableProperties.get(CarbonCommonConstants.LIST_INFO).get.split(",") - .map(_.trim()) - val iter = arr.iterator - while (iter.hasNext) { - val value = iter.next() - if (value.startsWith("(")) { - templist += value.replace("(", "").trim() - } else if (value.endsWith(")")) { - templist += value.replace(")", "").trim() - listInfo += templist.toList - templist.clear() - } else { - templist += value - listInfo += templist.toList - templist.clear() - } - } + val originListInfo = tableProperties.get(CarbonCommonConstants.LIST_INFO).get + listInfo = PartitionUtils.getListInfo(originListInfo) } val cols : ArrayBuffer[ColumnSchema] = new ArrayBuffer[ColumnSchema]() partitionCols.foreach(partition_col => { @@ -415,11 +414,13 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { var partitionInfo : PartitionInfo = null partitionType.toUpperCase() match { case "HASH" => partitionInfo = new PartitionInfo(cols.asJava, PartitionType.HASH) - partitionInfo.setNumPartitions(numPartitions) + partitionInfo.initialize(numPartitions) case "RANGE" => partitionInfo = new PartitionInfo(cols.asJava, PartitionType.RANGE) partitionInfo.setRangeInfo(rangeInfo.asJava) + partitionInfo.initialize(rangeInfo.size + 1) case "LIST" => partitionInfo = new PartitionInfo(cols.asJava, PartitionType.LIST) - partitionInfo.setListInfo(listInfo.map(_.asJava).toList.asJava) + partitionInfo.setListInfo(listInfo.map(_.asJava).asJava) + partitionInfo.initialize(listInfo.size + 1) } Some(partitionInfo) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala new file mode 100644 index 0000000..3949404 --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.util + +import scala.collection.mutable.ListBuffer + +object PartitionUtils { + + def getListInfo(originListInfo: String): List[List[String]] = { + var listInfo = ListBuffer[List[String]]() + var templist = ListBuffer[String]() + val arr = originListInfo.split(",") + .map(_.trim()) + var groupEnd = true + val iter = arr.iterator + while (iter.hasNext) { + val value = iter.next() + if (value.startsWith("(")) { + templist += value.replace("(", "").trim() + groupEnd = false + } else if (value.endsWith(")")) { + templist += value.replace(")", "").trim() + listInfo += templist.toList + templist.clear() + groupEnd = true + } else { + if (groupEnd) { + templist += value + listInfo += templist.toList + templist.clear() + } else { + templist += value + } + } + } + listInfo.toList + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/6488bc01/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java index bfc1be9..21ef6c4 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java @@ -327,7 +327,7 @@ public class CarbonLoadModel implements Serializable { } /** - * get copy with parition + * get copy with partition * * @param uniqueId * @return