[CARBONDATA-940] Alter table add/split partition function for Spark 2.1
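This patch adds ALTER TABLE ADD PARTITION and ALTER TABLE SPLIT PARTITION support for partitioned carbon tables on Spark 2.1. Usage, as exercised by CarbonPartitionExample.scala further down (t5 is a list-partitioned table and the split targets its partition id 4):

  spark.sql("Alter table t5 add partition ('OutSpace')")
  spark.sql("Alter table t5 split partition(4) into ('Canada', 'Russia', '(Good, NotGood)')")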
This closes #1192 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/874764f9 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/874764f9 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/874764f9 Branch: refs/heads/master Commit: 874764f95629c98d8f98a424ca742614e38f704f Parents: 3efd330 Author: lionelcao <whuca...@gmail.com> Authored: Wed Jul 19 14:36:18 2017 +0800 Committer: Jacky Li <jacky.li...@qq.com> Committed: Thu Aug 17 09:28:54 2017 +0800 ---------------------------------------------------------------------- conf/carbon.properties.template | 3 + .../core/constants/CarbonCommonConstants.java | 9 + .../apache/carbondata/core/locks/LockUsage.java | 1 + .../core/metadata/schema/PartitionInfo.java | 25 + .../filter/partition/PartitionFilterUtil.java | 2 +- .../PartitionSpliterRawResultIterator.java | 72 +++ .../core/scan/wrappers/ByteArrayWrapper.java | 4 +- .../core/util/path/CarbonTablePath.java | 18 + .../src/main/resources/partition_data.csv | 25 - .../examples/CarbonPartitionExample.scala | 75 ++- .../hadoop/api/CarbonTableInputFormat.java | 150 +++++- .../hadoop/util/CarbonInputFormatUtil.java | 8 + .../src/test/resources/partition_data.csv | 28 + .../TestDataLoadingForPartitionTable.scala | 4 +- .../carbondata/spark/load/CarbonLoaderUtil.java | 4 +- .../spark/partition/SplitPartitionCallable.java | 41 ++ .../org/apache/carbondata/spark/KeyVal.scala | 8 + .../load/DataLoadProcessorStepOnSpark.scala | 2 +- .../spark/rdd/AlterTableSplitPartitionRDD.scala | 146 ++++++ .../carbondata/spark/rdd/CarbonMergerRDD.scala | 8 +- .../spark/rdd/CarbonScanPartitionRDD.scala | 269 ++++++++++ .../spark/rdd/DataManagementFunc.scala | 47 +- .../spark/rdd/NewCarbonDataLoadRDD.scala | 6 +- .../spark/rdd/PartitionSplitter.scala | 104 ++++ .../carbondata/spark/util/CommonUtil.scala | 40 +- .../execution/command/carbonTableSchema.scala | 16 + .../org/apache/spark/util/PartitionUtils.scala | 135 +++++ .../spark/rdd/CarbonDataRDDFactory.scala | 99 +++- .../execution/command/carbonTableSchema.scala | 147 +++++- .../sql/parser/CarbonSpark2SqlParser.scala | 26 +- .../spark/sql/parser/CarbonSparkSqlParser.scala | 2 +- .../partition/TestAlterPartitionTable.scala | 523 +++++++++++++++++++ .../merger/CompactionResultSortProcessor.java | 3 +- .../newflow/DataLoadProcessBuilder.java | 2 +- .../sort/impl/ParallelReadMergeSorterImpl.java | 2 +- ...arallelReadMergeSorterWithBucketingImpl.java | 4 +- .../UnsafeBatchParallelReadMergeSorterImpl.java | 2 +- ...arallelReadMergeSorterWithBucketingImpl.java | 5 +- .../CarbonRowDataWriterProcessorStepImpl.java | 2 +- .../steps/DataWriterBatchProcessorStepImpl.java | 2 +- .../steps/DataWriterProcessorStepImpl.java | 2 +- .../sortandgroupby/sortdata/SortParameters.java | 4 +- .../spliter/AbstractCarbonQueryExecutor.java | 133 +++++ .../processing/spliter/CarbonSplitExecutor.java | 64 +++ .../spliter/RowResultSpliterProcessor.java | 105 ++++ .../exception/SliceSpliterException.java | 78 +++ .../store/CarbonFactDataHandlerModel.java | 2 + .../util/CarbonDataProcessorUtil.java | 12 +- 48 files changed, 2360 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/conf/carbon.properties.template ---------------------------------------------------------------------- diff --git a/conf/carbon.properties.template b/conf/carbon.properties.template index b5f5101..de8a4ee 100644 
--- a/conf/carbon.properties.template +++ b/conf/carbon.properties.template @@ -42,6 +42,9 @@ carbon.enableXXHash=true #carbon.max.level.cache.size=-1 #enable prefetch of data during merge sort while reading data from sort temp files in data loading #carbon.merge.sort.prefetch=true +######## Alter Partition Configuration ######## +#Number of cores to be used while altering partitions +carbon.number.of.cores.while.alterPartition=2 ######## Compaction Configuration ######## #Number of cores to be used while compacting carbon.number.of.cores.while.compacting=2 http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 14258f8..74a08ec 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -306,6 +306,10 @@ public final class CarbonCommonConstants { @CarbonProperty public static final String NUM_CORES_COMPACTING = "carbon.number.of.cores.while.compacting"; /** + * Number of cores to be used while altering partitions + */ + public static final String NUM_CORES_ALT_PARTITION = "carbon.number.of.cores.while.alterPartition"; + /** * Number of cores to be used for block sort */ @CarbonProperty @@ -967,6 +971,11 @@ public final class CarbonCommonConstants { public static final String COMPACTION_KEY_WORD = "COMPACTION"; /** + * Indicates alter partition + */ + public static final String ALTER_PARTITION_KEY_WORD = "ALTER_PARTITION"; + + /** * hdfs temporary directory key */ @CarbonProperty
public class PartitionInfo implements Serializable { this.partitionIds = new ArrayList<>(); } + /** + * Adding a partition splits the default partition; the new partition ids are appended directly at the end + */ + public void addPartition(int addPartitionCount) { + for (int i = 0; i < addPartitionCount; i++) { + partitionIds.add(++MAX_PARTITION); + numPartitions++; + } + } + + /** + * e.g. for original partitions [0,1,2,3,4,5], + * splitting partition 2 into partitions 6,7,8 (id 2 is not reused) + * means sourcePartitionIndex is 2 and newPartitionNumbers is 3 + * @param sourcePartitionIndex + * @param newPartitionNumbers + */ + public void splitPartition(int sourcePartitionIndex, int newPartitionNumbers) { + partitionIds.remove(sourcePartitionIndex); + for (int i = 0; i < newPartitionNumbers; i++) { + partitionIds.add(sourcePartitionIndex + i, ++MAX_PARTITION); + } + numPartitions = numPartitions - 1 + newPartitionNumbers; + } + public List<ColumnSchema> getColumnSchemaList() { return columnSchemaList; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PartitionFilterUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PartitionFilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PartitionFilterUtil.java index d040c1b..9671199 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PartitionFilterUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/partition/PartitionFilterUtil.java @@ -107,7 +107,7 @@ public class PartitionFilterUtil { } } } else { - // LessThanEqualTo(<) + // LessThan(<) outer4: for (int i = 0; i < partitions; i++) { for (String value : listInfo.get(i)) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/PartitionSpliterRawResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/PartitionSpliterRawResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/PartitionSpliterRawResultIterator.java new file mode 100644 index 0000000..553f85e --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/PartitionSpliterRawResultIterator.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.carbondata.core.scan.result.iterator; + +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.scan.result.BatchResult; + +public class PartitionSpliterRawResultIterator extends CarbonIterator<Object[]> { + + private CarbonIterator<BatchResult> iterator; + private BatchResult batch; + private int counter; + + private static final LogService LOGGER = + LogServiceFactory.getLogService(PartitionSpliterRawResultIterator.class.getName()); + + public PartitionSpliterRawResultIterator(CarbonIterator<BatchResult> iterator) { + this.iterator = iterator; + } + + @Override public boolean hasNext() { + if (null == batch || checkBatchEnd(batch)) { + if (iterator.hasNext()) { + batch = iterator.next(); + counter = 0; + } else { + return false; + } + } + + return !checkBatchEnd(batch); + } + + @Override public Object[] next() { + if (batch == null) { + batch = iterator.next(); + } + if (!checkBatchEnd(batch)) { + return batch.getRawRow(counter++); + } else { + batch = iterator.next(); + counter = 0; + } + return batch.getRawRow(counter++); + } + + /** + * To check if the batch is processed completely + * @param batch + * @return + */ + private boolean checkBatchEnd(BatchResult batch) { + return !(counter < batch.getSize()); + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java b/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java index 93bf8eb..2f981b5 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java @@ -16,13 +16,15 @@ */ package org.apache.carbondata.core.scan.wrappers; +import java.io.Serializable; + import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer; /** * This class will store the dimension column data when query is executed * This can be used as a key for aggregation */ -public class ByteArrayWrapper implements Comparable<ByteArrayWrapper> { +public class ByteArrayWrapper implements Comparable<ByteArrayWrapper>, Serializable { /** * to store key which is generated using http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java index 7b8904c..fb9b783 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java +++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java @@ -303,6 +303,20 @@ public class CarbonTablePath extends Path { } } + public String getCarbonIndexFilePath(String taskId, String partitionId, String segmentId, + int batchNo, String bucketNumber, String timeStamp, + ColumnarFormatVersion columnarFormatVersion) { + switch (columnarFormatVersion) { + case V1: + case V2: + return getCarbonIndexFilePath(taskId, partitionId, segmentId, bucketNumber); + default: + String segmentDir = getSegmentDir(partitionId, 
segmentId); + return segmentDir + File.separator + getCarbonIndexFileName(Integer.parseInt(taskId), + Integer.parseInt(bucketNumber), batchNo, timeStamp); + } + } + private static String getCarbonIndexFileName(String taskNo, int bucketNumber, String factUpdatedtimeStamp) { return taskNo + "-" + bucketNumber + "-" + factUpdatedtimeStamp + INDEX_FILE_EXT; @@ -537,6 +551,10 @@ public class CarbonTablePath extends Path { return Integer.parseInt(taskNo.split(BATCH_PREFIX)[0]); } + public static int getBatchNoFromTaskNo(String taskNo) { + return Integer.parseInt(taskNo.split(BATCH_PREFIX)[1]); + } + /** * Gets the file name from file path */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/examples/spark2/src/main/resources/partition_data.csv ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/resources/partition_data.csv b/examples/spark2/src/main/resources/partition_data.csv deleted file mode 100644 index 2d521b7..0000000 --- a/examples/spark2/src/main/resources/partition_data.csv +++ /dev/null @@ -1,25 +0,0 @@ -vin,logdate,phonenumber,country,area -A42158424831,2016/02/12,125371341,China,Asia -A42158473831,2016/01/12,125371342,China,Asia -A42152474832,2016/02/12,125371343,US,America -A42151477823,2016/12/12,125371344,China,Asia -A42158474135,2016/02/15,125371345,Japan,Asia -A42258434831,2016/12/12,125371346,China,Asia -A42158475831,2016/05/12,125371347,UK,Europe -A41158494830,2015/07/12,225371348,China,Asia -A42158424831,2015/02/12,225371349,China,Asia -A42158473830,2014/01/12,225371310,China,Asia -A42152474830,2013/02/12,325371311,US,America -A42151477823,2012/12/12,425371312,China,Asia -A42158474133,2012/02/15,325371313,Japan,Asia -A42258434835,2013/12/12,525371314,China,Asia -A42158475836,2014/05/12,625371315,UK,Europe -A41158494838,2015/07/12,525371316,China,Asia -A42158424833,2016/02/12,425371317,China,Asia -A42158473832,2017/01/12,325371318,China,Asia -A42152474834,2011/02/12,225371319,US,America -A42151477824,2012/12/12,225371320,China,Asia -A42158474137,2013/02/15,325371321,Japan,Asia -A42258434837,2014/12/12,25371322,China,Asia -A42158475838,2014/05/12,425371323,UK,Europe -A41158494839,2016/07/12,625371324,China,Asia http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala index 8a01ba1..ca0501c 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala @@ -35,7 +35,7 @@ object CarbonPartitionExample { val storeLocation = s"$rootPath/examples/spark2/target/store" val warehouse = s"$rootPath/examples/spark2/target/warehouse" val metastoredb = s"$rootPath/examples/spark2/target" - val testData = s"$rootPath/examples/spark2/src/main/resources/partition_data.csv" + val testData = s"$rootPath/integration/spark-common-test/src/test/resources/partition_data.csv" CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") @@ -51,34 +51,57 @@ object CarbonPartitionExample { spark.sparkContext.setLogLevel("WARN") - // none partition table + // range partition with bucket defined 
spark.sql("DROP TABLE IF EXISTS t0") spark.sql(""" | CREATE TABLE IF NOT EXISTS t0 | ( + | id Int, | vin String, - | logdate Timestamp, | phonenumber Long, | country String, - | area String + | area String, + | salary Int | ) + | PARTITIONED BY (logdate Timestamp) | STORED BY 'carbondata' + | TBLPROPERTIES('PARTITION_TYPE'='RANGE', + | 'RANGE_INFO'='2014/01/01, 2015/01/01, 2016/01/01', + | 'BUCKETNUMBER'='3', + | 'BUCKETCOLUMNS'='vin') """.stripMargin) - // range partition + // none partition table spark.sql("DROP TABLE IF EXISTS t1") spark.sql(""" | CREATE TABLE IF NOT EXISTS t1 | ( + | id Int, | vin String, + | logdate Timestamp, | phonenumber Long, | country String, | area String | ) - | PARTITIONED BY (logdate Timestamp) | STORED BY 'carbondata' - | TBLPROPERTIES('PARTITION_TYPE'='RANGE', - | 'RANGE_INFO'='2014/01/01, 2015/01/01, 2016/01/01') + """.stripMargin) + + // list partition + spark.sql("DROP TABLE IF EXISTS t2") + spark.sql(""" + | CREATE TABLE IF NOT EXISTS t2 + | ( + | id Int, + | vin String, + | logdate Timestamp, + | phonenumber Long, + | country String, + | salary Int + | ) + | PARTITIONED BY (area String) + | STORED BY 'carbondata' + | TBLPROPERTIES('PARTITION_TYPE'='LIST', + | 'LIST_INFO'='Asia, America, Europe', 'DICTIONARY_EXCLUDE' ='area') """.stripMargin) // hash partition @@ -86,10 +109,12 @@ object CarbonPartitionExample { spark.sql(""" | CREATE TABLE IF NOT EXISTS t3 | ( + | id Int, | logdate Timestamp, | phonenumber Long, | country String, - | area String + | area String, + | salary Int | ) | PARTITIONED BY (vin String) | STORED BY 'carbondata' @@ -101,17 +126,40 @@ object CarbonPartitionExample { spark.sql(""" | CREATE TABLE IF NOT EXISTS t5 | ( + | id Int, | vin String, | logdate Timestamp, | phonenumber Long, - | area String + | area String, + | salary Int |) | PARTITIONED BY (country String) | STORED BY 'carbondata' | TBLPROPERTIES('PARTITION_TYPE'='LIST', - | 'LIST_INFO'='(China,United States),UK ,japan,(Canada,Russia), South Korea ') + | 'LIST_INFO'='(China, US),UK ,Japan,(Canada,Russia, Good, NotGood), Korea ') """.stripMargin) + // load data into partition table + spark.sql(s""" + LOAD DATA LOCAL INPATH '$testData' into table t0 options('BAD_RECORDS_ACTION'='FORCE') + """) + spark.sql(s""" + LOAD DATA LOCAL INPATH '$testData' into table t5 options('BAD_RECORDS_ACTION'='FORCE') + """) + + // alter list partition table t5 to add a partition + spark.sql(s"""Alter table t5 add partition ('OutSpace')""".stripMargin) + // alter list partition table t5 to split partition 4 into 3 independent partition + spark.sql( + s""" + Alter table t5 split partition(4) into ('Canada', 'Russia', '(Good, NotGood)') + """.stripMargin) + + spark.sql("""select * from t5 where country = 'Good' """).show(100, false) + + spark.sql("select * from t0 order by salary ").show(100, false) + spark.sql("select * from t5 order by salary ").show(100, false) + // hive partition table spark.sql("DROP TABLE IF EXISTS t7") spark.sql(""" @@ -130,6 +178,7 @@ object CarbonPartitionExample { spark.sql(s"CREATE DATABASE partitionDB") spark.sql(s""" | CREATE TABLE IF NOT EXISTS partitionDB.t9( + | id Int, | logdate Timestamp, | phonenumber Int, | country String, @@ -145,11 +194,11 @@ object CarbonPartitionExample { // show partitions try { - spark.sql("""SHOW PARTITIONS t0""").show(100, false) + spark.sql("""SHOW PARTITIONS t1""").show(100, false) } catch { case ex: AnalysisException => LOGGER.error(ex.getMessage()) } - spark.sql("""SHOW PARTITIONS t1""").show(100, false) + spark.sql("""SHOW 
PARTITIONS t0""").show(100, false) spark.sql("""SHOW PARTITIONS t3""").show(100, false) spark.sql("""SHOW PARTITIONS t5""").show(100, false) spark.sql("""SHOW PARTITIONS t7""").show(100, false) http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index 19e264b..dc4e76a 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -51,8 +51,6 @@ import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor; import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; import org.apache.carbondata.core.scan.model.CarbonQueryPlan; import org.apache.carbondata.core.scan.model.QueryModel; -import org.apache.carbondata.core.scan.partition.PartitionUtil; -import org.apache.carbondata.core.scan.partition.Partitioner; import org.apache.carbondata.core.stats.QueryStatistic; import org.apache.carbondata.core.stats.QueryStatisticsConstants; import org.apache.carbondata.core.stats.QueryStatisticsRecorder; @@ -64,6 +62,7 @@ import org.apache.carbondata.core.util.DataTypeConverter; import org.apache.carbondata.core.util.DataTypeConverterImpl; import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.hadoop.CarbonInputSplit; import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; import org.apache.carbondata.hadoop.CarbonProjection; import org.apache.carbondata.hadoop.CarbonRecordReader; @@ -101,6 +100,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { "mapreduce.input.carboninputformat.segmentnumbers"; // comma separated list of input files public static final String INPUT_FILES = "mapreduce.input.carboninputformat.files"; + public static final String ALTER_PARTITION_ID = "mapreduce.input.carboninputformat.partitionid"; private static final Log LOG = LogFactory.getLog(CarbonTableInputFormat.class); private static final String FILTER_PREDICATE = "mapreduce.input.carboninputformat.filter.predicate"; @@ -164,6 +164,11 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { throws IOException { configuration.set(FileInputFormat.INPUT_DIR, tablePath); } + + public static void setPartitionIdList(Configuration configuration, List<String> partitionIds) { + configuration.set(ALTER_PARTITION_ID, partitionIds.toString()); + } + /** * It sets unresolved filter expression. 
* @@ -285,23 +290,17 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { if (null == carbonTable) { throw new IOException("Missing/Corrupt schema file for table."); } - + PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName()); CarbonInputFormatUtil.processFilterExpression(filter, carbonTable); // prune partitions for filter query on partition table BitSet matchedPartitions = null; - if (null != filter) { - PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName()); - if (null != partitionInfo) { - Partitioner partitioner = PartitionUtil.getPartitioner(partitionInfo); - matchedPartitions = new FilterExpressionProcessor() - .getFilteredPartitions(filter, partitionInfo); + if (partitionInfo != null) { + matchedPartitions = setMatchedPartitions(null, filter, partitionInfo); + if (matchedPartitions != null) { if (matchedPartitions.cardinality() == 0) { - // no partition is required return new ArrayList<InputSplit>(); - } - if (matchedPartitions.cardinality() == partitioner.numPartitions()) { - // all partitions are required, no need to prune partitions + } else if (matchedPartitions.cardinality() == partitionInfo.getNumPartitions()) { matchedPartitions = null; } } @@ -310,7 +309,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { FilterResolverIntf filterInterface = CarbonInputFormatUtil.resolveFilter(filter, identifier); // do block filtering and get split - List<InputSplit> splits = getSplits(job, filterInterface, validSegments, matchedPartitions); + List<InputSplit> splits = + getSplits(job, filterInterface, validSegments, matchedPartitions, partitionInfo, null); // pass the invalid segment to task side in order to remove index entry in task side if (invalidSegments.size() > 0) { for (InputSplit split : splits) { @@ -323,6 +323,85 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { } /** + * Read data in one segment. For alter table partition statement + * @param job + * @param targetSegment + * @param oldPartitionIdList get old partitionId before partitionInfo was changed + * @return + * @throws IOException + */ + public List<InputSplit> getSplitsOfOneSegment(JobContext job, String targetSegment, + List<Integer> oldPartitionIdList, PartitionInfo partitionInfo) + throws IOException { + AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration()); + List<String> invalidSegments = new ArrayList<>(); + List<UpdateVO> invalidTimestampsList = new ArrayList<>(); + + List<String> segmentList = new ArrayList<>(); + segmentList.add(targetSegment); + setSegmentsToAccess(job.getConfiguration(), segmentList); + try { + + // process and resolve the expression + Expression filter = getFilterPredicates(job.getConfiguration()); + CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration()); + // this will be null in case of corrupt schema file. 
+ if (null == carbonTable) { + throw new IOException("Missing/Corrupt schema file for table."); + } + + CarbonInputFormatUtil.processFilterExpression(filter, carbonTable); + + // prune partitions for filter query on partition table + String partitionIds = job.getConfiguration().get(ALTER_PARTITION_ID); + BitSet matchedPartitions = null; + if (partitionInfo != null) { + matchedPartitions = setMatchedPartitions(partitionIds, filter, partitionInfo); + if (matchedPartitions != null) { + if (matchedPartitions.cardinality() == 0) { + return new ArrayList<InputSplit>(); + } else if (matchedPartitions.cardinality() == partitionInfo.getNumPartitions()) { + matchedPartitions = null; + } + } + } + + FilterResolverIntf filterInterface = CarbonInputFormatUtil.resolveFilter(filter, identifier); + // do block filtering and get split + List<InputSplit> splits = getSplits(job, filterInterface, segmentList, matchedPartitions, + partitionInfo, oldPartitionIdList); + // pass the invalid segment to task side in order to remove index entry in task side + if (invalidSegments.size() > 0) { + for (InputSplit split : splits) { + ((CarbonInputSplit) split).setInvalidSegments(invalidSegments); + ((CarbonInputSplit) split).setInvalidTimestampRange(invalidTimestampsList); + } + } + return splits; + } catch (IOException e) { + throw new RuntimeException("Can't get splits of the target segment ", e); + } + } + + private BitSet setMatchedPartitions(String partitionIds, Expression filter, + PartitionInfo partitionInfo) { + BitSet matchedPartitions = null; + if (null != partitionIds) { + String[] partList = partitionIds.replace("[", "").replace("]", "").split(","); + // only one partitionId in current alter table statement + matchedPartitions = new BitSet(Integer.parseInt(partList[0])); + for (String partitionId : partList) { + matchedPartitions.set(Integer.parseInt(partitionId)); + } + } else { + if (null != filter) { + matchedPartitions = + new FilterExpressionProcessor().getFilteredPartitions(filter, partitionInfo); + } + } + return matchedPartitions; + } + /** * {@inheritDoc} * Configurations FileInputFormat.INPUT_DIR, CarbonInputFormat.INPUT_SEGMENT_NUMBERS * are used to get table path to read. @@ -331,7 +410,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { * @throws IOException */ private List<InputSplit> getSplits(JobContext job, FilterResolverIntf filterResolver, - List<String> validSegments, BitSet matchedPartitions) throws IOException { + List<String> validSegments, BitSet matchedPartitions, PartitionInfo partitionInfo, + List<Integer> oldPartitionIdList) throws IOException { List<InputSplit> result = new LinkedList<InputSplit>(); UpdateVO invalidBlockVOForSegmentId = null; @@ -344,10 +424,10 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0); - //for each segment fetch blocks matching filter in Driver BTree + // for each segment fetch blocks matching filter in Driver BTree List<org.apache.carbondata.hadoop.CarbonInputSplit> dataBlocksOfSegment = getDataBlocksOfSegment(job, absoluteTableIdentifier, filterResolver, matchedPartitions, - validSegments); + validSegments, partitionInfo, oldPartitionIdList); for (org.apache.carbondata.hadoop.CarbonInputSplit inputSplit : dataBlocksOfSegment) { // Get the UpdateVO for those tables on which IUD operations being performed. 
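To make the pruning change in the next hunk concrete: matchedPartitions used to be tested with the raw task id parsed from the carbondata file name, but once partitions can be added or split the ids are no longer contiguous, so the BitSet is now addressed by the id's position in partitionInfo.getPartitionIds() (or in oldPartitionIdList while an alter is in flight). A minimal Scala sketch of the lookup, with hypothetical id values:

  import java.util.BitSet
  // ids after splitting partition 2 of [0,1,2,3,4,5] into 6,7,8 (id 2 is retired)
  val partitionIdList = List(0, 1, 6, 7, 8, 3, 4, 5)
  val matchedPartitions = new BitSet()
  matchedPartitions.set(2)                                  // only the first new partition is required
  val partitionId = 6                                       // parsed from the file's task number
  val partitionIndex = partitionIdList.indexOf(partitionId) // position in the list, not the raw id
  val required = partitionIndex != -1 && matchedPartitions.get(partitionIndex)
  println(required)                                         // true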
@@ -395,7 +475,8 @@ */ private List<org.apache.carbondata.hadoop.CarbonInputSplit> getDataBlocksOfSegment(JobContext job, AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver, - BitSet matchedPartitions, List<String> segmentIds) throws IOException { + BitSet matchedPartitions, List<String> segmentIds, PartitionInfo partitionInfo, + List<Integer> oldPartitionIdList) throws IOException { QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder(); QueryStatistic statistic = new QueryStatistic(); @@ -409,17 +490,34 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { List<Blocklet> prunedBlocklets = blockletMap.prune(segmentIds, resolver); List<org.apache.carbondata.hadoop.CarbonInputSplit> resultFilterredBlocks = new ArrayList<>(); + int partitionIndex = 0; + List<Integer> partitionIdList = new ArrayList<>(); + if (partitionInfo != null) { + partitionIdList = partitionInfo.getPartitionIds(); + } for (Blocklet blocklet : prunedBlocklets) { - int taskId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo( + int partitionId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo( CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath().toString())); - // matchedPartitions variable will be null in two cases as follows - // 1. the table is not a partition table - // 2. the table is a partition table, and all partitions are matched by query - // for partition table, the task id of carbaondata file name is the partition id. - // if this partition is not required, here will skip it. - if (matchedPartitions == null || matchedPartitions.get(taskId)) { - resultFilterredBlocks.add(convertToCarbonInputSplit(blocklet)); + // oldPartitionIdList is only used by the alter table partition command, because it changes + // the partition info first and then reads the data. + // All other queries should use the newest partitionIdList + if (partitionInfo != null) { + if (oldPartitionIdList != null) { + partitionIndex = oldPartitionIdList.indexOf(partitionId); + } else { + partitionIndex = partitionIdList.indexOf(partitionId); + } + } + if (partitionIndex != -1) { + // matchedPartitions variable will be null in two cases as follows + // 1. the table is not a partition table + // 2. the table is a partition table, and all partitions are matched by query + // for a partition table, the task id in the carbondata file name is the partition id. + // if this partition is not required, it is skipped here.
+ if (matchedPartitions == null || matchedPartitions.get(partitionIndex)) { + resultFilterredBlocks.add(convertToCarbonInputSplit(blocklet)); + } } } statistic http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java index 8269757..0dc79fa 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java @@ -85,6 +85,14 @@ public class CarbonInputFormatUtil { return carbonInputFormat; } + public static <V> CarbonTableInputFormat<V> createCarbonTableInputFormat( + AbsoluteTableIdentifier identifier, List<String> partitionId, Job job) throws IOException { + CarbonTableInputFormat<V> carbonTableInputFormat = new CarbonTableInputFormat<>(); + carbonTableInputFormat.setPartitionIdList(job.getConfiguration(), partitionId); + FileInputFormat.addInputPath(job, new Path(identifier.getTablePath())); + return carbonTableInputFormat; + } + private static void addQueryMeasure(CarbonQueryPlan plan, int order, CarbonMeasure measure) { QueryMeasure queryMeasure = new QueryMeasure(measure.getColName()); queryMeasure.setQueryOrder(order); http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark-common-test/src/test/resources/partition_data.csv ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/resources/partition_data.csv b/integration/spark-common-test/src/test/resources/partition_data.csv new file mode 100644 index 0000000..ec20d92 --- /dev/null +++ b/integration/spark-common-test/src/test/resources/partition_data.csv @@ -0,0 +1,28 @@ +id,vin,logdate,phonenumber,country,area,salary +1,A42158424831,2016/02/12,125371341,China,Asia,10000 +2,A42158473831,2016/01/12,125371342,China,Asia,10001 +3,A42152474832,2016/02/12,125371343,US,America,10002 +4,A42151477823,2016/12/12,125371344,China,OutSpace,10003 +5,A42158474135,2016/02/15,125371345,Japan,OutSpace,10004 +6,A42258434831,2016/12/12,125371346,China,Asia,10005 +7,A42158475831,2016/05/12,125371347,UK,OutSpace,10006 +8,A41158494830,2015/07/12,225371348,China,Asia,10007 +9,A42158424831,2015/02/12,225371349,China,OutSpace,10008 +10,A42158473830,2014/01/12,225371310,China,Asia,10009 +11,A42152474830,2013/02/12,325371311,US,America,10010 +12,A42151477823,2012/12/12,425371312,China,Asia,10011 +13,A42158474133,2012/02/15,325371313,Japan,Asia,10012 +14,A42258434835,2013/12/12,525371314,China,Asia,10013 +15,A42158475836,2014/05/12,625371315,UK,OutSpace,10014 +16,A41158494838,2015/07/12,525371316,China,Asia,10015 +17,A42158424833,2016/02/12,425371317,China,Asia,10016 +18,A42158473832,2017/01/12,325371318,China,Asia,10017 +19,A42152474834,2011/02/12,225371319,US,America,10018 +20,A42151477824,2012/12/12,225371320,China,Asia,10019 +21,A42158474137,2013/02/15,325371321,Japan,Asia,10020 +22,A42258434837,2014/12/12,25371322,China,Asia,10021 +23,A42158475838,2014/05/12,425371323,UK,OutSpace,10022 +24,A41158494839,2016/07/12,625371324,China,Asia,10023 +25,A41158494840,2016/07/12,626381324,Good,OutSpace,10024 +26,A41158494843,2016/07/12,625378824,NotGood,OutSpace,10025 +27,A41158494999,2016/07/12,625378824,Other,,10026 
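The id list consulted by the pruning logic above is maintained by PartitionInfo.addPartition/splitPartition from earlier in this patch: new ids are always drawn from ++MAX_PARTITION, and a split source id is retired rather than reused. A pure-Scala rehearsal of that arithmetic (a sketch of the logic, not the Java class itself), using the example from the splitPartition javadoc:

  import scala.collection.mutable.ListBuffer
  var maxPartition = 5                             // highest id ever assigned
  val partitionIds = ListBuffer(0, 1, 2, 3, 4, 5)
  // splitPartition(sourcePartitionIndex = 2, newPartitionNumbers = 3)
  partitionIds.remove(2)                           // id 2 is retired, never reused
  for (i <- 0 until 3) {
    maxPartition += 1
    partitionIds.insert(2 + i, maxPartition)       // new ids take the source's position
  }
  println(partitionIds)                            // ListBuffer(0, 1, 6, 7, 8, 3, 4, 5)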
http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala index f7ac87c..39777de 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala @@ -59,11 +59,11 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originMultiLoads OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") } - def validateDataFiles(tableUniqueName: String, sgementId: String, partitions: Seq[Int]): Unit = { + def validateDataFiles(tableUniqueName: String, segmentId: String, partitions: Seq[Int]): Unit = { val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName) val tablePath = new CarbonTablePath(carbonTable.getStorePath, carbonTable.getDatabaseName, carbonTable.getFactTableName) - val segmentDir = tablePath.getCarbonDataDirectoryPath("0", sgementId) + val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId) val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir)) val dataFiles = carbonFile.listFiles(new CarbonFileFilter() { override def accept(file: CarbonFile): Boolean = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java index 905f977..24c09ca 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java @@ -209,12 +209,12 @@ public final class CarbonLoaderUtil { * @param loadModel */ public static void deleteLocalDataLoadFolderLocation(CarbonLoadModel loadModel, - boolean isCompactionFlow) { + boolean isCompactionFlow, boolean isAltPartitionFlow) { String databaseName = loadModel.getDatabaseName(); String tableName = loadModel.getTableName(); String tempLocationKey = CarbonDataProcessorUtil .getTempStoreLocationKey(databaseName, tableName, loadModel.getSegmentId(), - loadModel.getTaskNo(), isCompactionFlow); + loadModel.getTaskNo(), isCompactionFlow, isAltPartitionFlow); // form local store location final String localStoreLocations = CarbonProperties.getInstance().getProperty(tempLocationKey); if (localStoreLocations == null) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/SplitPartitionCallable.java ---------------------------------------------------------------------- diff --git 
a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/SplitPartitionCallable.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/SplitPartitionCallable.java new file mode 100644 index 0000000..3a37a9d --- /dev/null +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/SplitPartitionCallable.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.partition; + +import java.util.concurrent.Callable; + +import org.apache.carbondata.spark.rdd.PartitionSplitter; + +import org.apache.spark.sql.execution.command.SplitPartitionCallableModel; + +/** + * Callable class which is used to split the partition in a separate callable. + */ +public class SplitPartitionCallable implements Callable<Void> { + + private final SplitPartitionCallableModel splitPartitionCallableModel; + + public SplitPartitionCallable(SplitPartitionCallableModel splitPartitionCallableModel) { + this.splitPartitionCallableModel = splitPartitionCallableModel; + } + + @Override public Void call() { + PartitionSplitter.triggerPartitionSplit(splitPartitionCallableModel); + return null; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala index 31dd4e6..181f6e4 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala @@ -107,6 +107,14 @@ class MergeResultImpl extends MergeResult[String, Boolean] { override def getKey(key: String, value: Boolean): (String, Boolean) = (key, value) } +trait SplitResult[K, V] extends Serializable { + def getKey(key: String, value: Boolean): (K, V) +} + +class SplitResultImpl extends SplitResult[String, Boolean] { + override def getKey(key: String, value: Boolean): (String, Boolean) = (key, value) +} + trait DeletedLoadResult[K, V] extends Serializable { def getKey(key: String, value: String): (K, V) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala index 4d1267a..e43d204 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala @@ -204,7 +204,7 @@ object DataLoadProcessorStepOnSpark { dataWriter.close() } // clean up the folders and files created locally for data load operation - CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false) + CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false, false) } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableSplitPartitionRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableSplitPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableSplitPartitionRDD.scala new file mode 100644 index 0000000..e481fc4 --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableSplitPartitionRDD.scala @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.spark.rdd + +import scala.collection.JavaConverters._ +import scala.util.Random + +import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.util.PartitionUtils + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.metadata.schema.PartitionInfo +import org.apache.carbondata.core.mutate.CarbonUpdateUtil +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.processing.model.CarbonLoadModel +import org.apache.carbondata.processing.spliter.RowResultSpliterProcessor +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil +import org.apache.carbondata.spark.SplitResult +import org.apache.carbondata.spark.load.CarbonLoaderUtil + +class AlterTableSplitPartitionRDD[K, V]( + sc: SparkContext, + result: SplitResult[K, V], + partitionIds: Seq[String], + segmentId: String, + bucketId: Int, + carbonLoadModel: CarbonLoadModel, + identifier: AbsoluteTableIdentifier, + storePath: String, + oldPartitionIdList: List[Int], + prev: RDD[Array[AnyRef]]) extends RDD[(K, V)](prev) { + + sc.setLocalProperty("spark.scheduler.pool", "DDL") + sc.setLocalProperty("spark.job.interruptOnCancel", "true") + + var storeLocation: String = null + var splitResult: String = null + val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val databaseName = carbonTable.getDatabaseName + val factTableName = carbonTable.getFactTableName + val partitionInfo = carbonTable.getPartitionInfo(factTableName) + + override protected def getPartitions: Array[Partition] = firstParent[Array[AnyRef]].partitions + + override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = { + val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + val rows = firstParent[Array[AnyRef]].iterator(split, context).toList.asJava + val iter = new Iterator[(K, V)] { + val partitionId = partitionInfo.getPartitionId(split.index) + carbonLoadModel.setTaskNo(String.valueOf(partitionId)) + carbonLoadModel.setSegmentId(segmentId) + carbonLoadModel.setPartitionId("0") + val tempLocationKey = CarbonDataProcessorUtil + .getTempStoreLocationKey(carbonLoadModel.getDatabaseName, + carbonLoadModel.getTableName, + segmentId, + carbonLoadModel.getTaskNo, + false, + true) + // this property is used to determine whether temp location for carbon is inside + // container temp dir or is yarn application directory. 
+ val carbonUseLocalDir = CarbonProperties.getInstance() + .getProperty("carbon.use.local.dir", "false") + + if (carbonUseLocalDir.equalsIgnoreCase("true")) { + + val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf) + if (null != storeLocations && storeLocations.nonEmpty) { + storeLocation = storeLocations(Random.nextInt(storeLocations.length)) + } + if (storeLocation == null) { + storeLocation = System.getProperty("java.io.tmpdir") + } + } else { + storeLocation = System.getProperty("java.io.tmpdir") + } + storeLocation = storeLocation + '/' + System.nanoTime() + '/' + split.index + CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation) + LOGGER.info(s"Temp storeLocation taken is $storeLocation") + + val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(databaseName, + factTableName, + carbonLoadModel.getTaskNo, + "0", + segmentId, + false, + true + ) + + val splitStatus = if (rows.isEmpty) { + LOGGER.info("After repartition this split, NO target rows to write back.") + true + } else { + try { + val segmentProperties = PartitionUtils.getSegmentProperties(identifier, + segmentId, partitionIds.toList, oldPartitionIdList, partitionInfo) + val processor = new RowResultSpliterProcessor( + carbonTable, + carbonLoadModel, + segmentProperties, + tempStoreLoc, + bucketId + ) + processor.execute(rows) + } catch { + case e: Exception => + sys.error(s"Exception when executing Row result processor ${e.getMessage}") + } finally { + CarbonLoaderUtil + .deleteLocalDataLoadFolderLocation(carbonLoadModel, false, true) + } + + } + + val splitResult = segmentId + var finished = false + + override def hasNext: Boolean = { + !finished + } + + override def next(): (K, V) = { + finished = true + result.getKey(splitResult, splitStatus) + } + } + iter + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index add0578..07264b5 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -153,7 +153,8 @@ class CarbonMergerRDD[K, V]( carbonLoadModel.getTableName, carbonLoadModel.getSegmentId, carbonLoadModel.getTaskNo, - true) + true, + false) CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation) LOGGER.info(s"Temp storeLocation taken is $storeLocation") // get destination segment properties as sent from driver which is of last segment. 
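A recurring detail in AlterTableSplitPartitionRDD above and in the CarbonMergerRDD changes here: each task writes through a private temp directory, chosen from the configured YARN local dirs when carbon.use.local.dir=true and falling back to java.io.tmpdir otherwise, then cleaned up via deleteLocalDataLoadFolderLocation with the new isAltPartitionFlow flag. A standalone sketch of the fallback, assuming no local dirs are configured:

  import scala.util.Random
  val storeLocations: Array[String] = Array.empty  // e.g. from CarbonLoaderUtil.getConfiguredLocalDirs
  var storeLocation =
    if (storeLocations != null && storeLocations.nonEmpty) {
      storeLocations(Random.nextInt(storeLocations.length))
    } else {
      System.getProperty("java.io.tmpdir")
    }
  storeLocation = storeLocation + '/' + System.nanoTime() + '/' + 0 // 0 stands in for split.index
  println(storeLocation)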
@@ -196,7 +197,8 @@ class CarbonMergerRDD[K, V]( carbonLoadModel.getTaskNo, "0", mergeNumber, - true + true, + false ) carbonLoadModel.setPartitionId("0") @@ -231,7 +233,7 @@ class CarbonMergerRDD[K, V]( try { val isCompactionFlow = true CarbonLoaderUtil - .deleteLocalDataLoadFolderLocation(carbonLoadModel, isCompactionFlow) + .deleteLocalDataLoadFolderLocation(carbonLoadModel, isCompactionFlow, false) } catch { case e: Exception => LOGGER.error(e) http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala new file mode 100644 index 0000000..2a39db5 --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.util.ArrayList
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.Job
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.hive.DistributionUtil
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.PartitionUtils
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datastore.block.{Distributable, SegmentProperties, TaskBlockInfo}
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.encoder.Encoding
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType
+import org.apache.carbondata.core.metadata.schema.table.column.{CarbonDimension, CarbonMeasure}
+import org.apache.carbondata.core.scan.result.iterator.PartitionSpliterRawResultIterator
+import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper
+import org.apache.carbondata.core.util.{ByteUtil, DataTypeUtil}
+import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
+import org.apache.carbondata.processing.merger.CarbonCompactionUtil
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.processing.spliter.CarbonSplitExecutor
+import org.apache.carbondata.spark.load.CarbonLoaderUtil
+
+
+/**
+ * This RDD is used by the alter table partition statement to read the data of the target
+ * partitions, so that the data can then be repartitioned according to the new partitionInfo.
+ * @param sc the SparkContext
+ * @param partitionIds the ids of the target partitions to be scanned
+ * @param storePath the table store path
+ * @param segmentId the segment to scan
+ * @param bucketId the bucket to scan
+ * @param oldPartitionIdList the task ids in partition order before partitionInfo is modified
+ * @param carbonTableIdentifier identifier of the table being altered
+ * @param carbonLoadModel the load model of the table
+ */
+class CarbonScanPartitionRDD(
+    sc: SparkContext,
+    partitionIds: Seq[String],
+    storePath: String,
+    segmentId: String,
+    bucketId: Int,
+    oldPartitionIdList: List[Int],
+    carbonTableIdentifier: CarbonTableIdentifier,
+    carbonLoadModel: CarbonLoadModel)
+  extends RDD[(AnyRef, Array[AnyRef])](sc, Nil) {
+
+  private val queryId = sc.getConf.get("queryId", System.nanoTime() + "")
+  val identifier = new AbsoluteTableIdentifier(storePath, carbonTableIdentifier)
+  var storeLocation: String = null
+  var splitStatus: Boolean = false
+  var blockId: String = null
+  val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+  val dimensions = carbonTable.getAllDimensions.asScala
+  val measures = carbonTable.getAllMeasures.asScala
+  val partitionInfo = carbonTable.getPartitionInfo(carbonTableIdentifier.getTableName)
+  val partitionColumn = partitionInfo.getColumnSchemaList().get(0)
+  val partitionDataType = partitionColumn.getDataType
+  val partitionColumnName = partitionColumn.getColumnName
+  var isDimension: Boolean = false
+  val encodingList = partitionColumn.getEncodingList
+  var dimension: CarbonDimension = null
+  var measure: CarbonMeasure = null
+  val noDictionaryIndexGroup: ArrayBuffer[Int] = new ArrayBuffer[Int]()
+  val dictionaryIndexGroup: ArrayBuffer[Int] = new ArrayBuffer[Int]()
+  val measureIndexGroup: ArrayBuffer[Int] = new ArrayBuffer[Int]()
+
+  override def getPartitions: Array[Partition] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    val parallelism = sparkContext.defaultParallelism
+    val jobConf = new JobConf(new Configuration)
+    val job = new Job(jobConf)
+    val format = CarbonInputFormatUtil.createCarbonTableInputFormat(identifier,
+      partitionIds.toList.asJava, job)
+    job.getConfiguration.set("query.id", queryId)
+
+    val splits = format.getSplitsOfOneSegment(job, segmentId,
+      oldPartitionIdList.map(_.asInstanceOf[Integer]).asJava, partitionInfo)
+    var partition_num = 0
+    val result = new ArrayList[Partition](parallelism)
+    val blockList = splits.asScala
+      .filter(_.asInstanceOf[CarbonInputSplit].getBucketId.toInt == bucketId)
+      .map(_.asInstanceOf[Distributable])
+    if (!blockList.isEmpty) {
+      val activeNodes = DistributionUtil.ensureExecutorsAndGetNodeList(blockList, sparkContext)
+      val nodeBlockMapping = CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1,
+        parallelism, activeNodes.toList.asJava)
+      nodeBlockMapping.asScala.foreach { case (node, blockList) =>
+        blockList.asScala.foreach { blocksPerTask =>
+          val splits = blocksPerTask.asScala.map(_.asInstanceOf[CarbonInputSplit])
+          if (blocksPerTask.size() != 0) {
+            val multiBlockSplit =
+              new CarbonMultiBlockSplit(identifier, splits.asJava, Array(node))
+            val partition = new CarbonSparkPartition(id, partition_num, multiBlockSplit)
+            result.add(partition)
+            partition_num += 1
+          }
+        }
+      }
+    }
+    result.toArray(new Array[Partition](result.size()))
+  }
+
+  override def compute(split: Partition, context: TaskContext):
+    Iterator[(AnyRef, Array[AnyRef])] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    var exec: CarbonSplitExecutor = null
+    val rows: java.util.List[(AnyRef, Array[AnyRef])] = new ArrayList[(AnyRef, Array[AnyRef])]()
+    try {
+      val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
+      val splits = inputSplit.getAllSplits.asScala
+      val tableBlockInfoList = CarbonInputSplit.createBlocks(splits.asJava)
+      val segmentMapping: java.util.Map[String, TaskBlockInfo] =
+        CarbonCompactionUtil.createMappingForSegments(tableBlockInfoList)
+      val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+      var result: java.util.List[PartitionSpliterRawResultIterator] = null
+      try {
+        exec = new CarbonSplitExecutor(segmentMapping, carbonTable)
+        result = exec.processDataBlocks(segmentId)
+      } catch {
+        case e: Throwable =>
+          LOGGER.error(e)
+          if (null != e.getMessage) {
+            sys.error(s"Exception occurred in query execution :: ${ e.getMessage }")
+          } else {
+            sys.error("Exception occurred in query execution. Please check logs.")
+          }
+      }
+      val segmentProperties = PartitionUtils.getSegmentProperties(identifier, segmentId,
+        partitionIds.toList, oldPartitionIdList, partitionInfo)
+      val partColIdx = getPartitionColumnIndex(partitionColumnName, segmentProperties)
+      indexInitialise()
+      for (iterator <- result.asScala) {
+        while (iterator.hasNext) {
+          val row = iterator.next()
+          val partitionColumnValue = getPartitionColumnValue(row, partColIdx,
+            segmentProperties)
+          rows.add((partitionColumnValue, row))
+        }
+      }
+    } catch {
+      case e: Exception =>
+        LOGGER.error(e)
+        throw e
+    } finally {
+      if (null != exec) {
+        exec.finish
+      }
+    }
+    rows.iterator().asScala
+  }
+
+  def getPartitionColumnValue(row: Array[AnyRef], partColIdx: Int,
+      segmentProperties: SegmentProperties): AnyRef = {
+    val dims: Array[Byte] = row(0).asInstanceOf[ByteArrayWrapper].getDictionaryKey
+    val keyGen = segmentProperties.getDimensionKeyGenerator
+    val keyArray: Array[Long] = keyGen.getKeyArray(dims)
+    val encodings = partitionColumn.getEncodingList
+    val partitionType = partitionInfo.getPartitionType
+    var partitionValue: AnyRef = null
+    val factor = 1000L
+    if (isDimension) {
+      if (encodings.contains(Encoding.DIRECT_DICTIONARY)) {
+        // direct dictionary
+        val directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+          .getDirectDictionaryGenerator(partitionDataType)
+        val dictionaryIndex = dictionaryIndexGroup.indexOf(partColIdx)
+        val surrogateValue = (keyArray(dictionaryIndex) / factor).toInt
+        partitionValue = directDictionaryGenerator.getValueFromSurrogate(surrogateValue)
+      } else if (!encodings.contains(Encoding.DICTIONARY)) {
+        // no dictionary
+        val byteArray = row(0).asInstanceOf[ByteArrayWrapper].getNoDictionaryKeys
+        val index = noDictionaryIndexGroup.indexOf(partColIdx)
+        partitionValue = DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(
+          byteArray(index), partitionDataType)
+        if (partitionValue.isInstanceOf[UTF8String]) {
+          partitionValue = partitionValue.toString
+        }
+      } else {
+        // normal dictionary
+        val dict = CarbonLoaderUtil.getDictionary(carbonTableIdentifier,
+          dimension.getColumnIdentifier, storePath, partitionDataType)
+        if (partitionDataType == DataType.STRING && partitionType == PartitionType.RANGE) {
+          partitionValue =
+            ByteUtil.toBytes(dict.getDictionaryValueForKey(keyArray(partColIdx).toInt))
+        } else {
+          partitionValue = dict.getDictionaryValueForKey(keyArray(partColIdx).toInt)
+        }
+      }
+    } else {
+      partitionValue = row(measureIndexGroup(partColIdx))
+    }
+    partitionValue
+  }
+
+  def indexInitialise(): Unit = {
+    for (dim: CarbonDimension <- dimensions) {
+      if (!dim.getEncoder.contains(Encoding.DICTIONARY)) {
+        noDictionaryIndexGroup.append(dimensions.indexOf(dim))
+      } else {
+        dictionaryIndexGroup.append(dimensions.indexOf(dim))
+      }
+    }
+    for (msr: CarbonMeasure <- measures) {
+      // index of the measure in the row (offset by 1 for the dimension wrapper at index 0)
+      measureIndexGroup.append(measures.indexOf(msr) + 1)
+    }
+  }
+
+  /**
+   * get the index of the partition column among the dimensions/measures
+   * @param partitionColumnName
+   * @param segmentProperties
+   * @return
+   */
+  def getPartitionColumnIndex(partitionColumnName: String,
+      segmentProperties: SegmentProperties): Int = {
+    val dimensions = segmentProperties.getDimensions
+    val measures = segmentProperties.getMeasures
+    val columns = dimensions.asScala.map(_.getColName) ++ measures.asScala.map(_.getColName)
+    var index = 0
+    for (i <- 0 until columns.size) {
+      if (columns(i) == partitionColumnName) {
+        index = i
+      }
+    }
+    if (index < dimensions.size()) {
+      isDimension = true
+      dimension = dimensions.get(index)
+    } else {
+      index = index - dimensions.size()
+      measure = measures.get(index)
+    }
+    index
+  }
+}
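[Editor's note] For orientation, the (partitionColumnValue, rawRow) pairs this RDD emits are meant to be fed straight into partitionBy. The following self-contained sketch shows that routing mechanism in plain Spark; ListPartitioner and the toy data are illustrative stand-ins, not CarbonData API:

    import org.apache.spark.{Partitioner, SparkConf, SparkContext}

    object ScanAndRepartitionDemo extends App {
      // Toy stand-in for PartitionFactory.getPartitioner: route each row by the
      // partition-column value it was keyed with, the same way CarbonScanPartitionRDD's
      // (partitionColumnValue, rawRow) output is consumed downstream.
      class ListPartitioner(groups: Seq[Set[String]]) extends Partitioner {
        override def numPartitions: Int = groups.size
        override def getPartition(key: Any): Int = {
          val idx = groups.indexWhere(_.contains(key.toString))
          if (idx >= 0) idx else 0 // unmatched values fall back to a default partition
        }
      }

      val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local[2]"))
      val rows = sc.parallelize(Seq(("US", "row1"), ("UK", "row2"), ("CN", "row3")))
      // After splitting an old (US, UK) partition, US and UK each get their own partition.
      val repartitioned =
        rows.partitionBy(new ListPartitioner(Seq(Set("US"), Set("UK"), Set("CN"))))
      repartitioned.glom().collect().zipWithIndex.foreach { case (p, i) =>
        println(s"partition $i -> ${ p.mkString(", ") }")
      }
      sc.stop()
    }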
http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
index a6a8835..bca119e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.execution.command.{CompactionCallableModel, CompactionModel}
+import org.apache.spark.sql.execution.command.{CompactionCallableModel, CompactionModel, SplitPartitionCallableModel}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -39,6 +39,7 @@ import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadM
 import org.apache.carbondata.spark._
 import org.apache.carbondata.spark.compaction.CompactionCallable
 import org.apache.carbondata.spark.load._
+import org.apache.carbondata.spark.partition.SplitPartitionCallable
 import org.apache.carbondata.spark.util.{CommonUtil, LoadMetadataUtil}
 
 /**
@@ -260,6 +261,50 @@
     futureList.add(future)
   }
 
+  def executePartitionSplit(sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      executor: ExecutorService,
+      storePath: String,
+      segment: String,
+      partitionId: String,
+      oldPartitionIdList: List[Int]): Unit = {
+    val futureList: util.List[Future[Void]] = new util.ArrayList[Future[Void]](
+      CarbonCommonConstants.DEFAULT_COLLECTION_SIZE
+    )
+    scanSegmentsForSplitPartition(futureList, executor, storePath, segment, partitionId,
+      sqlContext, carbonLoadModel, oldPartitionIdList)
+    try {
+      futureList.asScala.foreach { future =>
+        future.get
+      }
+    } catch {
+      case e: Exception =>
+        LOGGER.error(e, s"Exception in partition split thread ${ e.getMessage }")
+        throw e
+    }
+  }
+
+  private def scanSegmentsForSplitPartition(futureList: util.List[Future[Void]],
+      executor: ExecutorService,
+      storePath: String,
+      segmentId: String,
+      partitionId: String,
+      sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      oldPartitionIdList: List[Int]): Unit = {
+
+    val splitModel = SplitPartitionCallableModel(storePath,
+      carbonLoadModel,
+      segmentId,
+      partitionId,
+      oldPartitionIdList,
+      sqlContext)
+
+    val future: Future[Void] = executor.submit(new SplitPartitionCallable(splitModel))
+    futureList.add(future)
+  }
+
   def prepareCarbonLoadModel(storePath: String,
       table: CarbonTable,
       newCarbonLoadModel: CarbonLoadModel): Unit = {
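[Editor's note] executePartitionSplit is a thin fan-out/fan-in wrapper: submit one Callable per split task, then block on the Futures so any task failure surfaces as an exception on the driver. The same pattern reduced to plain JDK concurrency (a runnable sketch; the names are illustrative, not CarbonData API):

    import java.util.concurrent.{Callable, Executors, Future}
    import scala.collection.JavaConverters._

    object FutureFanOutDemo extends App {
      val executor = Executors.newFixedThreadPool(2)
      try {
        // Submit one Callable per unit of work and collect the Futures...
        val futureList = new java.util.ArrayList[Future[String]]()
        for (segment <- Seq("0", "1", "2")) {
          futureList.add(executor.submit(new Callable[String] {
            override def call(): String = s"segment $segment split done"
          }))
        }
        // ...then block on each one; a failure in any task surfaces here as an
        // exception, which is how executePartitionSplit propagates split errors.
        futureList.asScala.foreach(f => println(f.get))
      } finally {
        executor.shutdown()
      }
    }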
http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index b51800e..37b2d02 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -267,7 +267,7 @@ class NewCarbonDataLoadRDD[K, V](
         throw e
       } finally {
         // clean up the folders and files created locally for data load operation
-        CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false)
+        CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false, false)
         // in case of failure the same operation will be re-tried several times.
         // So print the data load statistics only in case of non failure case
         if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
@@ -453,7 +453,7 @@ class NewDataFrameLoaderRDD[K, V](
         throw e
       } finally {
         // clean up the folders and files created locally for data load operation
-        CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false)
+        CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false, false)
         // in case of failure the same operation will be re-tried several times.
         // So print the data load statistics only in case of non failure case
         if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
@@ -638,7 +638,7 @@ class PartitionTableDataLoaderRDD[K, V](
         throw e
       } finally {
         // clean up the folders and files created locally for data load operation
-        CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false)
+        CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false, false)
         // in case of failure the same operation will be re-tried several times.
         // So print the data load statistics only in case of non failure case
         if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
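[Editor's note] The three hunks above only thread a new third boolean through an existing cleanup call; the diff does not show the parameter's name. The sketch below is therefore an assumption about the widened signature's shape, not the actual CarbonLoaderUtil API:

    // Assumed shape of the widened utility; both parameter names are guesses.
    object CarbonLoaderUtilSketch {
      def deleteLocalDataLoadFolderLocation(model: AnyRef,
          isCompactionFlow: Boolean,
          isAltPartitionFlow: Boolean): Unit = {
        // Regular loads pass (false, false); the compaction and alter-partition flows
        // would presumably set their respective flag so the local temp folders are
        // resolved under the right task folder before deletion.
      }
    }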
http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
new file mode 100644
index 0000000..48e1bee
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.io.IOException
+
+import org.apache.spark.sql.execution.command.SplitPartitionCallableModel
+import org.apache.spark.util.PartitionUtils
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.spark.{PartitionFactory, SplitResultImpl}
+
+object PartitionSplitter {
+
+  val logger = LogServiceFactory.getLogService(PartitionSplitter.getClass.getName)
+
+  def triggerPartitionSplit(splitPartitionCallableModel: SplitPartitionCallableModel): Unit = {
+    val sc = splitPartitionCallableModel.sqlContext.sparkContext
+    val partitionId = splitPartitionCallableModel.partitionId
+    val storePath = splitPartitionCallableModel.storePath
+    val segmentId = splitPartitionCallableModel.segmentId
+    val oldPartitionIdList = splitPartitionCallableModel.oldPartitionIdList
+    val carbonLoadModel = splitPartitionCallableModel.carbonLoadModel
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    val identifier = carbonTable.getAbsoluteTableIdentifier
+    val carbonTableIdentifier = identifier.getCarbonTableIdentifier
+    val tableName = carbonTable.getFactTableName
+    val databaseName = carbonTable.getDatabaseName
+    val bucketInfo = carbonTable.getBucketingInfo(tableName)
+    var finalSplitStatus = false
+    val bucketNumber = bucketInfo match {
+      case null => 1
+      case _ => bucketInfo.getNumberOfBuckets
+    }
+    val partitionInfo = carbonTable.getPartitionInfo(tableName)
+    val partitioner = PartitionFactory.getPartitioner(partitionInfo)
+
+    for (bucketId <- 0 until bucketNumber) {
+      val rdd = new CarbonScanPartitionRDD(
+        sc,
+        Seq(partitionId),
+        storePath,
+        segmentId,
+        bucketId,
+        oldPartitionIdList,
+        carbonTableIdentifier,
+        carbonLoadModel
+      ).partitionBy(partitioner).map(_._2)
+
+      val splitStatus = new AlterTableSplitPartitionRDD(sc,
+        new SplitResultImpl(),
+        Seq(partitionId),
+        segmentId,
+        bucketId,
+        carbonLoadModel,
+        identifier,
+        storePath,
+        oldPartitionIdList,
+        rdd).collect()
+
+      finalSplitStatus = splitStatus.nonEmpty && splitStatus.forall(_._2)
+      if (!finalSplitStatus) {
+        logger.audit(s"Add/Split Partition request failed for table " +
+                     s"${ databaseName }.${ tableName }")
+        logger.error(s"Add/Split Partition request failed for table " +
+                     s"${ databaseName }.${ tableName }")
+      }
+    }
+    if (finalSplitStatus) {
+      try {
+        PartitionUtils.deleteOriginalCarbonFile(identifier, segmentId,
+          Seq(partitionId).toList, oldPartitionIdList, storePath, databaseName, tableName,
+          partitionInfo, carbonLoadModel)
+      } catch {
+        case e: IOException =>
+          sys.error(s"Exception while deleting original carbon files " + e.getMessage)
+      }
+      logger.audit(s"Add/Split Partition request completed for table " +
+                   s"${ databaseName }.${ tableName }")
+      logger.info(s"Add/Split Partition request completed for table " +
+                  s"${ databaseName }.${ tableName }")
+    }
+  }
+}
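[Editor's note] End to end, PartitionSplitter is driven from SQL. A hypothetical spark-shell session (the DDL follows the add/split syntax this commit wires into CarbonSpark2SqlParser, but the table name, columns, and partition values here are made up):

    // Hypothetical usage sketch, not taken from the commit or its tests.
    spark.sql(
      "CREATE TABLE sales (vin STRING, amount INT) " +
      "PARTITIONED BY (country STRING) " +
      "STORED BY 'carbondata' " +
      "TBLPROPERTIES ('PARTITION_TYPE'='LIST', 'LIST_INFO'='(US, UK, CN), AU')")
    // add one new list partition at the end
    spark.sql("ALTER TABLE sales ADD PARTITION ('IN')")
    // split list partition 1, i.e. (US, UK, CN), into (US, UK) and CN
    spark.sql("ALTER TABLE sales SPLIT PARTITION(1) INTO ('(US, UK)', 'CN')")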
http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 705daea..4d781a1 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -292,11 +292,13 @@
         value.matches(pattern)
       case "timestamp" =>
         val timeStampFormat = new SimpleDateFormat(CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT))
+          .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+            CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
         scala.util.Try(timeStampFormat.parse(value)).isSuccess
       case "date" =>
         val dateFormat = new SimpleDateFormat(CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT))
+          .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+            CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
         scala.util.Try(dateFormat.parse(value)).isSuccess
       case _ =>
         validateTypeConvertForSpark2(partitionerField, value)
@@ -333,6 +335,40 @@
     }
   }
 
+  def validateSplitListInfo(originListInfo: List[String], newListInfo: List[String],
+      originList: List[List[String]]): Unit = {
+    if (originListInfo.size == 1) {
+      sys.error("The target list partition cannot be split, please check again!")
+    }
+    if (newListInfo.size == 1) {
+      sys.error("Cannot split a list into a single partition, please check again!")
+    }
+    if (!(newListInfo.size < originListInfo.size)) {
+      sys.error("The size of the new list must be smaller than the original list, " +
+        "please check again!")
+    }
+    val tempList = newListInfo.mkString(",").split(",")
+      .map(_.trim.replace("(", "").replace(")", ""))
+    if (tempList.length != originListInfo.size) {
+      sys.error("The total number of elements in the new list must equal the original list!")
+    }
+    if (!originListInfo.sameElements(tempList)) {
+      sys.error("The elements in the new list must exist in the original list")
+    }
+  }
+
+  def validateAddListInfo(newListInfo: List[String], originList: List[List[String]]): Unit = {
+    if (newListInfo.size < 1) {
+      sys.error("Please add at least one new partition")
+    }
+    for (originElementGroup <- originList) {
+      for (newElement <- newListInfo) {
+        if (originElementGroup.contains(newElement)) {
+          sys.error(s"The partition $newElement already exists! Please check again!")
+        }
+      }
+    }
+  }
+
   def validateFields(key: String, fields: Seq[Field]): Boolean = {
     var isValid: Boolean = false
     fields.foreach { field =>
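[Editor's note] To make the split checks concrete, here is a standalone sketch that re-implements the same rules locally (it does not call into CommonUtil) and shows one accepted and one rejected regrouping:

    object SplitValidationDemo extends App {
      // Mirror of the checks above, stripped down for illustration only.
      def validateSplit(origin: List[String], proposed: List[String]): Either[String, Unit] = {
        val flattened = proposed.mkString(",").split(",")
          .map(_.trim.replace("(", "").replace(")", "")).toList
        if (origin.size == 1) Left("The target list partition cannot be split")
        else if (proposed.size == 1) Left("Cannot split a list into a single partition")
        else if (proposed.size >= origin.size) Left("New list must be smaller than original")
        else if (flattened.size != origin.size) Left("Element counts must match")
        else if (origin != flattened) Left("Elements must match the original list")
        else Right(())
      }

      // SPLIT PARTITION over the list (US, UK, CN): regroup into (US, UK) and CN.
      println(validateSplit(List("US", "UK", "CN"), List("(US, UK)", "CN"))) // Right(())
      // Dropping CN is rejected: the element counts no longer match.
      println(validateSplit(List("US", "UK", "CN"), List("(US, UK)")))       // Left(...)
    }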
http://git-wip-us.apache.org/repos/asf/carbondata/blob/874764f9/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 500e18e..70a2498 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -126,6 +126,13 @@ case class CompactionCallableModel(storePath: String,
     sqlContext: SQLContext,
     compactionType: CompactionType)
 
+case class SplitPartitionCallableModel(storePath: String,
+    carbonLoadModel: CarbonLoadModel,
+    segmentId: String,
+    partitionId: String,
+    oldPartitionIdList: List[Int],
+    sqlContext: SQLContext)
+
 case class DataTypeInfo(dataType: String, precision: Int = 0, scale: Int = 0)
 
 case class AlterTableDataTypeChangeModel(dataTypeInfo: DataTypeInfo,
@@ -151,6 +158,15 @@ case class AlterTableDropColumnModel(databaseName: Option[String],
     tableName: String,
     columns: List[String])
 
+case class AlterTableDropPartitionModel(databaseName: Option[String],
+    tableName: String,
+    partitionId: String)
+
+case class AlterTableSplitPartitionModel(databaseName: Option[String],
+    tableName: String,
+    partitionId: String,
+    splitInfo: List[String])
+
 class AlterTableColumnSchemaGenerator(
     alterTableModel: AlterTableAddColumnsModel,
     dbName: String,