[CARBONDATA-2717] fixed table id empty problem while taking drop lock This closes #2472
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/637a9746 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/637a9746 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/637a9746 Branch: refs/heads/carbonstore Commit: 637a97469c1917a8554606eba138a7bb3fdeaa9c Parents: 98c7581 Author: kunal642 <kunalkapoor...@gmail.com> Authored: Tue Jul 10 14:38:05 2018 +0530 Committer: Venkata Ramana G <ramana.gollam...@huawei.com> Committed: Fri Jul 13 17:11:27 2018 +0530 ---------------------------------------------------------------------- .../carbondata/core/locks/CarbonLockFactory.java | 4 ---- .../apache/carbondata/core/locks/CarbonLockUtil.java | 13 +++++++++++-- .../carbondata/hadoop/api/CarbonFileInputFormat.java | 4 +--- .../main/scala/org/apache/spark/sql/CarbonEnv.scala | 2 +- .../command/schema/CarbonGetTableDetailCommand.scala | 12 ++++-------- .../command/table/CarbonCreateTableCommand.scala | 3 ++- .../command/table/CarbonDropTableCommand.scala | 8 ++++---- .../apache/spark/sql/hive/CarbonFileMetastore.scala | 3 ++- .../org/apache/spark/sql/hive/CarbonSessionState.scala | 3 ++- 9 files changed, 27 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/637a9746/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java index 769e752..91677a6 100644 --- a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java @@ -60,10 +60,6 @@ public class CarbonLockFactory { if (lockPath.isEmpty()) { absoluteLockPath = absoluteTableIdentifier.getTablePath(); } else { - if (absoluteTableIdentifier - .getCarbonTableIdentifier().getTableId().isEmpty()) { - throw new RuntimeException("Table id is empty"); - } absoluteLockPath = getLockpath(absoluteTableIdentifier.getCarbonTableIdentifier().getTableId()); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/637a9746/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java index 4d67faf..ca6cddb 100644 --- a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java @@ -19,6 +19,7 @@ package org.apache.carbondata.core.locks; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; import org.apache.carbondata.core.datastore.impl.FileFactory; @@ -27,6 +28,8 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.commons.lang.StringUtils; + /** * This class contains all carbon lock utilities */ @@ -121,8 +124,14 @@ public class CarbonLockUtil { final long segmentLockFilesPreservTime = CarbonProperties.getInstance().getSegmentLockFilesPreserveHours(); AbsoluteTableIdentifier absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier(); - String lockFilesDir = CarbonTablePath - .getLockFilesDirPath(absoluteTableIdentifier.getTablePath()); + String lockFilesDir = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.LOCK_PATH, ""); + if (StringUtils.isEmpty(lockFilesDir)) { + lockFilesDir = CarbonTablePath.getLockFilesDirPath(absoluteTableIdentifier.getTablePath()); + } else { + lockFilesDir = CarbonTablePath.getLockFilesDirPath( + CarbonLockFactory.getLockpath(carbonTable.getTableInfo().getFactTable().getTableId())); + } CarbonFile[] files = FileFactory.getCarbonFile(lockFilesDir) .listFiles(new CarbonFileFilter() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/637a9746/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java index 8755176..0f02e12 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java @@ -100,13 +100,11 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se */ @Override public List<InputSplit> getSplits(JobContext job) throws IOException { - - AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration()); CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration()); - if (null == carbonTable) { throw new IOException("Missing/Corrupt schema file for table."); } + AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier(); if (getValidateSegmentsToAccess(job.getConfiguration())) { // get all valid segments and set them into the configuration http://git-wip-us.apache.org/repos/asf/carbondata/blob/637a9746/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala index 5650187..70c4f12 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala @@ -226,7 +226,7 @@ object CarbonEnv { DataMapStoreManager.getInstance(). clearDataMaps(AbsoluteTableIdentifier.from(tablePath, identifier.database.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase), - identifier.table)) + identifier.table, table.get.getTableInfo.getFactTable.getTableId)) isRefreshed = true } isRefreshed http://git-wip-us.apache.org/repos/asf/carbondata/blob/637a9746/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonGetTableDetailCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonGetTableDetailCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonGetTableDetailCommand.scala index 2c702b6..8d6a4cc 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonGetTableDetailCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonGetTableDetailCommand.scala @@ -35,20 +35,16 @@ case class CarbonGetTableDetailCommand( extends DataCommand { override def processData(sparkSession: SparkSession): Seq[Row] = { - val storePath = CarbonProperties.getStorePath if (tableNames.isDefined) { tableNames.get.map { tablename => - val absoluteTableIdentifier = - AbsoluteTableIdentifier.from(storePath, databaseName.toLowerCase, tablename.toLowerCase) - val carbonTableIdentifier = absoluteTableIdentifier.getCarbonTableIdentifier - val carbonTable = CarbonEnv.getCarbonTable(Option(carbonTableIdentifier.getDatabaseName), - carbonTableIdentifier.getTableName)(sparkSession) + val carbonTable = CarbonEnv.getCarbonTable(Option(databaseName), + tablename)(sparkSession) Row( tablename, carbonTable.size, - SegmentStatusManager.getTableStatusLastModifiedTime(absoluteTableIdentifier) - ) + SegmentStatusManager + .getTableStatusLastModifiedTime(carbonTable.getAbsoluteTableIdentifier)) } } else { Seq.empty[Row] http://git-wip-us.apache.org/repos/asf/carbondata/blob/637a9746/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala index 543ba39..c403d52 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala @@ -81,7 +81,8 @@ case class CarbonCreateTableCommand( throw new UnsupportedOperationException("streaming is not supported with s3 store") } tableInfo.setTablePath(tablePath) - val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName) + val tableIdentifier = AbsoluteTableIdentifier + .from(tablePath, dbName, tableName, tableInfo.getFactTable.getTableId) // Add validation for sort scope when create table val sortScope = tableInfo.getFactTable.getTableProperties.asScala http://git-wip-us.apache.org/repos/asf/carbondata/blob/637a9746/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala index 5d74c2c..e4b298f 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala @@ -53,9 +53,7 @@ case class CarbonDropTableCommand( override def processMetadata(sparkSession: SparkSession): Seq[Row] = { val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - - val identifier = CarbonEnv.getIdentifier(databaseNameOp, tableName)(sparkSession) - val dbName = identifier.getCarbonTableIdentifier.getDatabaseName + val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase) val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer() try { carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) @@ -64,8 +62,10 @@ case class CarbonDropTableCommand( } else { List.empty } + val identifier = carbonTable.getAbsoluteTableIdentifier locksToBeAcquired foreach { - lock => carbonLocks += CarbonLockUtil.getLockObject(identifier, lock) + lock => carbonLocks += + CarbonLockUtil.getLockObject(identifier, lock) } if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/637a9746/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala index 1670d8a..dddc72c 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala @@ -355,7 +355,8 @@ class CarbonFileMetastore extends CarbonMetaStore { val tableName = tableInfo.getFactTable.getTableName val thriftTableInfo = schemaConverter.fromWrapperToExternalTableInfo( tableInfo, dbName, tableName) - val identifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName) + val identifier = AbsoluteTableIdentifier + .from(tablePath, dbName, tableName, thriftTableInfo.getFact_table.getTable_id) createSchemaThriftFile(identifier, thriftTableInfo) LOGGER.info(s"Table $tableName for Database $dbName created successfully.") } http://git-wip-us.apache.org/repos/asf/carbondata/blob/637a9746/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala index b361e36..2c98ec2 100644 --- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala @@ -176,7 +176,8 @@ class CarbonHiveSessionCatalog( refreshTable(identifier) DataMapStoreManager.getInstance(). clearDataMaps(AbsoluteTableIdentifier.from(storePath, - identifier.database.getOrElse("default"), identifier.table)) + identifier.database.getOrElse("default"), + identifier.table)) isRefreshed = true logInfo(s"Schema changes have been detected for table: $identifier") }