Repository: carbondata Updated Branches: refs/heads/master a5645875b -> 94ea913a0
[CARBONDATA-2308] Support concurrent loading and compaction When data loading (or insert into) is in progress, user should be able to do compaction on same table This PR supports it. This closes #2132 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/94ea913a Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/94ea913a Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/94ea913a Branch: refs/heads/master Commit: 94ea913a0c626c955f63db5033539d6228a77f8d Parents: a564587 Author: Jacky Li <jacky.li...@qq.com> Authored: Mon Apr 2 17:21:37 2018 +0800 Committer: Jacky Li <jacky.li...@qq.com> Committed: Sun Apr 8 22:31:49 2018 +0800 ---------------------------------------------------------------------- .../TestInsertAndOtherCommandConcurrent.scala | 31 +++++++++++++++----- .../CarbonAlterTableCompactionCommand.scala | 4 +-- 2 files changed, 25 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/94ea913a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala index 65857b1..86f0f10 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala @@ -123,7 +123,7 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA } assert(future.get.contains("PASS")) assert(ex.getMessage.contains( - "loading is in progress for table default.orders, compaction operation is not allowed")) + "insert overwrite is in progress for table default.orders, compaction operation is not allowed")) } test("update should fail if insert overwrite is in progress") { @@ -198,14 +198,29 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA // ----------- INSERT -------------- - test("compaction should fail if insert is in progress") { - val future = runSqlAsync("insert into table orders select * from orders_overwrite") - val ex = intercept[ConcurrentOperationException]{ - sql("alter table orders compact 'MINOR'") - } + test("compaction should allow if insert is in progress") { + sql("drop table if exists t1") + + // number of segment is 1 after createTable + createTable("t1") + // number of segment is 2 after insert + sql("insert into table t1 select * from orders_overwrite") + + sql( + s""" + | create datamap dm_t1 on table t1 + | using '${classOf[WaitingDataMap].getName}' + | as select count(a) from hiveMetaStoreTable_1") + """.stripMargin) + val future = runSqlAsync("insert into table t1 select * from orders_overwrite") + sql("alter table t1 compact 'MAJOR'") assert(future.get.contains("PASS")) - assert(ex.getMessage.contains( - "loading is in progress for table default.orders, compaction operation is not allowed")) + + // all segments are compacted + val segments = sql("show segments for table t1").collect() + assert(segments.length == 5) + + sql("drop table t1") } test("update should fail if insert is in progress") { http://git-wip-us.apache.org/repos/asf/carbondata/blob/94ea913a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala index dc96399..a7b5f7e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala @@ -98,8 +98,8 @@ case class CarbonAlterTableCompactionCommand( } override def processData(sparkSession: SparkSession): Seq[Row] = { - if (SegmentStatusManager.isLoadInProgressInTable(table)) { - throw new ConcurrentOperationException(table, "loading", "compaction") + if (SegmentStatusManager.isOverwriteInProgressInTable(table)) { + throw new ConcurrentOperationException(table, "insert overwrite", "compaction") } operationContext.setProperty("compactionException", "true") var compactionType: CompactionType = null