Repository: carbondata
Updated Branches:
  refs/heads/datamap 6d71d9c47 -> b385d14b4 (forced update)


fix compact bug for partition table
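
For a partitioned table, the compaction task number has to line up with the
Carbon partition id rather than the Spark partition index; this is why the
merger below switches from theSplit.index to carbonSparkPartition.partitionId.
A condensed sketch of the new selection logic (a paraphrase of the
CarbonMergerRDD.scala hunk below, not the literal patch):

    // Sketch: how the merge task number is chosen after this change.
    def selectTaskNo(isPartitionTable: Boolean,
        carbonPartitionId: Int,
        sparkPartitionIndex: Int): String = {
      if (isPartitionTable) {
        // partitioned table: reuse the Carbon partition id carried by the split
        String.valueOf(carbonPartitionId)
      } else {
        // non-partitioned table: the Spark partition index is sufficient
        String.valueOf(sparkPartitionIndex)
      }
    }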


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/aecf496e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/aecf496e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/aecf496e

Branch: refs/heads/datamap
Commit: aecf496eda9c1b3ae854a16d4dcbfb1c7e3701e8
Parents: c8f742d
Author: QiangCai <david.c...@gmail.com>
Authored: Wed Jun 7 11:51:08 2017 +0800
Committer: Venkata Ramana G <ramana.gollam...@huawei.com>
Committed: Thu Jun 29 11:56:31 2017 +0530

----------------------------------------------------------------------
 .../core/metadata/schema/table/CarbonTable.java |  4 +
 .../TestCompactionForPartitionTable.scala       | 84 ++++++++++++++++++++
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  | 27 +++++--
 .../spark/rdd/CarbonSparkPartition.scala        |  3 +-
 4 files changed, 109 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/aecf496e/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 429b1df..ae97262 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -593,6 +593,10 @@ public class CarbonTable implements Serializable {
     return tablePartitionMap.get(tableName);
   }
 
+  public boolean isPartitionTable() {
+    return null != tablePartitionMap.get(getFactTableName());
+  }
+
   public PartitionStatistic getPartitionStatistic() {
     return partitionStatistic;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aecf496e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestCompactionForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestCompactionForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestCompactionForPartitionTable.scala
new file mode 100644
index 0000000..ae8387e
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestCompactionForPartitionTable.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.partition
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.sql.test.TestQueryExecutor
+import org.scalatest.BeforeAndAfterAll
+
+class TestCompactionForPartitionTable extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    dropTable
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+    sql(
+      """
+        | CREATE TABLE originTable (empno int, empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
originTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+  }
+
+  test("minor compaction") {
+    sql("create table part_minor_compact(a String, b int) partitioned by (c 
int) stored by 'carbondata' 
tblproperties('PARTITION_TYPE'='LIST','LIST_INFO'='1,2')")
+    sql("insert into part_minor_compact select 'a', 2, 3 from originTable 
limit 1")
+    sql("insert into part_minor_compact select 'b', 3, 4 from originTable 
limit 1")
+    sql("insert into part_minor_compact select 'c', 4, 5 from originTable 
limit 1")
+    sql("insert into part_minor_compact select 'd', 1, 2 from originTable 
limit 1")
+
+    checkAnswer(sql("select * from part_minor_compact where c = 4"), 
Seq(Row("b", 3, 4)))
+
+    sql("alter table part_minor_compact compact 'minor'")
+
+    checkAnswer(sql("select * from part_minor_compact where c = 4"), 
Seq(Row("b", 3, 4)))
+  }
+
+  test("major compaction") {
+    sql("create table part_major_compact(a String, b int) partitioned by (c 
int) stored by 'carbondata' 
tblproperties('PARTITION_TYPE'='LIST','LIST_INFO'='1,2')")
+    sql("insert into part_major_compact select 'a', 2, 3 from originTable 
limit 1")
+    sql("insert into part_major_compact select 'b', 3, 4 from originTable 
limit 1")
+    sql("insert into part_major_compact select 'c', 4, 5 from originTable 
limit 1")
+    sql("insert into part_major_compact select 'd', 1, 2 from originTable 
limit 1")
+
+    checkAnswer(sql("select * from part_major_compact where c = 4"), 
Seq(Row("b", 3, 4)))
+
+    sql("alter table part_major_compact compact 'major'")
+
+    checkAnswer(sql("select * from part_major_compact where c = 4"), 
Seq(Row("b", 3, 4)))
+  }
+
+  override def afterAll = {
+    dropTable
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, TestQueryExecutor.timestampFormat)
+  }
+
+  def dropTable = {
+    sql("drop table if exists part_minor_compact")
+    sql("drop table if exists part_major_compact")
+    sql("drop table if exists originTable")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aecf496e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 908043a..815dba3 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -77,8 +77,13 @@ class CarbonMergerRDD[K, V](
   override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val iter = new Iterator[(K, V)] {
-
-      carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+      val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+      val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition]
+      if (carbonTable.isPartitionTable) {
+        carbonLoadModel.setTaskNo(String.valueOf(carbonSparkPartition.partitionId))
+      } else {
+        carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+      }
       val tempLocationKey = CarbonDataProcessorUtil
         .getTempStoreLocationKey(carbonLoadModel.getDatabaseName,
           carbonLoadModel.getTableName,
@@ -108,7 +113,7 @@ class CarbonMergerRDD[K, V](
       var mergeNumber = ""
       var exec: CarbonCompactionExecutor = null
       try {
-        val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition]
+
 
         // sorting the table block info List.
         val splitList = carbonSparkPartition.split.value.getAllSplits
@@ -140,7 +145,6 @@ class CarbonMergerRDD[K, V](
             .toList
         }
 
-        val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
         // get destination segment properties as sent from driver which is of last segment.
         val segmentProperties = new SegmentProperties(
           carbonMergerMapping.maxSegmentColumnSchemaList.asJava,
@@ -266,7 +270,8 @@ class CarbonMergerRDD[K, V](
     job.getConfiguration.set("query.id", queryId)
     var defaultParallelism = sparkContext.defaultParallelism
     val result = new java.util.ArrayList[Partition](defaultParallelism)
-    var partitionNo = 0
+    var taskPartitionNo = 0
+    var carbonPartitionId = 0;
     var columnSize = 0
     var noOfBlocks = 0
 
@@ -398,6 +403,7 @@ class CarbonMergerRDD[K, V](
     logInfo("Time taken to wait for executor allocation is =" + ((30 - 
maxTimes) * 500) + "millis")
     defaultParallelism = sparkContext.defaultParallelism
 
+    val isPartitionTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isPartitionTable
     // Create Spark Partition for each task and assign blocks
     nodeBlockMap.asScala.foreach { case (nodeName, splitList) =>
       val taskSplitList = new java.util.ArrayList[NodeInfo](0)
@@ -410,11 +416,16 @@ class CarbonMergerRDD[K, V](
           NodeInfo(splitsPerNode.getTaskId, splitsPerNode.getCarbonInputSplitList.size()))
 
         if (blockletCount != 0) {
+          val taskInfo = splitInfo.asInstanceOf[CarbonInputSplitTaskInfo]
           val multiBlockSplit = new CarbonMultiBlockSplit(absoluteTableIdentifier,
-            splitInfo.asInstanceOf[CarbonInputSplitTaskInfo].getCarbonInputSplitList,
+            taskInfo.getCarbonInputSplitList,
             Array(nodeName))
-          result.add(new CarbonSparkPartition(id, partitionNo, multiBlockSplit))
-          partitionNo += 1
+          if (isPartitionTable) {
+            carbonPartitionId = Integer.parseInt(taskInfo.getTaskId)
+          }
+          result.add(
+            new CarbonSparkPartition(id, taskPartitionNo, multiBlockSplit, carbonPartitionId))
+          taskPartitionNo += 1
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aecf496e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala
index 82a471f..cf539ba 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala
@@ -24,7 +24,8 @@ import org.apache.carbondata.hadoop.CarbonMultiBlockSplit
 class CarbonSparkPartition(
     val rddId: Int,
     val idx: Int,
-    @transient val multiBlockSplit: CarbonMultiBlockSplit)
+    @transient val multiBlockSplit: CarbonMultiBlockSplit,
+    val partitionId: Int = 0)
     extends Partition {
 
   val split = new SerializableWritable[CarbonMultiBlockSplit](multiBlockSplit)
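
Note on the CarbonSparkPartition change: partitionId is a defaulted parameter,
so existing call sites compile unchanged. The merger now always passes
carbonPartitionId, which stays 0 unless the table is partitioned. Condensed
from the CarbonMergerRDD.scala hunk above:

    // parse the Carbon partition id out of the split's task id only for
    // partition tables; otherwise it keeps its initial value of 0, so the
    // non-partitioned path behaves exactly as before
    if (isPartitionTable) {
      carbonPartitionId = Integer.parseInt(taskInfo.getTaskId)
    }
    result.add(new CarbonSparkPartition(id, taskPartitionNo, multiBlockSplit, carbonPartitionId))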
