Repository: incubator-carbondata Updated Branches: refs/heads/master 7147be281 -> e23222cf1
Added table Status lock while deleting the segments Added locking while delete date, clean files Corrected logs Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/08cb3f45 Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/08cb3f45 Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/08cb3f45 Branch: refs/heads/master Commit: 08cb3f4502107f5724abb55a49ee10a38b1b4d1a Parents: 7147be2 Author: Manohar <manohar.craz...@gmail.com> Authored: Wed Sep 14 22:48:23 2016 +0530 Committer: Venkata Ramana G <ramana.gollam...@huawei.com> Committed: Sun Sep 18 02:50:43 2016 +0530 ---------------------------------------------------------------------- .../carbondata/spark/load/CarbonLoaderUtil.java | 3 +- .../spark/rdd/CarbonDataRDDFactory.scala | 57 ++++++--- .../execution/command/carbonTableSchema.scala | 56 +++++---- .../dataretention/DataRetentionTestCase.scala | 117 +++++++++++++++---- .../carbondata/lcm/locks/CarbonLockUtil.java | 64 ++++++++++ .../apache/carbondata/lcm/locks/LockUsage.java | 12 +- .../lcm/status/SegmentStatusManager.java | 103 ++++++++++------ 7 files changed, 316 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/08cb3f45/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java index a8302c0..72078a6 100644 --- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java +++ b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java @@ -473,9 +473,8 @@ public final class CarbonLoaderUtil { } CarbonUtil.closeStreams(brWriter); - + writeOperation.close(); } - writeOperation.close(); } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/08cb3f45/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 1144c59..5a1d14f 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -44,7 +44,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails} import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.integration.spark.merger.{CarbonCompactionUtil, CompactionCallable, CompactionType} -import org.apache.carbondata.lcm.locks.{CarbonLockFactory, ICarbonLock, LockUsage} +import org.apache.carbondata.lcm.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage} import org.apache.carbondata.lcm.status.SegmentStatusManager import org.apache.carbondata.processing.etl.DataLoadingException import org.apache.carbondata.processing.util.CarbonDataProcessorUtil @@ -52,7 +52,7 @@ import org.apache.carbondata.spark._ import org.apache.carbondata.spark.load._ import org.apache.carbondata.spark.merger.CarbonDataMergerUtil import org.apache.carbondata.spark.splits.TableSplit -import org.apache.carbondata.spark.util.{CarbonQueryUtil, LoadMetadataUtil} +import org.apache.carbondata.spark.util.{CarbonQueryUtil, CarbonScalaUtil, LoadMetadataUtil} /** * This is the factory class which can create different RDD depends on user needs. @@ -1106,6 +1106,9 @@ object CarbonDataRDDFactory extends Logging { val segmentStatusManager = new SegmentStatusManager(table.getAbsoluteTableIdentifier) val details = segmentStatusManager .readLoadMetadata(loadMetadataFilePath) + val carbonTableStatusLock = CarbonLockFactory + .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier, + LockUsage.TABLE_STATUS_LOCK) // Delete marked loads val isUpdationRequired = DeleteLoadFolders @@ -1113,12 +1116,29 @@ object CarbonDataRDDFactory extends Logging { partitioner.partitionCount, isForceDeletion, details) if (isUpdationRequired) { + try { // Update load metadate file after cleaning deleted nodes - CarbonLoaderUtil.writeLoadMetadata( - carbonLoadModel.getCarbonDataLoadSchema, - carbonLoadModel.getDatabaseName, - carbonLoadModel.getTableName, details.toList.asJava - ) + if (carbonTableStatusLock.lockWithRetries()) { + logger.info("Table status lock has been successfully acquired.") + CarbonLoaderUtil.writeLoadMetadata( + carbonLoadModel.getCarbonDataLoadSchema, + carbonLoadModel.getDatabaseName, + carbonLoadModel.getTableName, details.toList.asJava + ) + } + else { + val errorMsg = "Clean files request is failed for " + carbonLoadModel.getDatabaseName + + "." + carbonLoadModel.getTableName + + ". Not able to acquire the table status lock due to other operation " + + "running in the background." + logger.audit(errorMsg) + logger.error(errorMsg) + throw new Exception(errorMsg + " Please try after some time.") + + } + } finally { + CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK) + } } } } @@ -1140,25 +1160,32 @@ object CarbonDataRDDFactory extends Logging { val table = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance .getCarbonTable(carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName) val metaDataPath: String = table.getMetaDataFilepath - val carbonLock = CarbonLockFactory + val carbonCleanFilesLock = CarbonLockFactory .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier, - LockUsage.METADATA_LOCK + LockUsage.CLEAN_FILES_LOCK ) try { - if (carbonLock.lockWithRetries()) { + if (carbonCleanFilesLock.lockWithRetries()) { + logger.info("Clean files lock has been successfully acquired.") deleteLoadsAndUpdateMetadata(carbonLoadModel, table, partitioner, hdfsStoreLocation, isForceDeletion = true) } + else { + val errorMsg = "Clean files request is failed for " + carbonLoadModel.getDatabaseName + + "." + carbonLoadModel.getTableName + + ". Not able to acquire the clean files lock due to another clean files " + + "operation is running in the background." + logger.audit(errorMsg) + logger.error(errorMsg) + throw new Exception(errorMsg + " Please try after some time.") + + } } finally { - if (carbonLock.unlock()) { - logInfo("unlock the table metadata file successfully") - } else { - logError("Unable to unlock the metadata lock") - } + CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK) } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/08cb3f45/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index 19519ea..5fb1b42 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -919,16 +919,20 @@ private[sql] case class DeleteLoadsById( val segmentStatusManager = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier) + try { + val invalidLoadIds = segmentStatusManager.updateDeletionStatus(loadids.asJava, path).asScala - val invalidLoadIds = segmentStatusManager.updateDeletionStatus(loadids.asJava, path).asScala - - if (invalidLoadIds.isEmpty) { + if (invalidLoadIds.isEmpty) { - LOGGER.audit(s"Delete segment by Id is successfull for $databaseName.$tableName.") - } - else { - sys.error("Delete segment by Id is failed. Invalid ID is :" - + invalidLoadIds.mkString(",")) + LOGGER.audit(s"Delete segment by Id is successfull for $databaseName.$tableName.") + } + else { + sys.error("Delete segment by Id is failed. Invalid ID is :" + + invalidLoadIds.mkString(",")) + } + } catch { + case ex: Exception => + sys.error(ex.getMessage) } Seq.empty @@ -983,13 +987,18 @@ private[sql] case class DeleteLoadsByLoadDate( } val path = carbonTable.getMetaDataFilepath() - val invalidLoadTimestamps = segmentStatusManager - .updateDeletionStatus(loadDate, path, timeObj.asInstanceOf[java.lang.Long]).asScala - if(invalidLoadTimestamps.isEmpty) { - LOGGER.audit(s"Delete segment by date is successfull for $dbName.$tableName.") - } - else { - sys.error("Delete segment by date is failed. No matching segment found.") + try { + val invalidLoadTimestamps = segmentStatusManager + .updateDeletionStatus(loadDate, path, timeObj.asInstanceOf[java.lang.Long]).asScala + if (invalidLoadTimestamps.isEmpty) { + LOGGER.audit(s"Delete segment by date is successfull for $dbName.$tableName.") + } + else { + sys.error("Delete segment by date is failed. No matching segment found.") + } + } catch { + case ex: Exception => + sys.error(ex.getMessage) } Seq.empty @@ -1549,12 +1558,17 @@ private[sql] case class CleanFiles( carbonLoadModel.setStorePath(relation.tableMeta.storePath) val dataLoadSchema = new CarbonDataLoadSchema(table) carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema) - CarbonDataRDDFactory.cleanFiles( - sqlContext.sparkContext, - carbonLoadModel, - relation.tableMeta.storePath, - relation.tableMeta.partitioner) - LOGGER.audit("Clean files request is successfull.") + try { + CarbonDataRDDFactory.cleanFiles( + sqlContext.sparkContext, + carbonLoadModel, + relation.tableMeta.storePath, + relation.tableMeta.partitioner) + LOGGER.audit(s"Clean files request is successfull for $dbName.$tableName.") + } catch { + case ex : Exception => + sys.error(ex.getMessage) + } Seq.empty } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/08cb3f45/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala index 4517a60..2e7c757 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala @@ -28,11 +28,12 @@ import org.apache.spark.sql.common.util.CarbonHiveContext._ import org.apache.spark.sql.common.util.QueryTest import org.scalatest.BeforeAndAfterAll -import org.apache.carbondata.core.carbon.path.CarbonStorePath +import org.apache.carbondata.core.carbon.path.{CarbonStorePath, CarbonTablePath} import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.load.LoadMetadataDetails import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.lcm.locks.{CarbonLockFactory, ICarbonLock, LockUsage} import org.apache.carbondata.lcm.status.SegmentStatusManager import org.apache.carbondata.spark.exception.MalformedCarbonCommandException @@ -47,15 +48,28 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll { val resource = currentDirectory + "/src/test/resources/" val storeLocation = new File(this.getClass.getResource("/").getPath + "/../test").getCanonicalPath - val carbonTableIdentifier: CarbonTableIdentifier = - new CarbonTableIdentifier("default", "DataRetentionTable".toLowerCase(), "1") - val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new - AbsoluteTableIdentifier(storeLocation, carbonTableIdentifier)) - val carbontablePath = CarbonStorePath.getCarbonTablePath(storeLocation, carbonTableIdentifier) - .getMetadataDirectoryPath + val absoluteTableIdentifierForLock: AbsoluteTableIdentifier = new + AbsoluteTableIdentifier(storeLocation, + new CarbonTableIdentifier("default", "retentionlock", "200")) + val absoluteTableIdentifierForRetention: AbsoluteTableIdentifier = new + AbsoluteTableIdentifier(storeLocation, + new CarbonTableIdentifier("default", "DataRetentionTable".toLowerCase(), "300")) + val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(absoluteTableIdentifierForRetention) + val carbonTablePath = CarbonStorePath + .getCarbonTablePath(absoluteTableIdentifierForRetention.getStorePath, + absoluteTableIdentifierForRetention.getCarbonTableIdentifier).getMetadataDirectoryPath + var carbonDateFormat = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP) var defaultDateFormat = new SimpleDateFormat(CarbonCommonConstants .CARBON_TIMESTAMP_DEFAULT_FORMAT) + val carbonTableStatusLock: ICarbonLock = CarbonLockFactory + .getCarbonLockObj(absoluteTableIdentifierForLock.getCarbonTableIdentifier, LockUsage.TABLE_STATUS_LOCK) + val carbonDeleteSegmentLock: ICarbonLock = CarbonLockFactory + .getCarbonLockObj(absoluteTableIdentifierForLock.getCarbonTableIdentifier, LockUsage.DELETE_SEGMENT_LOCK) + val carbonCleanFilesLock: ICarbonLock = CarbonLockFactory + .getCarbonLockObj(absoluteTableIdentifierForLock.getCarbonTableIdentifier, LockUsage.CLEAN_FILES_LOCK) + val carbonMetadataLock: ICarbonLock = CarbonLockFactory + .getCarbonLockObj(absoluteTableIdentifierForLock.getCarbonTableIdentifier, LockUsage.METADATA_LOCK) override def beforeAll { @@ -68,6 +82,16 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll { "phonetype String, serialname String, salary int) stored by 'org.apache.carbondata.format'" ) + sql( + "CREATE table retentionlock (ID int, date String, country String, name " + + "String," + + "phonetype String, serialname String, salary int) stored by 'org.apache.carbondata.format'" + + ) + + sql( + "LOAD DATA LOCAL INPATH '" + resource + "dataretention1.csv' INTO TABLE retentionlock " + + "OPTIONS('DELIMITER' = ',')") sql( "LOAD DATA LOCAL INPATH '" + resource + "dataretention1.csv' INTO TABLE DataRetentionTable " + @@ -81,6 +105,7 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll { override def afterAll { sql("drop table DataRetentionTable") sql("drop table carbon_TABLE_1") + sql("drop table retentionlock") CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") } @@ -99,7 +124,7 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll { test("RetentionTest_withoutDelete") { checkAnswer( - sql("SELECT country, count(salary) AS amount FROM dataretentionTable WHERE country" + + sql("SELECT country, count(salary) AS amount FROM DataRetentionTable WHERE country" + " IN ('china','ind','aus','eng') GROUP BY country" ), Seq(Row("aus", 9), Row("ind", 9)) @@ -108,7 +133,7 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll { test("RetentionTest_DeleteSegmentsByLoadTime") { val segments: Array[LoadMetadataDetails] = segmentStatusManager - .readLoadMetadata(carbontablePath) + .readLoadMetadata(carbonTablePath) // check segment length, it should be 3 (loads) if (segments.length != 2) { assert(false) @@ -117,7 +142,7 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll { val actualValue: String = getSegmentStartTime(segments, 1) // delete segments (0,1) which contains ind, aus sql( - "DELETE SEGMENTS FROM TABLE dataretentionTable where STARTTIME before '" + actualValue + "'") + "DELETE SEGMENTS FROM TABLE DataRetentionTable where STARTTIME before '" + actualValue + "'") // load segment 2 which contains eng sql( @@ -133,9 +158,9 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll { test("RetentionTest3_DeleteByLoadId") { // delete segment 2 and load ind segment - sql("DELETE SEGMENT 2 FROM TABLE dataretentionTable") + sql("DELETE SEGMENT 2 FROM TABLE DataRetentionTable") sql( - "LOAD DATA LOCAL INPATH '" + resource + "dataretention1.csv' INTO TABLE dataretentionTable " + + "LOAD DATA LOCAL INPATH '" + resource + "dataretention1.csv' INTO TABLE DataRetentionTable " + "OPTIONS('DELIMITER' = ',')") checkAnswer( sql("SELECT country, count(salary) AS amount FROM DataRetentionTable WHERE country" + @@ -145,8 +170,8 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll { ) // these queries should execute without any error. - sql("show segments for table dataretentionTable") - sql("clean files for table dataretentionTable") + sql("show segments for table DataRetentionTable") + sql("clean files for table DataRetentionTable") } test("RetentionTest4_DeleteByInvalidLoadId") { @@ -189,7 +214,7 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll { try { sql( - "DELETE SEGMENTS FROM TABLE dataretentionTable where STARTTIME before" + + "DELETE SEGMENTS FROM TABLE DataRetentionTable where STARTTIME before" + " 'abcd-01-01 00:00:00'") assert(false) } catch { @@ -200,7 +225,7 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll { try { sql( - "DELETE SEGMENTS FROM TABLE dataretentionTable where STARTTIME before" + + "DELETE SEGMENTS FROM TABLE DataRetentionTable where STARTTIME before" + " '2099:01:01 00:00:00'") assert(false) } catch { @@ -215,7 +240,7 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll { ), Seq(Row("ind", 9)) ) - sql("DELETE SEGMENTS FROM TABLE dataretentionTable where STARTTIME before '2099-01-01'") + sql("DELETE SEGMENTS FROM TABLE DataRetentionTable where STARTTIME before '2099-01-01'") checkAnswer( sql("SELECT country, count(salary) AS amount FROM DataRetentionTable WHERE country" + " IN ('china','ind','aus','eng') GROUP BY country"), Seq()) @@ -227,7 +252,7 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll { // All these queries should fail. try { - sql("DELETE LOADS FROM TABLE dataretentionTable where STARTTIME before '2099-01-01'") + sql("DELETE LOADS FROM TABLE DataRetentionTable where STARTTIME before '2099-01-01'") throw new MalformedCarbonCommandException("Invalid query") } catch { case e: MalformedCarbonCommandException => @@ -236,7 +261,7 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll { } try { - sql("DELETE LOAD 2 FROM TABLE dataretentionTable") + sql("DELETE LOAD 2 FROM TABLE DataRetentionTable") throw new MalformedCarbonCommandException("Invalid query") } catch { case e: MalformedCarbonCommandException => @@ -245,7 +270,7 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll { } try { - sql("show loads for table dataretentionTable") + sql("show loads for table DataRetentionTable") throw new MalformedCarbonCommandException("Invalid query") } catch { case e: MalformedCarbonCommandException => @@ -254,4 +279,56 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll { } } + + test("RetentionTest_Locks") { + + sql( + "LOAD DATA LOCAL INPATH '" + resource + "dataretention1.csv' INTO TABLE retentionlock " + + "OPTIONS('DELIMITER' = ',')") + carbonDeleteSegmentLock.lockWithRetries() + carbonTableStatusLock.lockWithRetries() + carbonCleanFilesLock.lockWithRetries() + // delete segment 0 it should fail + try { + sql("DELETE SEGMENT 0 FROM TABLE retentionlock") + throw new MalformedCarbonCommandException("Invalid") + } catch { + case me: MalformedCarbonCommandException => + assert(false) + case ex: Exception => + assert(true) + } + + // it should fail + try { + sql("DELETE SEGMENTS FROM TABLE retentionlock where STARTTIME before " + + "'2099-01-01 00:00:00.0'") + throw new MalformedCarbonCommandException("Invalid") + } catch { + case me: MalformedCarbonCommandException => + assert(false) + case ex: Exception => + assert(true) + } + + // it should fail + try { + sql("clean files for table retentionlock") + throw new MalformedCarbonCommandException("Invalid") + } catch { + case me: MalformedCarbonCommandException => + assert(false) + case ex: Exception => + assert(true) + } + carbonTableStatusLock.unlock() + carbonCleanFilesLock.unlock() + carbonDeleteSegmentLock.unlock() + + sql("DELETE SEGMENT 0 FROM TABLE retentionlock") + //load and delete should execute parallely + carbonMetadataLock.lockWithRetries() + sql("DELETE SEGMENT 1 FROM TABLE retentionlock") + carbonMetadataLock.unlock() + } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/08cb3f45/processing/src/main/java/org/apache/carbondata/lcm/locks/CarbonLockUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/lcm/locks/CarbonLockUtil.java b/processing/src/main/java/org/apache/carbondata/lcm/locks/CarbonLockUtil.java new file mode 100644 index 0000000..31ac8e5 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/lcm/locks/CarbonLockUtil.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.lcm.locks; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; + +/** + * This class contains all carbon lock utilities + */ +public class CarbonLockUtil { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(CarbonLockUtil.class.getName()); + + /** + * unlocks given file + * + * @param carbonLock + */ + public static void fileUnlock(ICarbonLock carbonLock, String locktype) { + if (carbonLock.unlock()) { + if (locktype.equals(LockUsage.METADATA_LOCK)) { + LOGGER.info("Metadata lock has been successfully released"); + } else if (locktype.equals(LockUsage.TABLE_STATUS_LOCK)) { + LOGGER.info("Table status lock has been successfully released"); + } + else if (locktype.equals(LockUsage.CLEAN_FILES_LOCK)) { + LOGGER.info("Clean files lock has been successfully released"); + } + else if (locktype.equals(LockUsage.DELETE_SEGMENT_LOCK)) { + LOGGER.info("Delete segments lock has been successfully released"); + } + } else { + if (locktype.equals(LockUsage.METADATA_LOCK)) { + LOGGER.error("Not able to release the metadata lock"); + } else if (locktype.equals(LockUsage.TABLE_STATUS_LOCK)) { + LOGGER.error("Not able to release the table status lock"); + } + else if (locktype.equals(LockUsage.CLEAN_FILES_LOCK)) { + LOGGER.info("Not able to release the clean files lock"); + } + else if (locktype.equals(LockUsage.DELETE_SEGMENT_LOCK)) { + LOGGER.info("Not able to release the delete segments lock"); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/08cb3f45/processing/src/main/java/org/apache/carbondata/lcm/locks/LockUsage.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/lcm/locks/LockUsage.java b/processing/src/main/java/org/apache/carbondata/lcm/locks/LockUsage.java index 4c604fd..b11951b 100644 --- a/processing/src/main/java/org/apache/carbondata/lcm/locks/LockUsage.java +++ b/processing/src/main/java/org/apache/carbondata/lcm/locks/LockUsage.java @@ -23,10 +23,12 @@ package org.apache.carbondata.lcm.locks; * Each enum value is one specific lock case. */ public class LockUsage { - public static String LOCK = ".lock"; - public static String METADATA_LOCK = "meta.lock"; - public static String COMPACTION_LOCK = "compaction.lock"; - public static String SYSTEMLEVEL_COMPACTION_LOCK = "system_level_compaction.lock"; - public static String TABLE_STATUS_LOCK = "tablestatus.lock"; + public static final String LOCK = ".lock"; + public static final String METADATA_LOCK = "meta.lock"; + public static final String COMPACTION_LOCK = "compaction.lock"; + public static final String SYSTEMLEVEL_COMPACTION_LOCK = "system_level_compaction.lock"; + public static final String TABLE_STATUS_LOCK = "tablestatus.lock"; + public static final String DELETE_SEGMENT_LOCK = "delete_segment.lock"; + public static final String CLEAN_FILES_LOCK = "clean_files.lock"; } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/08cb3f45/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java b/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java index bc6032a..6db12b7 100644 --- a/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java +++ b/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java @@ -35,6 +35,7 @@ import java.util.List; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier; +import org.apache.carbondata.core.carbon.CarbonTableIdentifier; import org.apache.carbondata.core.carbon.path.CarbonStorePath; import org.apache.carbondata.core.carbon.path.CarbonTablePath; import org.apache.carbondata.core.constants.CarbonCommonConstants; @@ -45,6 +46,7 @@ import org.apache.carbondata.lcm.fileoperations.AtomicFileOperations; import org.apache.carbondata.lcm.fileoperations.AtomicFileOperationsImpl; import org.apache.carbondata.lcm.fileoperations.FileWriteOperation; import org.apache.carbondata.lcm.locks.CarbonLockFactory; +import org.apache.carbondata.lcm.locks.CarbonLockUtil; import org.apache.carbondata.lcm.locks.ICarbonLock; import org.apache.carbondata.lcm.locks.LockUsage; @@ -248,14 +250,20 @@ public class SegmentStatusManager { * @param tableFolderPath * @return */ - public List<String> updateDeletionStatus(List<String> loadIds, String tableFolderPath) { - ICarbonLock carbonLock = CarbonLockFactory - .getCarbonLockObj(absoluteTableIdentifier.getCarbonTableIdentifier(), - LockUsage.METADATA_LOCK); + public List<String> updateDeletionStatus(List<String> loadIds, String tableFolderPath) + throws Exception { + CarbonTableIdentifier carbonTableIdentifier = + absoluteTableIdentifier.getCarbonTableIdentifier(); + ICarbonLock carbonDeleteSegmentLock = + CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier, LockUsage.DELETE_SEGMENT_LOCK); + ICarbonLock carbonTableStatusLock = + CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier, LockUsage.TABLE_STATUS_LOCK); + String tableDetails = + carbonTableIdentifier.getDatabaseName() + "." + carbonTableIdentifier.getTableName(); List<String> invalidLoadIds = new ArrayList<String>(0); try { - if (carbonLock.lockWithRetries()) { - LOG.info("Metadata lock has been successfully acquired"); + if (carbonDeleteSegmentLock.lockWithRetries()) { + LOG.info("Delete segment lock has been successfully acquired"); CarbonTablePath carbonTablePath = CarbonStorePath .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), @@ -272,8 +280,20 @@ public class SegmentStatusManager { if (listOfLoadFolderDetailsArray != null && listOfLoadFolderDetailsArray.length != 0) { updateDeletionStatus(loadIds, listOfLoadFolderDetailsArray, invalidLoadIds); if (invalidLoadIds.isEmpty()) { - // All or None , if anything fails then dont write - writeLoadDetailsIntoFile(dataLoadLocation, listOfLoadFolderDetailsArray); + if(carbonTableStatusLock.lockWithRetries()) { + LOG.info("Table status lock has been successfully acquired"); + // All or None , if anything fails then dont write + writeLoadDetailsIntoFile(dataLoadLocation, listOfLoadFolderDetailsArray); + } + else { + String errorMsg = "Delete segment by id is failed for " + tableDetails + + ". Not able to acquire the table status lock due to other operation running " + + "in the background."; + LOG.audit(errorMsg); + LOG.error(errorMsg); + throw new Exception(errorMsg + " Please try after some time."); + } + } else { return invalidLoadIds; } @@ -284,12 +304,18 @@ public class SegmentStatusManager { } } else { - LOG.error("Unable to acquire the metadata lock"); + String errorMsg = "Delete segment by id is failed for " + tableDetails + + ". Not able to acquire the delete segment lock due to another delete " + + "operation is running in the background."; + LOG.audit(errorMsg); + LOG.error(errorMsg); + throw new Exception(errorMsg + " Please try after some time."); } } catch (IOException e) { LOG.error("IOException" + e.getMessage()); } finally { - fileUnlock(carbonLock); + CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK); + CarbonLockUtil.fileUnlock(carbonDeleteSegmentLock, LockUsage.DELETE_SEGMENT_LOCK); } return invalidLoadIds; @@ -303,14 +329,19 @@ public class SegmentStatusManager { * @return */ public List<String> updateDeletionStatus(String loadDate, String tableFolderPath, - Long loadStartTime) { - ICarbonLock carbonLock = CarbonLockFactory - .getCarbonLockObj(absoluteTableIdentifier.getCarbonTableIdentifier(), - LockUsage.METADATA_LOCK); + Long loadStartTime) throws Exception { + CarbonTableIdentifier carbonTableIdentifier = + absoluteTableIdentifier.getCarbonTableIdentifier(); + ICarbonLock carbonDeleteSegmentLock = + CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier, LockUsage.DELETE_SEGMENT_LOCK); + ICarbonLock carbonTableStatusLock = + CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier, LockUsage.TABLE_STATUS_LOCK); + String tableDetails = + carbonTableIdentifier.getDatabaseName() + "." + carbonTableIdentifier.getTableName(); List<String> invalidLoadTimestamps = new ArrayList<String>(0); try { - if (carbonLock.lockWithRetries()) { - LOG.info("Metadata lock has been successfully acquired"); + if (carbonDeleteSegmentLock.lockWithRetries()) { + LOG.info("Delete segment lock has been successfully acquired"); CarbonTablePath carbonTablePath = CarbonStorePath .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), @@ -330,7 +361,20 @@ public class SegmentStatusManager { updateDeletionStatus(loadDate, listOfLoadFolderDetailsArray, invalidLoadTimestamps, loadStartTime); if (invalidLoadTimestamps.isEmpty()) { - writeLoadDetailsIntoFile(dataLoadLocation, listOfLoadFolderDetailsArray); + if(carbonTableStatusLock.lockWithRetries()) { + LOG.info("Table status lock has been successfully acquired."); + writeLoadDetailsIntoFile(dataLoadLocation, listOfLoadFolderDetailsArray); + } + else { + + String errorMsg = "Delete segment by date is failed for " + tableDetails + + ". Not able to acquire the table status lock due to other operation running " + + "in the background."; + LOG.audit(errorMsg); + LOG.error(errorMsg); + throw new Exception(errorMsg + " Please try after some time."); + + } } else { return invalidLoadTimestamps; } @@ -342,12 +386,18 @@ public class SegmentStatusManager { } } else { - LOG.error("Error message: " + "Unable to acquire the metadata lock"); + String errorMsg = "Delete segment by date is failed for " + tableDetails + + ". Not able to acquire the delete segment lock due to another delete " + + "operation is running in the background."; + LOG.audit(errorMsg); + LOG.error(errorMsg); + throw new Exception(errorMsg + " Please try after some time."); } } catch (IOException e) { LOG.error("Error message: " + "IOException" + e.getMessage()); } finally { - fileUnlock(carbonLock); + CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK); + CarbonLockUtil.fileUnlock(carbonDeleteSegmentLock, LockUsage.DELETE_SEGMENT_LOCK); } return invalidLoadTimestamps; @@ -383,9 +433,9 @@ public class SegmentStatusManager { brWriter.flush(); } CarbonUtil.closeStreams(brWriter); + fileWrite.close(); } - fileWrite.close(); } /** @@ -498,19 +548,6 @@ public class SegmentStatusManager { } } - /** - * unlocks given file - * - * @param carbonLock - */ - private void fileUnlock(ICarbonLock carbonLock) { - if (carbonLock.unlock()) { - LOG.info("Metadata lock has been successfully released"); - } else { - LOG.error("Not able to release the metadata lock"); - } - } - public static class ValidAndInvalidSegmentsInfo { private final List<String> listOfValidSegments; private final List<String> listOfValidUpdatedSegments;