Repository: carbondata
Updated Branches:
  refs/heads/datamap 6d71d9c47 -> b385d14b4 (forced update)
fix compact bug for partition table

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/aecf496e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/aecf496e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/aecf496e

Branch: refs/heads/datamap
Commit: aecf496eda9c1b3ae854a16d4dcbfb1c7e3701e8
Parents: c8f742d
Author: QiangCai <david.c...@gmail.com>
Authored: Wed Jun 7 11:51:08 2017 +0800
Committer: Venkata Ramana G <ramana.gollam...@huawei.com>
Committed: Thu Jun 29 11:56:31 2017 +0530

----------------------------------------------------------------------
 .../core/metadata/schema/table/CarbonTable.java |  4 +
 .../TestCompactionForPartitionTable.scala       | 84 ++++++++++++++++++++
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  | 27 +++++--
 .../spark/rdd/CarbonSparkPartition.scala        |  3 +-
 4 files changed, 109 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/aecf496e/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 429b1df..ae97262 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -593,6 +593,10 @@ public class CarbonTable implements Serializable {
     return tablePartitionMap.get(tableName);
   }
 
+  public boolean isPartitionTable() {
+    return null != tablePartitionMap.get(getFactTableName());
+  }
+
   public PartitionStatistic getPartitionStatistic() {
     return partitionStatistic;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aecf496e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestCompactionForPartitionTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestCompactionForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestCompactionForPartitionTable.scala new file mode 100644 index 0000000..ae8387e --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestCompactionForPartitionTable.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.spark.testsuite.partition + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.spark.sql.Row +import org.apache.spark.sql.common.util.QueryTest +import org.apache.spark.sql.test.TestQueryExecutor +import org.scalatest.BeforeAndAfterAll + +class TestCompactionForPartitionTable extends QueryTest with BeforeAndAfterAll { + + override def beforeAll { + dropTable + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") + sql( + """ + | CREATE TABLE originTable (empno int, empname String, designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + } + + test("minor compaction") { + sql("create table part_minor_compact(a String, b int) partitioned by (c int) stored by 'carbondata' tblproperties('PARTITION_TYPE'='LIST','LIST_INFO'='1,2')") + sql("insert into part_minor_compact select 'a', 2, 3 from originTable limit 1") + sql("insert into part_minor_compact select 'b', 3, 4 from originTable limit 1") + sql("insert into part_minor_compact select 'c', 4, 5 from originTable limit 1") + sql("insert into part_minor_compact select 'd', 1, 2 from originTable limit 1") + + checkAnswer(sql("select * from part_minor_compact where c = 4"), Seq(Row("b", 3, 4))) + + sql("alter table part_minor_compact compact 'minor'") + + checkAnswer(sql("select * from part_minor_compact where c = 4"), Seq(Row("b", 3, 4))) + } + + test("major compaction") { + sql("create table part_major_compact(a String, b int) partitioned by (c int) stored by 
'carbondata' tblproperties('PARTITION_TYPE'='LIST','LIST_INFO'='1,2')") + sql("insert into part_major_compact select 'a', 2, 3 from originTable limit 1") + sql("insert into part_major_compact select 'b', 3, 4 from originTable limit 1") + sql("insert into part_major_compact select 'c', 4, 5 from originTable limit 1") + sql("insert into part_major_compact select 'd', 1, 2 from originTable limit 1") + + checkAnswer(sql("select * from part_major_compact where c = 4"), Seq(Row("b", 3, 4))) + + sql("alter table part_major_compact compact 'major'") + + checkAnswer(sql("select * from part_major_compact where c = 4"), Seq(Row("b", 3, 4))) + } + + override def afterAll = { + dropTable + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, TestQueryExecutor.timestampFormat) + } + + def dropTable = { + sql("drop table if exists part_minor_compact") + sql("drop table if exists part_major_compact") + sql("drop table if exists originTable") + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/aecf496e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index 908043a..815dba3 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -77,8 +77,13 @@ class CarbonMergerRDD[K, V]( override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) val iter = new Iterator[(K, V)] { - - carbonLoadModel.setTaskNo(String.valueOf(theSplit.index)) + val carbonTable = 
carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition] + if (carbonTable.isPartitionTable) { + carbonLoadModel.setTaskNo(String.valueOf(carbonSparkPartition.partitionId)) + } else { + carbonLoadModel.setTaskNo(String.valueOf(theSplit.index)) + } val tempLocationKey = CarbonDataProcessorUtil .getTempStoreLocationKey(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName, @@ -108,7 +113,7 @@ class CarbonMergerRDD[K, V]( var mergeNumber = "" var exec: CarbonCompactionExecutor = null try { - val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition] + // sorting the table block info List. val splitList = carbonSparkPartition.split.value.getAllSplits @@ -140,7 +145,6 @@ class CarbonMergerRDD[K, V]( .toList } - val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable // get destination segment properties as sent from driver which is of last segment. val segmentProperties = new SegmentProperties( carbonMergerMapping.maxSegmentColumnSchemaList.asJava, @@ -266,7 +270,8 @@ class CarbonMergerRDD[K, V]( job.getConfiguration.set("query.id", queryId) var defaultParallelism = sparkContext.defaultParallelism val result = new java.util.ArrayList[Partition](defaultParallelism) - var partitionNo = 0 + var taskPartitionNo = 0 + var carbonPartitionId = 0; var columnSize = 0 var noOfBlocks = 0 @@ -398,6 +403,7 @@ class CarbonMergerRDD[K, V]( logInfo("Time taken to wait for executor allocation is =" + ((30 - maxTimes) * 500) + "millis") defaultParallelism = sparkContext.defaultParallelism + val isPartitionTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isPartitionTable // Create Spark Partition for each task and assign blocks nodeBlockMap.asScala.foreach { case (nodeName, splitList) => val taskSplitList = new java.util.ArrayList[NodeInfo](0) @@ -410,11 +416,16 @@ class CarbonMergerRDD[K, V]( NodeInfo(splitsPerNode.getTaskId, 
splitsPerNode.getCarbonInputSplitList.size())) if (blockletCount != 0) { + val taskInfo = splitInfo.asInstanceOf[CarbonInputSplitTaskInfo] val multiBlockSplit = new CarbonMultiBlockSplit(absoluteTableIdentifier, - splitInfo.asInstanceOf[CarbonInputSplitTaskInfo].getCarbonInputSplitList, + taskInfo.getCarbonInputSplitList, Array(nodeName)) - result.add(new CarbonSparkPartition(id, partitionNo, multiBlockSplit)) - partitionNo += 1 + if (isPartitionTable) { + carbonPartitionId = Integer.parseInt(taskInfo.getTaskId) + } + result.add( + new CarbonSparkPartition(id, taskPartitionNo, multiBlockSplit, carbonPartitionId)) + taskPartitionNo += 1 } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/aecf496e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala index 82a471f..cf539ba 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala @@ -24,7 +24,8 @@ import org.apache.carbondata.hadoop.CarbonMultiBlockSplit class CarbonSparkPartition( val rddId: Int, val idx: Int, - @transient val multiBlockSplit: CarbonMultiBlockSplit) + @transient val multiBlockSplit: CarbonMultiBlockSplit, + val partitionId: Int = 0) extends Partition { val split = new SerializableWritable[CarbonMultiBlockSplit](multiBlockSplit)