Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 7147be281 -> e23222cf1


Added table status lock while deleting the segments

Added locking for the delete-by-date and clean files operations

Corrected logs


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/08cb3f45
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/08cb3f45
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/08cb3f45

Branch: refs/heads/master
Commit: 08cb3f4502107f5724abb55a49ee10a38b1b4d1a
Parents: 7147be2
Author: Manohar <manohar.craz...@gmail.com>
Authored: Wed Sep 14 22:48:23 2016 +0530
Committer: Venkata Ramana G <ramana.gollam...@huawei.com>
Committed: Sun Sep 18 02:50:43 2016 +0530

----------------------------------------------------------------------
 .../carbondata/spark/load/CarbonLoaderUtil.java |   3 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  57 ++++++---
 .../execution/command/carbonTableSchema.scala   |  56 +++++----
 .../dataretention/DataRetentionTestCase.scala   | 117 +++++++++++++++----
 .../carbondata/lcm/locks/CarbonLockUtil.java    |  64 ++++++++++
 .../apache/carbondata/lcm/locks/LockUsage.java  |  12 +-
 .../lcm/status/SegmentStatusManager.java        | 103 ++++++++++------
 7 files changed, 316 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/08cb3f45/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
 
b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index a8302c0..72078a6 100644
--- 
a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ 
b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -473,9 +473,8 @@ public final class CarbonLoaderUtil {
 
       }
       CarbonUtil.closeStreams(brWriter);
-
+      writeOperation.close();
     }
-    writeOperation.close();
 
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/08cb3f45/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 1144c59..5a1d14f 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -44,7 +44,7 @@ import 
org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.integration.spark.merger.{CarbonCompactionUtil, 
CompactionCallable, CompactionType}
-import org.apache.carbondata.lcm.locks.{CarbonLockFactory, ICarbonLock, 
LockUsage}
+import org.apache.carbondata.lcm.locks.{CarbonLockFactory, CarbonLockUtil, 
ICarbonLock, LockUsage}
 import org.apache.carbondata.lcm.status.SegmentStatusManager
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
@@ -52,7 +52,7 @@ import org.apache.carbondata.spark._
 import org.apache.carbondata.spark.load._
 import org.apache.carbondata.spark.merger.CarbonDataMergerUtil
 import org.apache.carbondata.spark.splits.TableSplit
-import org.apache.carbondata.spark.util.{CarbonQueryUtil, LoadMetadataUtil}
+import org.apache.carbondata.spark.util.{CarbonQueryUtil, CarbonScalaUtil, 
LoadMetadataUtil}
 
 /**
  * This is the factory class which can create different RDD depends on user 
needs.
@@ -1106,6 +1106,9 @@ object CarbonDataRDDFactory extends Logging {
       val segmentStatusManager = new 
SegmentStatusManager(table.getAbsoluteTableIdentifier)
       val details = segmentStatusManager
         .readLoadMetadata(loadMetadataFilePath)
+      val carbonTableStatusLock = CarbonLockFactory
+        
.getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+          LockUsage.TABLE_STATUS_LOCK)
 
       // Delete marked loads
       val isUpdationRequired = DeleteLoadFolders
@@ -1113,12 +1116,29 @@ object CarbonDataRDDFactory extends Logging {
           partitioner.partitionCount, isForceDeletion, details)
 
       if (isUpdationRequired) {
+        try {
         // Update load metadate file after cleaning deleted nodes
-        CarbonLoaderUtil.writeLoadMetadata(
-          carbonLoadModel.getCarbonDataLoadSchema,
-          carbonLoadModel.getDatabaseName,
-          carbonLoadModel.getTableName, details.toList.asJava
-        )
+        if (carbonTableStatusLock.lockWithRetries()) {
+          logger.info("Table status lock has been successfully acquired.")
+          CarbonLoaderUtil.writeLoadMetadata(
+            carbonLoadModel.getCarbonDataLoadSchema,
+            carbonLoadModel.getDatabaseName,
+            carbonLoadModel.getTableName, details.toList.asJava
+          )
+        }
+        else {
+          val errorMsg = "Clean files request is failed for " + 
carbonLoadModel.getDatabaseName +
+                         "." + carbonLoadModel.getTableName +
+                         ". Not able to acquire the table status lock due to 
other operation " +
+                         "running in the background."
+          logger.audit(errorMsg)
+          logger.error(errorMsg)
+          throw new Exception(errorMsg + " Please try after some time.")
+
+        }
+      } finally {
+          CarbonLockUtil.fileUnlock(carbonTableStatusLock, 
LockUsage.TABLE_STATUS_LOCK)
+        }
       }
     }
   }
@@ -1140,25 +1160,32 @@ object CarbonDataRDDFactory extends Logging {
     val table = 
org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
       .getCarbonTable(carbonLoadModel.getDatabaseName + "_" + 
carbonLoadModel.getTableName)
     val metaDataPath: String = table.getMetaDataFilepath
-    val carbonLock = CarbonLockFactory
+    val carbonCleanFilesLock = CarbonLockFactory
       
.getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-        LockUsage.METADATA_LOCK
+        LockUsage.CLEAN_FILES_LOCK
       )
     try {
-      if (carbonLock.lockWithRetries()) {
+      if (carbonCleanFilesLock.lockWithRetries()) {
+        logger.info("Clean files lock has been successfully acquired.")
         deleteLoadsAndUpdateMetadata(carbonLoadModel,
           table,
           partitioner,
           hdfsStoreLocation,
           isForceDeletion = true)
       }
+      else {
+        val errorMsg = "Clean files request is failed for " + 
carbonLoadModel.getDatabaseName +
+                       "." + carbonLoadModel.getTableName +
+                       ". Not able to acquire the clean files lock due to 
another clean files " +
+                       "operation is running in the background."
+        logger.audit(errorMsg)
+        logger.error(errorMsg)
+        throw new Exception(errorMsg + " Please try after some time.")
+
+      }
     }
     finally {
-      if (carbonLock.unlock()) {
-        logInfo("unlock the table metadata file successfully")
-      } else {
-        logError("Unable to unlock the metadata lock")
-      }
+      CarbonLockUtil.fileUnlock(carbonCleanFilesLock, 
LockUsage.CLEAN_FILES_LOCK)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/08cb3f45/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 19519ea..5fb1b42 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -919,16 +919,20 @@ private[sql] case class DeleteLoadsById(
 
     val segmentStatusManager =
       new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
+    try {
+      val invalidLoadIds = 
segmentStatusManager.updateDeletionStatus(loadids.asJava, path).asScala
 
-    val invalidLoadIds = 
segmentStatusManager.updateDeletionStatus(loadids.asJava, path).asScala
-
-    if (invalidLoadIds.isEmpty) {
+      if (invalidLoadIds.isEmpty) {
 
-      LOGGER.audit(s"Delete segment by Id is successfull for 
$databaseName.$tableName.")
-    }
-    else {
-      sys.error("Delete segment by Id is failed. Invalid ID is :"
-                + invalidLoadIds.mkString(","))
+        LOGGER.audit(s"Delete segment by Id is successfull for 
$databaseName.$tableName.")
+      }
+      else {
+        sys.error("Delete segment by Id is failed. Invalid ID is :"
+                  + invalidLoadIds.mkString(","))
+      }
+    } catch {
+      case ex: Exception =>
+        sys.error(ex.getMessage)
     }
 
     Seq.empty
@@ -983,13 +987,18 @@ private[sql] case class DeleteLoadsByLoadDate(
     }
     val path = carbonTable.getMetaDataFilepath()
 
-    val invalidLoadTimestamps = segmentStatusManager
-      .updateDeletionStatus(loadDate, path, 
timeObj.asInstanceOf[java.lang.Long]).asScala
-    if(invalidLoadTimestamps.isEmpty) {
-      LOGGER.audit(s"Delete segment by date is successfull for 
$dbName.$tableName.")
-    }
-    else {
-      sys.error("Delete segment by date is failed. No matching segment found.")
+    try {
+      val invalidLoadTimestamps = segmentStatusManager
+        .updateDeletionStatus(loadDate, path, 
timeObj.asInstanceOf[java.lang.Long]).asScala
+      if (invalidLoadTimestamps.isEmpty) {
+        LOGGER.audit(s"Delete segment by date is successfull for 
$dbName.$tableName.")
+      }
+      else {
+        sys.error("Delete segment by date is failed. No matching segment 
found.")
+      }
+    } catch {
+      case ex: Exception =>
+        sys.error(ex.getMessage)
     }
     Seq.empty
 
@@ -1549,12 +1558,17 @@ private[sql] case class CleanFiles(
     carbonLoadModel.setStorePath(relation.tableMeta.storePath)
     val dataLoadSchema = new CarbonDataLoadSchema(table)
     carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
-    CarbonDataRDDFactory.cleanFiles(
-      sqlContext.sparkContext,
-      carbonLoadModel,
-      relation.tableMeta.storePath,
-      relation.tableMeta.partitioner)
-    LOGGER.audit("Clean files request is successfull.")
+    try {
+      CarbonDataRDDFactory.cleanFiles(
+        sqlContext.sparkContext,
+        carbonLoadModel,
+        relation.tableMeta.storePath,
+        relation.tableMeta.partitioner)
+      LOGGER.audit(s"Clean files request is successfull for 
$dbName.$tableName.")
+    } catch {
+      case ex : Exception =>
+        sys.error(ex.getMessage)
+    }
     Seq.empty
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/08cb3f45/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
index 4517a60..2e7c757 100644
--- 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
@@ -28,11 +28,12 @@ import org.apache.spark.sql.common.util.CarbonHiveContext._
 import org.apache.spark.sql.common.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.carbondata.core.carbon.path.CarbonStorePath
+import org.apache.carbondata.core.carbon.path.{CarbonStorePath, 
CarbonTablePath}
 import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, 
CarbonTableIdentifier}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.load.LoadMetadataDetails
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.lcm.locks.{CarbonLockFactory, ICarbonLock, 
LockUsage}
 import org.apache.carbondata.lcm.status.SegmentStatusManager
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
@@ -47,15 +48,28 @@ class DataRetentionTestCase extends QueryTest with 
BeforeAndAfterAll {
   val resource = currentDirectory + "/src/test/resources/"
 
   val storeLocation = new File(this.getClass.getResource("/").getPath + 
"/../test").getCanonicalPath
-  val carbonTableIdentifier: CarbonTableIdentifier =
-    new CarbonTableIdentifier("default", "DataRetentionTable".toLowerCase(), 
"1")
-  val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-      AbsoluteTableIdentifier(storeLocation, carbonTableIdentifier))
-  val carbontablePath = CarbonStorePath.getCarbonTablePath(storeLocation, 
carbonTableIdentifier)
-    .getMetadataDirectoryPath
+  val absoluteTableIdentifierForLock: AbsoluteTableIdentifier = new
+      AbsoluteTableIdentifier(storeLocation,
+        new CarbonTableIdentifier("default", "retentionlock", "200"))
+  val absoluteTableIdentifierForRetention: AbsoluteTableIdentifier = new
+      AbsoluteTableIdentifier(storeLocation,
+        new CarbonTableIdentifier("default", 
"DataRetentionTable".toLowerCase(), "300"))
+  val segmentStatusManager: SegmentStatusManager = new 
SegmentStatusManager(absoluteTableIdentifierForRetention)
+  val carbonTablePath = CarbonStorePath
+    .getCarbonTablePath(absoluteTableIdentifierForRetention.getStorePath,
+      
absoluteTableIdentifierForRetention.getCarbonTableIdentifier).getMetadataDirectoryPath
+
   var carbonDateFormat = new 
SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP)
   var defaultDateFormat = new SimpleDateFormat(CarbonCommonConstants
     .CARBON_TIMESTAMP_DEFAULT_FORMAT)
+  val carbonTableStatusLock: ICarbonLock = CarbonLockFactory
+    .getCarbonLockObj(absoluteTableIdentifierForLock.getCarbonTableIdentifier, 
LockUsage.TABLE_STATUS_LOCK)
+  val carbonDeleteSegmentLock: ICarbonLock = CarbonLockFactory
+    .getCarbonLockObj(absoluteTableIdentifierForLock.getCarbonTableIdentifier, 
LockUsage.DELETE_SEGMENT_LOCK)
+  val carbonCleanFilesLock: ICarbonLock = CarbonLockFactory
+    .getCarbonLockObj(absoluteTableIdentifierForLock.getCarbonTableIdentifier, 
LockUsage.CLEAN_FILES_LOCK)
+  val carbonMetadataLock: ICarbonLock = CarbonLockFactory
+    .getCarbonLockObj(absoluteTableIdentifierForLock.getCarbonTableIdentifier, 
LockUsage.METADATA_LOCK)
 
 
   override def beforeAll {
@@ -68,6 +82,16 @@ class DataRetentionTestCase extends QueryTest with 
BeforeAndAfterAll {
       "phonetype String, serialname String, salary int) stored by 
'org.apache.carbondata.format'"
 
     )
+    sql(
+      "CREATE table retentionlock (ID int, date String, country String, name " 
+
+      "String," +
+      "phonetype String, serialname String, salary int) stored by 
'org.apache.carbondata.format'"
+
+    )
+
+    sql(
+      "LOAD DATA LOCAL INPATH '" + resource + "dataretention1.csv' INTO TABLE 
retentionlock " +
+      "OPTIONS('DELIMITER' =  ',')")
 
     sql(
       "LOAD DATA LOCAL INPATH '" + resource + "dataretention1.csv' INTO TABLE 
DataRetentionTable " +
@@ -81,6 +105,7 @@ class DataRetentionTestCase extends QueryTest with 
BeforeAndAfterAll {
   override def afterAll {
     sql("drop table DataRetentionTable")
     sql("drop table carbon_TABLE_1")
+    sql("drop table retentionlock")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
   }
@@ -99,7 +124,7 @@ class DataRetentionTestCase extends QueryTest with 
BeforeAndAfterAll {
 
   test("RetentionTest_withoutDelete") {
     checkAnswer(
-      sql("SELECT country, count(salary) AS amount FROM dataretentionTable 
WHERE country" +
+      sql("SELECT country, count(salary) AS amount FROM DataRetentionTable 
WHERE country" +
           " IN ('china','ind','aus','eng') GROUP BY country"
       ),
       Seq(Row("aus", 9), Row("ind", 9))
@@ -108,7 +133,7 @@ class DataRetentionTestCase extends QueryTest with 
BeforeAndAfterAll {
 
   test("RetentionTest_DeleteSegmentsByLoadTime") {
     val segments: Array[LoadMetadataDetails] = segmentStatusManager
-      .readLoadMetadata(carbontablePath)
+      .readLoadMetadata(carbonTablePath)
     // check segment length, it should be 3 (loads)
     if (segments.length != 2) {
       assert(false)
@@ -117,7 +142,7 @@ class DataRetentionTestCase extends QueryTest with 
BeforeAndAfterAll {
     val actualValue: String = getSegmentStartTime(segments, 1)
     // delete segments (0,1) which contains ind, aus
     sql(
-      "DELETE SEGMENTS FROM TABLE dataretentionTable where STARTTIME before '" 
+ actualValue + "'")
+      "DELETE SEGMENTS FROM TABLE DataRetentionTable where STARTTIME before '" 
+ actualValue + "'")
 
     // load segment 2 which contains eng
     sql(
@@ -133,9 +158,9 @@ class DataRetentionTestCase extends QueryTest with 
BeforeAndAfterAll {
 
   test("RetentionTest3_DeleteByLoadId") {
     // delete segment 2 and load ind segment
-    sql("DELETE SEGMENT 2 FROM TABLE dataretentionTable")
+    sql("DELETE SEGMENT 2 FROM TABLE DataRetentionTable")
     sql(
-      "LOAD DATA LOCAL INPATH '" + resource + "dataretention1.csv' INTO TABLE 
dataretentionTable " +
+      "LOAD DATA LOCAL INPATH '" + resource + "dataretention1.csv' INTO TABLE 
DataRetentionTable " +
       "OPTIONS('DELIMITER' = ',')")
     checkAnswer(
       sql("SELECT country, count(salary) AS amount FROM DataRetentionTable 
WHERE country" +
@@ -145,8 +170,8 @@ class DataRetentionTestCase extends QueryTest with 
BeforeAndAfterAll {
     )
 
     // these queries should execute without any error.
-    sql("show segments for table dataretentionTable")
-    sql("clean files for table dataretentionTable")
+    sql("show segments for table DataRetentionTable")
+    sql("clean files for table DataRetentionTable")
   }
 
   test("RetentionTest4_DeleteByInvalidLoadId") {
@@ -189,7 +214,7 @@ class DataRetentionTestCase extends QueryTest with 
BeforeAndAfterAll {
 
     try {
       sql(
-        "DELETE SEGMENTS FROM TABLE dataretentionTable where STARTTIME before" 
+
+        "DELETE SEGMENTS FROM TABLE DataRetentionTable where STARTTIME before" 
+
         " 'abcd-01-01 00:00:00'")
       assert(false)
     } catch {
@@ -200,7 +225,7 @@ class DataRetentionTestCase extends QueryTest with 
BeforeAndAfterAll {
 
     try {
       sql(
-        "DELETE SEGMENTS FROM TABLE dataretentionTable where STARTTIME before" 
+
+        "DELETE SEGMENTS FROM TABLE DataRetentionTable where STARTTIME before" 
+
         " '2099:01:01 00:00:00'")
       assert(false)
     } catch {
@@ -215,7 +240,7 @@ class DataRetentionTestCase extends QueryTest with 
BeforeAndAfterAll {
       ),
       Seq(Row("ind", 9))
     )
-    sql("DELETE SEGMENTS FROM TABLE dataretentionTable where STARTTIME before 
'2099-01-01'")
+    sql("DELETE SEGMENTS FROM TABLE DataRetentionTable where STARTTIME before 
'2099-01-01'")
     checkAnswer(
       sql("SELECT country, count(salary) AS amount FROM DataRetentionTable 
WHERE country" +
           " IN ('china','ind','aus','eng') GROUP BY country"), Seq())
@@ -227,7 +252,7 @@ class DataRetentionTestCase extends QueryTest with 
BeforeAndAfterAll {
 
     // All these queries should fail.
     try {
-      sql("DELETE LOADS FROM TABLE dataretentionTable where STARTTIME before 
'2099-01-01'")
+      sql("DELETE LOADS FROM TABLE DataRetentionTable where STARTTIME before 
'2099-01-01'")
       throw new MalformedCarbonCommandException("Invalid query")
     } catch {
       case e: MalformedCarbonCommandException =>
@@ -236,7 +261,7 @@ class DataRetentionTestCase extends QueryTest with 
BeforeAndAfterAll {
     }
 
     try {
-      sql("DELETE LOAD 2 FROM TABLE dataretentionTable")
+      sql("DELETE LOAD 2 FROM TABLE DataRetentionTable")
       throw new MalformedCarbonCommandException("Invalid query")
     } catch {
       case e: MalformedCarbonCommandException =>
@@ -245,7 +270,7 @@ class DataRetentionTestCase extends QueryTest with 
BeforeAndAfterAll {
     }
 
     try {
-      sql("show loads for table dataretentionTable")
+      sql("show loads for table DataRetentionTable")
       throw new MalformedCarbonCommandException("Invalid query")
     } catch {
       case e: MalformedCarbonCommandException =>
@@ -254,4 +279,56 @@ class DataRetentionTestCase extends QueryTest with 
BeforeAndAfterAll {
     }
 
   }
+
+  test("RetentionTest_Locks") {
+
+    sql(
+      "LOAD DATA LOCAL INPATH '" + resource + "dataretention1.csv' INTO TABLE 
retentionlock " +
+      "OPTIONS('DELIMITER' = ',')")
+    carbonDeleteSegmentLock.lockWithRetries()
+    carbonTableStatusLock.lockWithRetries()
+    carbonCleanFilesLock.lockWithRetries()
+    // delete segment 0 it should fail
+    try {
+      sql("DELETE SEGMENT 0 FROM TABLE retentionlock")
+      throw new MalformedCarbonCommandException("Invalid")
+    } catch {
+      case me: MalformedCarbonCommandException =>
+        assert(false)
+      case ex: Exception =>
+        assert(true)
+    }
+
+    // it should fail
+    try {
+      sql("DELETE SEGMENTS FROM TABLE retentionlock where STARTTIME before " +
+          "'2099-01-01 00:00:00.0'")
+      throw new MalformedCarbonCommandException("Invalid")
+    } catch {
+      case me: MalformedCarbonCommandException =>
+        assert(false)
+      case ex: Exception =>
+        assert(true)
+    }
+
+    // it should fail
+    try {
+      sql("clean files for table retentionlock")
+      throw new MalformedCarbonCommandException("Invalid")
+    } catch {
+      case me: MalformedCarbonCommandException =>
+        assert(false)
+      case ex: Exception =>
+        assert(true)
+    }
+    carbonTableStatusLock.unlock()
+    carbonCleanFilesLock.unlock()
+    carbonDeleteSegmentLock.unlock()
+
+    sql("DELETE SEGMENT 0 FROM TABLE retentionlock")
+    //load and delete should execute parallely
+    carbonMetadataLock.lockWithRetries()
+    sql("DELETE SEGMENT 1 FROM TABLE retentionlock")
+    carbonMetadataLock.unlock()
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/08cb3f45/processing/src/main/java/org/apache/carbondata/lcm/locks/CarbonLockUtil.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/lcm/locks/CarbonLockUtil.java 
b/processing/src/main/java/org/apache/carbondata/lcm/locks/CarbonLockUtil.java
new file mode 100644
index 0000000..31ac8e5
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/carbondata/lcm/locks/CarbonLockUtil.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.lcm.locks;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+
+/**
+ * This class contains all carbon lock utilities
+ */
+public class CarbonLockUtil {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonLockUtil.class.getName());
+
+  /**
+   * unlocks given file
+   *
+   * @param carbonLock
+   */
+  public static void fileUnlock(ICarbonLock carbonLock, String locktype) {
+    if (carbonLock.unlock()) {
+      if (locktype.equals(LockUsage.METADATA_LOCK)) {
+        LOGGER.info("Metadata lock has been successfully released");
+      } else if (locktype.equals(LockUsage.TABLE_STATUS_LOCK)) {
+        LOGGER.info("Table status lock has been successfully released");
+      }
+      else if (locktype.equals(LockUsage.CLEAN_FILES_LOCK)) {
+        LOGGER.info("Clean files lock has been successfully released");
+      }
+      else if (locktype.equals(LockUsage.DELETE_SEGMENT_LOCK)) {
+        LOGGER.info("Delete segments lock has been successfully released");
+      }
+    } else {
+      if (locktype.equals(LockUsage.METADATA_LOCK)) {
+        LOGGER.error("Not able to release the metadata lock");
+      } else if (locktype.equals(LockUsage.TABLE_STATUS_LOCK)) {
+        LOGGER.error("Not able to release the table status lock");
+      }
+      else if (locktype.equals(LockUsage.CLEAN_FILES_LOCK)) {
+        LOGGER.info("Not able to release the clean files lock");
+      }
+      else if (locktype.equals(LockUsage.DELETE_SEGMENT_LOCK)) {
+        LOGGER.info("Not able to release the delete segments lock");
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/08cb3f45/processing/src/main/java/org/apache/carbondata/lcm/locks/LockUsage.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/lcm/locks/LockUsage.java 
b/processing/src/main/java/org/apache/carbondata/lcm/locks/LockUsage.java
index 4c604fd..b11951b 100644
--- a/processing/src/main/java/org/apache/carbondata/lcm/locks/LockUsage.java
+++ b/processing/src/main/java/org/apache/carbondata/lcm/locks/LockUsage.java
@@ -23,10 +23,12 @@ package org.apache.carbondata.lcm.locks;
  * Each enum value is one specific lock case.
  */
 public class LockUsage {
-  public static String LOCK = ".lock";
-  public static String METADATA_LOCK = "meta.lock";
-  public static String COMPACTION_LOCK = "compaction.lock";
-  public static String SYSTEMLEVEL_COMPACTION_LOCK = 
"system_level_compaction.lock";
-  public static String TABLE_STATUS_LOCK = "tablestatus.lock";
+  public static final String LOCK = ".lock";
+  public static final String METADATA_LOCK = "meta.lock";
+  public static final String COMPACTION_LOCK = "compaction.lock";
+  public static final String SYSTEMLEVEL_COMPACTION_LOCK = 
"system_level_compaction.lock";
+  public static final String TABLE_STATUS_LOCK = "tablestatus.lock";
+  public static final String DELETE_SEGMENT_LOCK = "delete_segment.lock";
+  public static final String CLEAN_FILES_LOCK = "clean_files.lock";
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/08cb3f45/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java
 
b/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java
index bc6032a..6db12b7 100644
--- 
a/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java
+++ 
b/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java
@@ -35,6 +35,7 @@ import java.util.List;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
 import org.apache.carbondata.core.carbon.path.CarbonStorePath;
 import org.apache.carbondata.core.carbon.path.CarbonTablePath;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -45,6 +46,7 @@ import 
org.apache.carbondata.lcm.fileoperations.AtomicFileOperations;
 import org.apache.carbondata.lcm.fileoperations.AtomicFileOperationsImpl;
 import org.apache.carbondata.lcm.fileoperations.FileWriteOperation;
 import org.apache.carbondata.lcm.locks.CarbonLockFactory;
+import org.apache.carbondata.lcm.locks.CarbonLockUtil;
 import org.apache.carbondata.lcm.locks.ICarbonLock;
 import org.apache.carbondata.lcm.locks.LockUsage;
 
@@ -248,14 +250,20 @@ public class SegmentStatusManager {
    * @param tableFolderPath
    * @return
    */
-  public List<String> updateDeletionStatus(List<String> loadIds, String 
tableFolderPath) {
-    ICarbonLock carbonLock = CarbonLockFactory
-        .getCarbonLockObj(absoluteTableIdentifier.getCarbonTableIdentifier(),
-            LockUsage.METADATA_LOCK);
+  public List<String> updateDeletionStatus(List<String> loadIds, String 
tableFolderPath)
+      throws Exception {
+    CarbonTableIdentifier carbonTableIdentifier =
+        absoluteTableIdentifier.getCarbonTableIdentifier();
+    ICarbonLock carbonDeleteSegmentLock =
+        CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier, 
LockUsage.DELETE_SEGMENT_LOCK);
+    ICarbonLock carbonTableStatusLock =
+        CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier, 
LockUsage.TABLE_STATUS_LOCK);
+    String tableDetails =
+        carbonTableIdentifier.getDatabaseName() + "." + 
carbonTableIdentifier.getTableName();
     List<String> invalidLoadIds = new ArrayList<String>(0);
     try {
-      if (carbonLock.lockWithRetries()) {
-        LOG.info("Metadata lock has been successfully acquired");
+      if (carbonDeleteSegmentLock.lockWithRetries()) {
+        LOG.info("Delete segment lock has been successfully acquired");
 
         CarbonTablePath carbonTablePath = CarbonStorePath
             .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
@@ -272,8 +280,20 @@ public class SegmentStatusManager {
         if (listOfLoadFolderDetailsArray != null && 
listOfLoadFolderDetailsArray.length != 0) {
           updateDeletionStatus(loadIds, listOfLoadFolderDetailsArray, 
invalidLoadIds);
           if (invalidLoadIds.isEmpty()) {
-            // All or None , if anything fails then dont write
-            writeLoadDetailsIntoFile(dataLoadLocation, 
listOfLoadFolderDetailsArray);
+            if(carbonTableStatusLock.lockWithRetries()) {
+              LOG.info("Table status lock has been successfully acquired");
+              // All or None , if anything fails then dont write
+              writeLoadDetailsIntoFile(dataLoadLocation, 
listOfLoadFolderDetailsArray);
+            }
+            else {
+              String errorMsg = "Delete segment by id is failed for " + 
tableDetails
+                  + ". Not able to acquire the table status lock due to other 
operation running "
+                  + "in the background.";
+              LOG.audit(errorMsg);
+              LOG.error(errorMsg);
+              throw new Exception(errorMsg + " Please try after some time.");
+            }
+
           } else {
             return invalidLoadIds;
           }
@@ -284,12 +304,18 @@ public class SegmentStatusManager {
         }
 
       } else {
-        LOG.error("Unable to acquire the metadata lock");
+        String errorMsg = "Delete segment by id failed for " + tableDetails
+            + ". Not able to acquire the delete segment lock because another "
+            + "delete operation is running in the background.";
+        LOG.audit(errorMsg);
+        LOG.error(errorMsg);
+        throw new Exception(errorMsg + " Please try after some time.");
       }
     } catch (IOException e) {
       LOG.error("IOException" + e.getMessage());
     } finally {
-      fileUnlock(carbonLock);
+      CarbonLockUtil.fileUnlock(carbonTableStatusLock, 
LockUsage.TABLE_STATUS_LOCK);
+      CarbonLockUtil.fileUnlock(carbonDeleteSegmentLock, 
LockUsage.DELETE_SEGMENT_LOCK);
     }
 
     return invalidLoadIds;
@@ -303,14 +329,19 @@ public class SegmentStatusManager {
    * @return
    */
   public List<String> updateDeletionStatus(String loadDate, String 
tableFolderPath,
-      Long loadStartTime) {
-    ICarbonLock carbonLock = CarbonLockFactory
-        .getCarbonLockObj(absoluteTableIdentifier.getCarbonTableIdentifier(),
-            LockUsage.METADATA_LOCK);
+      Long loadStartTime) throws Exception {
+    CarbonTableIdentifier carbonTableIdentifier =
+        absoluteTableIdentifier.getCarbonTableIdentifier();
+    ICarbonLock carbonDeleteSegmentLock =
+        CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier, 
LockUsage.DELETE_SEGMENT_LOCK);
+    ICarbonLock carbonTableStatusLock =
+        CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier, 
LockUsage.TABLE_STATUS_LOCK);
+    String tableDetails =
+        carbonTableIdentifier.getDatabaseName() + "." + 
carbonTableIdentifier.getTableName();
     List<String> invalidLoadTimestamps = new ArrayList<String>(0);
     try {
-      if (carbonLock.lockWithRetries()) {
-        LOG.info("Metadata lock has been successfully acquired");
+      if (carbonDeleteSegmentLock.lockWithRetries()) {
+        LOG.info("Delete segment lock has been successfully acquired");
 
         CarbonTablePath carbonTablePath = CarbonStorePath
             .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
@@ -330,7 +361,20 @@ public class SegmentStatusManager {
           updateDeletionStatus(loadDate, listOfLoadFolderDetailsArray, 
invalidLoadTimestamps,
               loadStartTime);
           if (invalidLoadTimestamps.isEmpty()) {
-            writeLoadDetailsIntoFile(dataLoadLocation, 
listOfLoadFolderDetailsArray);
+            if(carbonTableStatusLock.lockWithRetries()) {
+              LOG.info("Table status lock has been successfully acquired.");
+              writeLoadDetailsIntoFile(dataLoadLocation, 
listOfLoadFolderDetailsArray);
+            }
+            else {
+
+              String errorMsg = "Delete segment by date failed for " + tableDetails
+                  + ". Not able to acquire the table status lock because another "
+                  + "operation is running in the background.";
+              LOG.audit(errorMsg);
+              LOG.error(errorMsg);
+              throw new Exception(errorMsg + " Please try after some time.");
+
+            }
           } else {
             return invalidLoadTimestamps;
           }
@@ -342,12 +386,18 @@ public class SegmentStatusManager {
         }
 
       } else {
-        LOG.error("Error message: " + "Unable to acquire the metadata lock");
+        String errorMsg = "Delete segment by date failed for " + tableDetails
+            + ". Not able to acquire the delete segment lock because another "
+            + "delete operation is running in the background.";
+        LOG.audit(errorMsg);
+        LOG.error(errorMsg);
+        throw new Exception(errorMsg + " Please try after some time.");
       }
     } catch (IOException e) {
       LOG.error("Error message: " + "IOException" + e.getMessage());
     } finally {
-      fileUnlock(carbonLock);
+      CarbonLockUtil.fileUnlock(carbonTableStatusLock, 
LockUsage.TABLE_STATUS_LOCK);
+      CarbonLockUtil.fileUnlock(carbonDeleteSegmentLock, 
LockUsage.DELETE_SEGMENT_LOCK);
     }
 
     return invalidLoadTimestamps;
@@ -383,9 +433,9 @@ public class SegmentStatusManager {
         brWriter.flush();
       }
       CarbonUtil.closeStreams(brWriter);
+      fileWrite.close();
     }
 
-    fileWrite.close();
   }
 
   /**
@@ -498,19 +548,6 @@ public class SegmentStatusManager {
     }
   }
 
-  /**
-   * unlocks given file
-   *
-   * @param carbonLock
-   */
-  private void fileUnlock(ICarbonLock carbonLock) {
-    if (carbonLock.unlock()) {
-      LOG.info("Metadata lock has been successfully released");
-    } else {
-      LOG.error("Not able to release the metadata lock");
-    }
-  }
-
   public static class ValidAndInvalidSegmentsInfo {
     private final List<String> listOfValidSegments;
     private final List<String> listOfValidUpdatedSegments;

Reply via email to