This is an automated email from the ASF dual-hosted git repository. ajantha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 8aa9522 [CARBONDATA-3891] Fix loading data will update all segments updateDeltaEndTimestamp 8aa9522 is described below commit 8aa9522c087e0e9bf2cb7c1532742094a4b3bd9a Author: IceMimosa <chk19940...@gmail.com> AuthorDate: Mon Jul 20 23:08:40 2020 +0800 [CARBONDATA-3891] Fix loading data will update all segments updateDeltaEndTimestamp Why is this PR needed? Loading data into a partitioned table updates the updateDeltaEndTimestamp of all segments, which causes the driver to clear the cache of all segments when running a query. What changes were proposed in this PR? Update only the current insert overwrite segment. Does this PR introduce any user interface change? No Is any new testcase added? Yes This closes #3848 --- .../hadoop/api/CarbonOutputCommitter.java | 21 +++++--------- .../allqueries/InsertIntoCarbonTableTestCase.scala | 32 ++++++++++++++++++++++ 2 files changed, 39 insertions(+), 14 deletions(-) diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java index 4fd754b..a816894 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java @@ -19,7 +19,7 @@ package org.apache.carbondata.hadoop.api; import java.io.IOException; import java.util.ArrayList; -import java.util.HashSet; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -224,23 +224,16 @@ public class CarbonOutputCommitter extends FileOutputCommitter { } } String updateTime = - context.getConfiguration().get(CarbonTableOutputFormat.UPDATE_TIMESTAMP, null); + context.getConfiguration().get(CarbonTableOutputFormat.UPDATE_TIMESTAMP, uniqueId); String segmentsToBeDeleted = context.getConfiguration().get(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, ""); - 
List<Segment> segmentDeleteList = Segment.toSegmentList(segmentsToBeDeleted.split(","), null); - Set<Segment> segmentSet = new HashSet<>(); - if (updateTime != null || uniqueId != null) { - segmentSet = new HashSet<>( - new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(), - context.getConfiguration()).getValidAndInvalidSegments(carbonTable.isMV()) - .getValidSegments()); + List<Segment> segmentDeleteList = Collections.emptyList(); + if (!segmentsToBeDeleted.trim().isEmpty()) { + segmentDeleteList = Segment.toSegmentList(segmentsToBeDeleted.split(","), null); } if (updateTime != null) { - CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, updateTime, true, - segmentDeleteList); - } else if (uniqueId != null) { - CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, uniqueId, true, - segmentDeleteList); + CarbonUpdateUtil.updateTableMetadataStatus(Collections.singleton(loadModel.getSegment()), + carbonTable, updateTime, true, segmentDeleteList); } } diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala index 9588d27..0ed4923 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala @@ -16,6 +16,7 @@ */ package org.apache.carbondata.spark.testsuite.allqueries +import org.apache.spark.sql.CarbonEnv import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll @@ -402,6 +403,37 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { sql(s"DROP TABLE IF EXISTS table1") } + test("test loading data into partitioned table with segment's updateDeltaEndTimestamp not change") { + val tableName = 
"test_partitioned_table" + sql(s"drop table if exists $tableName") + sql(s""" + |create table if not exists $tableName( + | id bigint, + | name string + |) + |STORED AS carbondata + |partitioned by (dt string) + |""".stripMargin) + val carbonTable = CarbonEnv.getCarbonTable( + Option(CarbonCommonConstants.DATABASE_DEFAULT_NAME), tableName)(sqlContext.sparkSession) + val dt1 = "dt1" + sql(s"insert overwrite table $tableName partition(dt='$dt1') select 1, 'a'") + val dt1Metas = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) + assert(dt1Metas.length == 1) + val dt1Seg1 = dt1Metas(0) + + val dt2 = "dt2" + sql(s"insert overwrite table $tableName partition(dt='$dt2') select 1, 'a'") + val dt2Metas = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) + assert(dt2Metas.length == 2) + val dt2Seg1 = dt2Metas(0) + val dt2Seg2 = dt2Metas(1) + + assert(dt1Seg1.getUpdateDeltaEndTimestamp == dt2Seg1.getUpdateDeltaEndTimestamp) + assert(dt1Seg1.getUpdateDeltaEndTimestamp != dt2Seg2.getUpdateDeltaEndTimestamp) + sql(s"drop table if exists $tableName") + } + override def afterAll { sql("drop table if exists load") sql("drop table if exists inser")