[CARBONDATA-2717] Fixed empty table id problem while taking drop lock

This closes #2472


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/637a9746
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/637a9746
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/637a9746

Branch: refs/heads/carbonstore
Commit: 637a97469c1917a8554606eba138a7bb3fdeaa9c
Parents: 98c7581
Author: kunal642 <kunalkapoor...@gmail.com>
Authored: Tue Jul 10 14:38:05 2018 +0530
Committer: Venkata Ramana G <ramana.gollam...@huawei.com>
Committed: Fri Jul 13 17:11:27 2018 +0530

----------------------------------------------------------------------
 .../carbondata/core/locks/CarbonLockFactory.java       |  4 ----
 .../apache/carbondata/core/locks/CarbonLockUtil.java   | 13 +++++++++++--
 .../carbondata/hadoop/api/CarbonFileInputFormat.java   |  4 +---
 .../main/scala/org/apache/spark/sql/CarbonEnv.scala    |  2 +-
 .../command/schema/CarbonGetTableDetailCommand.scala   | 12 ++++--------
 .../command/table/CarbonCreateTableCommand.scala       |  3 ++-
 .../command/table/CarbonDropTableCommand.scala         |  8 ++++----
 .../apache/spark/sql/hive/CarbonFileMetastore.scala    |  3 ++-
 .../org/apache/spark/sql/hive/CarbonSessionState.scala |  3 ++-
 9 files changed, 27 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/637a9746/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java
index 769e752..91677a6 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java
@@ -60,10 +60,6 @@ public class CarbonLockFactory {
     if (lockPath.isEmpty()) {
       absoluteLockPath = absoluteTableIdentifier.getTablePath();
     } else {
-      if (absoluteTableIdentifier
-          .getCarbonTableIdentifier().getTableId().isEmpty()) {
-        throw new RuntimeException("Table id is empty");
-      }
      absoluteLockPath =
          getLockpath(absoluteTableIdentifier.getCarbonTableIdentifier().getTableId());
     }
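
Context for the removed guard: callers now build the AbsoluteTableIdentifier with the table id taken from the table schema, so the lock factory can rely on it being present. A minimal, illustrative Java sketch of that caller-side construction (paths, names and the id below are placeholder values, not taken from this patch):

import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;

public class LockIdentifierExample {
  public static void main(String[] args) {
    // The fourth argument carries the fact table's id, so a lock path derived
    // from the id (when carbon.lock.path is configured) is never empty.
    AbsoluteTableIdentifier identifier = AbsoluteTableIdentifier.from(
        "/store/default/t1",                       // table path (placeholder)
        "default",                                 // database name (placeholder)
        "t1",                                      // table name (placeholder)
        "c3f6a1e0-9d2b-4a7c-8e5f-000000000001");   // table id from the schema (placeholder)
  }
}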

http://git-wip-us.apache.org/repos/asf/carbondata/blob/637a9746/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
index 4d67faf..ca6cddb 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.locks;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -27,6 +28,8 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
+import org.apache.commons.lang.StringUtils;
+
 /**
  * This class contains all carbon lock utilities
  */
@@ -121,8 +124,14 @@ public class CarbonLockUtil {
     final long segmentLockFilesPreservTime =
         CarbonProperties.getInstance().getSegmentLockFilesPreserveHours();
     AbsoluteTableIdentifier absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier();
-    String lockFilesDir = CarbonTablePath
-        .getLockFilesDirPath(absoluteTableIdentifier.getTablePath());
+    String lockFilesDir = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.LOCK_PATH, "");
+    if (StringUtils.isEmpty(lockFilesDir)) {
+      lockFilesDir = CarbonTablePath.getLockFilesDirPath(absoluteTableIdentifier.getTablePath());
+    } else {
+      lockFilesDir = CarbonTablePath.getLockFilesDirPath(
+          CarbonLockFactory.getLockpath(carbonTable.getTableInfo().getFactTable().getTableId()));
+    }
     CarbonFile[] files = FileFactory.getCarbonFile(lockFilesDir)
         .listFiles(new CarbonFileFilter() {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/637a9746/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index 8755176..0f02e12 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -100,13 +100,11 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
    */
   @Override
   public List<InputSplit> getSplits(JobContext job) throws IOException {
-
-    AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
     CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
-
     if (null == carbonTable) {
       throw new IOException("Missing/Corrupt schema file for table.");
     }
+    AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
 
     if (getValidateSegmentsToAccess(job.getConfiguration())) {
       // get all valid segments and set them into the configuration

http://git-wip-us.apache.org/repos/asf/carbondata/blob/637a9746/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 5650187..70c4f12 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -226,7 +226,7 @@ object CarbonEnv {
       DataMapStoreManager.getInstance().
         clearDataMaps(AbsoluteTableIdentifier.from(tablePath,
           identifier.database.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase),
-          identifier.table))
+          identifier.table, table.get.getTableInfo.getFactTable.getTableId))
       isRefreshed = true
     }
     isRefreshed

http://git-wip-us.apache.org/repos/asf/carbondata/blob/637a9746/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonGetTableDetailCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonGetTableDetailCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonGetTableDetailCommand.scala
index 2c702b6..8d6a4cc 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonGetTableDetailCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonGetTableDetailCommand.scala
@@ -35,20 +35,16 @@ case class CarbonGetTableDetailCommand(
   extends DataCommand {
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    val storePath = CarbonProperties.getStorePath
     if (tableNames.isDefined) {
       tableNames.get.map { tablename =>
-        val absoluteTableIdentifier =
-          AbsoluteTableIdentifier.from(storePath, databaseName.toLowerCase, tablename.toLowerCase)
-        val carbonTableIdentifier = absoluteTableIdentifier.getCarbonTableIdentifier
-        val carbonTable = CarbonEnv.getCarbonTable(Option(carbonTableIdentifier.getDatabaseName),
-          carbonTableIdentifier.getTableName)(sparkSession)
+        val carbonTable = CarbonEnv.getCarbonTable(Option(databaseName),
+          tablename)(sparkSession)

         Row(
           tablename,
           carbonTable.size,
-          SegmentStatusManager.getTableStatusLastModifiedTime(absoluteTableIdentifier)
-        )
+          SegmentStatusManager
+            .getTableStatusLastModifiedTime(carbonTable.getAbsoluteTableIdentifier))
       }
     } else {
       Seq.empty[Row]

http://git-wip-us.apache.org/repos/asf/carbondata/blob/637a9746/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index 543ba39..c403d52 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -81,7 +81,8 @@ case class CarbonCreateTableCommand(
         throw new UnsupportedOperationException("streaming is not supported with s3 store")
       }
       tableInfo.setTablePath(tablePath)
-      val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
+      val tableIdentifier = AbsoluteTableIdentifier
+        .from(tablePath, dbName, tableName, tableInfo.getFactTable.getTableId)
 
       // Add validation for sort scope when create table
       val sortScope = tableInfo.getFactTable.getTableProperties.asScala

http://git-wip-us.apache.org/repos/asf/carbondata/blob/637a9746/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index 5d74c2c..e4b298f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -53,9 +53,7 @@ case class CarbonDropTableCommand(
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

-
-    val identifier = CarbonEnv.getIdentifier(databaseNameOp, tableName)(sparkSession)
-    val dbName = identifier.getCarbonTableIdentifier.getDatabaseName
+    val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
     val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
     try {
       carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
@@ -64,8 +62,10 @@ case class CarbonDropTableCommand(
       } else {
         List.empty
       }
+      val identifier = carbonTable.getAbsoluteTableIdentifier
       locksToBeAcquired foreach {
-        lock => carbonLocks += CarbonLockUtil.getLockObject(identifier, lock)
+        lock => carbonLocks +=
+                CarbonLockUtil.getLockObject(identifier, lock)
       }
 
       if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
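
The drop-table change above is the core of this fix: the identifier used for locking is now taken from the resolved CarbonTable, whose metadata carries the real table id, instead of being rebuilt from database and table names (which left the id empty when a lock path is configured). A hedged Java illustration of that ordering, with hypothetical lookup and lock helpers standing in for the Scala command:

import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;

public class DropLockOrderingSketch {
  // Hypothetical stand-ins for the catalog lookup and lock acquisition done by the command.
  interface Catalog { CarbonTable lookup(String dbName, String tableName); }
  interface LockService { void acquire(AbsoluteTableIdentifier identifier, String lockFile); }

  static void acquireDropLock(Catalog catalog, LockService locks, String dbName, String tableName) {
    // 1. Resolve the table first; its schema supplies the table id.
    CarbonTable carbonTable = catalog.lookup(dbName, tableName);
    // 2. Only then derive the identifier, so an id-based lock path is never empty.
    AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
    locks.acquire(identifier, "droptable.lock");  // lock file name shown for illustration only
  }
}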

http://git-wip-us.apache.org/repos/asf/carbondata/blob/637a9746/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 1670d8a..dddc72c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -355,7 +355,8 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val tableName = tableInfo.getFactTable.getTableName
     val thriftTableInfo = schemaConverter.fromWrapperToExternalTableInfo(
       tableInfo, dbName, tableName)
-    val identifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
+    val identifier = AbsoluteTableIdentifier
+      .from(tablePath, dbName, tableName, thriftTableInfo.getFact_table.getTable_id)
     createSchemaThriftFile(identifier, thriftTableInfo)
     LOGGER.info(s"Table $tableName for Database $dbName created successfully.")
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/637a9746/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
index b361e36..2c98ec2 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -176,7 +176,8 @@ class CarbonHiveSessionCatalog(
       refreshTable(identifier)
       DataMapStoreManager.getInstance().
         clearDataMaps(AbsoluteTableIdentifier.from(storePath,
-          identifier.database.getOrElse("default"), identifier.table))
+          identifier.database.getOrElse("default"),
+          identifier.table))
       isRefreshed = true
       logInfo(s"Schema changes have been detected for table: $identifier")
     }
