Repository: carbondata Updated Branches: refs/heads/master c723947a7 -> 05086e536
http://git-wip-us.apache.org/repos/asf/carbondata/blob/05086e53/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala index 443ff70..21aba7d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala @@ -17,21 +17,24 @@ package org.apache.spark.sql.execution.command.datamap +import java.util + import scala.collection.JavaConverters._ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.execution.command.{Checker, DataCommand} import org.apache.spark.sql.types.StringType +import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema + /** * Show the datamaps on the table - * @param databaseNameOp - * @param tableName + * @param tableIdentifier */ -case class CarbonDataMapShowCommand( - databaseNameOp: Option[String], - tableName: String) +case class CarbonDataMapShowCommand(tableIdentifier: Option[TableIdentifier]) extends DataCommand { override def output: Seq[Attribute] = { @@ -41,9 +44,23 @@ case class CarbonDataMapShowCommand( } override def processData(sparkSession: SparkSession): Seq[Row] = { - Checker.validateTableExists(databaseNameOp, tableName, sparkSession) - val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) - val schemaList = carbonTable.getTableInfo.getDataMapSchemaList + tableIdentifier match { + case Some(table) => + Checker.validateTableExists(table.database, table.table, sparkSession) + val carbonTable = CarbonEnv.getCarbonTable(table)(sparkSession) + if (carbonTable.hasDataMapSchema) { + val schemaList = carbonTable.getTableInfo.getDataMapSchemaList + convertToRow(schemaList) + } else { + convertToRow(DataMapStoreManager.getInstance().getAllDataMapSchemas(carbonTable)) + } + case _ => + convertToRow(DataMapStoreManager.getInstance().getAllDataMapSchemas) + } + + } + + private def convertToRow(schemaList: util.List[DataMapSchema]) = { if (schemaList != null && schemaList.size() > 0) { schemaList.asScala.map { s => var table = "(NA)" http://git-wip-us.apache.org/repos/asf/carbondata/blob/05086e53/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala index f773a55..fde3fc0 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala @@ -29,121 +29,161 @@ import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil import org.apache.carbondata.common.exceptions.MetadataProcessException import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, NoSuchDataMapException} import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} +import org.apache.carbondata.core.datamap.{DataMapProvider, DataMapStoreManager} import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema} -import org.apache.carbondata.datamap.{DataMapManager, DataMapProvider} +import org.apache.carbondata.datamap.DataMapManager import org.apache.carbondata.events._ /** * Drops the datamap and any related tables associated with the datamap * @param dataMapName * @param ifExistsSet - * @param databaseNameOp - * @param tableName + * @param table */ case class CarbonDropDataMapCommand( dataMapName: String, ifExistsSet: Boolean, - databaseNameOp: Option[String], - tableName: String, + table: Option[TableIdentifier], forceDrop: Boolean = false) extends AtomicRunnableCommand { private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) private var dataMapProvider: DataMapProvider = _ - private var mainTable: CarbonTable = _ - private var dataMapSchema: DataMapSchema = _ + var mainTable: CarbonTable = _ + var dataMapSchema: DataMapSchema = _ override def processMetadata(sparkSession: SparkSession): Seq[Row] = { - val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession) - val locksToBeAcquired = List(LockUsage.METADATA_LOCK) - val carbonEnv = CarbonEnv.getInstance(sparkSession) - val catalog = carbonEnv.carbonMetastore - val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName)(sparkSession) - val tableIdentifier = - AbsoluteTableIdentifier.from(tablePath, dbName.toLowerCase, tableName.toLowerCase) - catalog.checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName))) - val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer() - try { - locksToBeAcquired foreach { - lock => carbonLocks += CarbonLockUtil.getLockObject(tableIdentifier, lock) + if (table.isDefined) { + val databaseNameOp = table.get.database + val tableName = table.get.table + val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession) + val locksToBeAcquired = List(LockUsage.METADATA_LOCK) + val carbonEnv = CarbonEnv.getInstance(sparkSession) + val catalog = carbonEnv.carbonMetastore + val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName)(sparkSession) + val tableIdentifier = + AbsoluteTableIdentifier.from(tablePath, dbName.toLowerCase, tableName.toLowerCase) + catalog.checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName))) + if (mainTable == null) { + mainTable = try { + CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) + } catch { + case ex: NoSuchTableException => + throwMetadataException(dbName, tableName, + s"Dropping datamap $dataMapName failed: ${ ex.getMessage }") + null + } } - LOGGER.audit(s"Deleting datamap [$dataMapName] under table [$tableName]") - val carbonTable: Option[CarbonTable] = try { - Some(CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)) - } catch { - case ex: NoSuchTableException => - throw new MetadataProcessException(s"Dropping datamap $dataMapName failed", ex) + if (forceDrop && mainTable != null && dataMapSchema != null) { + if (dataMapSchema != null) { + dataMapProvider = DataMapManager.get.getDataMapProvider(dataMapSchema, sparkSession) + } + // TODO do a check for existance before dropping + dataMapProvider.freeMeta(mainTable, dataMapSchema) + return Seq.empty } - // If datamap to be dropped in parent table then drop the datamap from metastore and remove - // entry from parent table. - // If force drop is true then remove the datamap from hivemetastore. No need to remove from - // parent as the first condition would have taken care of it. - if (carbonTable.isDefined && carbonTable.get.getTableInfo.getDataMapSchemaList.size() > 0) { - mainTable = carbonTable.get - val dataMapSchemaOp = mainTable.getTableInfo.getDataMapSchemaList.asScala.zipWithIndex. - find(_._1.getDataMapName.equalsIgnoreCase(dataMapName)) - if (dataMapSchemaOp.isDefined) { - dataMapSchema = dataMapSchemaOp.get._1 - val operationContext = new OperationContext - val dropDataMapPreEvent = - DropDataMapPreEvent( - Some(dataMapSchema), - ifExistsSet, - sparkSession) - OperationListenerBus.getInstance.fireEvent(dropDataMapPreEvent, operationContext) - mainTable.getTableInfo.getDataMapSchemaList.remove(dataMapSchemaOp.get._2) - val schemaConverter = new ThriftWrapperSchemaConverterImpl - PreAggregateUtil.updateSchemaInfo( - mainTable, - schemaConverter.fromWrapperToExternalTableInfo( - mainTable.getTableInfo, - dbName, - tableName))(sparkSession) - dataMapProvider = DataMapManager.get.getDataMapProvider(dataMapSchema) - dataMapProvider.freeMeta(mainTable, dataMapSchema, sparkSession) + val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer() + try { + locksToBeAcquired foreach { + lock => carbonLocks += CarbonLockUtil.getLockObject(tableIdentifier, lock) + } + LOGGER.audit(s"Deleting datamap [$dataMapName] under table [$tableName]") + // If datamap to be dropped in parent table then drop the datamap from metastore and remove + // entry from parent table. + // If force drop is true then remove the datamap from hivemetastore. No need to remove from + // parent as the first condition would have taken care of it. + if (mainTable != null && mainTable.getTableInfo.getDataMapSchemaList.size() > 0) { + val dataMapSchemaOp = mainTable.getTableInfo.getDataMapSchemaList.asScala.zipWithIndex. + find(_._1.getDataMapName.equalsIgnoreCase(dataMapName)) + if (dataMapSchemaOp.isDefined) { + dataMapSchema = dataMapSchemaOp.get._1 + val operationContext = new OperationContext + val dropDataMapPreEvent = + DropDataMapPreEvent( + Some(dataMapSchema), + ifExistsSet, + sparkSession) + OperationListenerBus.getInstance.fireEvent(dropDataMapPreEvent, operationContext) + mainTable.getTableInfo.getDataMapSchemaList.remove(dataMapSchemaOp.get._2) + val schemaConverter = new ThriftWrapperSchemaConverterImpl + PreAggregateUtil.updateSchemaInfo( + mainTable, + schemaConverter.fromWrapperToExternalTableInfo( + mainTable.getTableInfo, + dbName, + tableName))(sparkSession) + if (dataMapProvider == null) { + dataMapProvider = DataMapManager.get.getDataMapProvider(dataMapSchema, sparkSession) + } + dataMapProvider.freeMeta(mainTable, dataMapSchema) - // fires the event after dropping datamap from main table schema - val dropDataMapPostEvent = - DropDataMapPostEvent( - Some(dataMapSchema), - ifExistsSet, - sparkSession) - OperationListenerBus.getInstance.fireEvent(dropDataMapPostEvent, operationContext) - } else if (!ifExistsSet) { - throw new NoSuchDataMapException(dataMapName, tableName) + // fires the event after dropping datamap from main table schema + val dropDataMapPostEvent = + DropDataMapPostEvent( + Some(dataMapSchema), + ifExistsSet, + sparkSession) + OperationListenerBus.getInstance.fireEvent(dropDataMapPostEvent, operationContext) + } else if (!ifExistsSet) { + throw new NoSuchDataMapException(dataMapName, tableName) + } + } else if (mainTable != null && + mainTable.getTableInfo.getDataMapSchemaList.size() == 0) { + dropDataMapFromSystemFolder(sparkSession, tableName) } - } else if (carbonTable.isDefined && - carbonTable.get.getTableInfo.getDataMapSchemaList.size() == 0) { - if (!ifExistsSet) { - throw new NoSuchDataMapException(dataMapName, tableName) + } catch { + case e: NoSuchDataMapException => + throw e + case ex: Exception => + LOGGER.error(ex, s"Dropping datamap $dataMapName failed") + throwMetadataException(dbName, tableName, + s"Dropping datamap $dataMapName failed: ${ ex.getMessage }") + } + finally { + if (carbonLocks.nonEmpty) { + val unlocked = carbonLocks.forall(_.unlock()) + if (unlocked) { + LOGGER.info("Table MetaData Unlocked Successfully") + } } } - } catch { - case e: NoSuchDataMapException => - throw e - case ex: Exception => - LOGGER.error(ex, s"Dropping datamap $dataMapName failed") - throwMetadataException(dbName, tableName, - s"Dropping datamap $dataMapName failed: ${ex.getMessage}") + } else { + dropDataMapFromSystemFolder(sparkSession) } - finally { - if (carbonLocks.nonEmpty) { - val unlocked = carbonLocks.forall(_.unlock()) - if (unlocked) { - LOGGER.info("Table MetaData Unlocked Successfully") - } + + Seq.empty + } + + private def dropDataMapFromSystemFolder(sparkSession: SparkSession, tableName: String = null) = { + if (dataMapSchema == null) { + val schema = DataMapStoreManager.getInstance().getAllDataMapSchemas.asScala.find { dm => + dm.getDataMapName.equalsIgnoreCase(dataMapName) + } + dataMapSchema = schema match { + case Some(dmSchema) => dmSchema + case _ => null + } + } + if (dataMapSchema != null) { + // TODO do a check for existance before dropping + dataMapProvider = DataMapManager.get.getDataMapProvider(dataMapSchema, sparkSession) + dataMapProvider.freeMeta(mainTable, dataMapSchema) + } else if (!ifExistsSet) { + if (tableName != null) { + throw new NoSuchDataMapException(dataMapName, tableName) + } else { + throw new NoSuchDataMapException(dataMapName) } } - Seq.empty } override def processData(sparkSession: SparkSession): Seq[Row] = { // delete the table folder if (dataMapProvider != null) { - dataMapProvider.freeData(mainTable, dataMapSchema, sparkSession) + dataMapProvider.freeData(mainTable, dataMapSchema) } Seq.empty } http://git-wip-us.apache.org/repos/asf/carbondata/blob/05086e53/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala index 74da11a..458bc8d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala @@ -24,9 +24,11 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.execution.command.AtomicRunnableCommand +import org.apache.spark.sql.execution.command.datamap.CarbonDropDataMapCommand import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree +import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} @@ -44,6 +46,7 @@ case class CarbonDropTableCommand( var carbonTable: CarbonTable = _ var childDropCommands : Seq[CarbonDropTableCommand] = Seq.empty + var childDropDataMapCommands : Seq[CarbonDropDataMapCommand] = Seq.empty override def processMetadata(sparkSession: SparkSession): Seq[Row] = { val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) @@ -107,7 +110,19 @@ case class CarbonDropTableCommand( dropCommand } childDropCommands.foreach(_.processMetadata(sparkSession)) + } else { + val schemas = DataMapStoreManager.getInstance().getAllDataMapSchemas(carbonTable) + childDropDataMapCommands = schemas.asScala.map{ schema => + val command = CarbonDropDataMapCommand(schema.getDataMapName, + ifExistsSet, + Some(TableIdentifier(tableName, Some(dbName))), + forceDrop = true) + command.dataMapSchema = schema + command.mainTable = carbonTable + command } + childDropDataMapCommands.foreach(_.processMetadata(sparkSession)) + } // fires the event after dropping main table val dropTablePostEvent: DropTablePostEvent = @@ -158,6 +173,7 @@ case class CarbonDropTableCommand( // drop all child tables childDropCommands.foreach(_.processData(sparkSession)) } + childDropDataMapCommands.foreach(_.processData(sparkSession)) } Seq.empty } http://git-wip-us.apache.org/repos/asf/carbondata/blob/05086e53/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala index 5c23805..1b621c1 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.execution.command.{DataMapField, Field} import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException} import org.apache.carbondata.core.metadata.datatype.DataTypes -import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES +import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.TIMESERIES import org.apache.carbondata.core.metadata.schema.datamap.Granularity import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.preagg.TimeSeriesUDF http://git-wip-us.apache.org/repos/asf/carbondata/blob/05086e53/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala index 2905b60..d212ced 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala @@ -150,13 +150,19 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { */ protected lazy val createDataMap: Parser[LogicalPlan] = CREATE ~> DATAMAP ~> opt(IF ~> NOT ~> EXISTS) ~ ident ~ - (ON ~ TABLE) ~ (ident <~ ".").? ~ ident ~ + opt(ontable) ~ (USING ~> stringLit) ~ (DMPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? ~ (AS ~> restInput).? <~ opt(";") ^^ { - case ifnotexists ~ dmname ~ ontable ~ dbName ~ tableName ~ className ~ dmprops ~ query => + case ifnotexists ~ dmname ~ tableIdent ~ className ~ dmprops ~ query => + val map = dmprops.getOrElse(List[(String, String)]()).toMap[String, String] - CarbonCreateDataMapCommand( - dmname, TableIdentifier(tableName, dbName), className, map, query, ifnotexists.isDefined) + CarbonCreateDataMapCommand(dmname, tableIdent, className, map, query, ifnotexists.isDefined) + } + + protected lazy val ontable: Parser[TableIdentifier] = + ON ~> TABLE ~> (ident <~ ".").? ~ ident ^^ { + case dbName ~ tableName => + TableIdentifier(tableName, dbName) } /** @@ -164,10 +170,9 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { * DROP DATAMAP IF EXISTS datamapName ON TABLE tablename */ protected lazy val dropDataMap: Parser[LogicalPlan] = - DROP ~> DATAMAP ~> opt(IF ~> EXISTS) ~ ident ~ (ON ~ TABLE) ~ - (ident <~ ".").? ~ ident <~ opt(";") ^^ { - case ifexists ~ dmname ~ ontable ~ dbName ~ tableName => - CarbonDropDataMapCommand(dmname, ifexists.isDefined, dbName, tableName) + DROP ~> DATAMAP ~> opt(IF ~> EXISTS) ~ ident ~ opt(ontable) <~ opt(";") ^^ { + case ifexists ~ dmname ~ tableIdent => + CarbonDropDataMapCommand(dmname, ifexists.isDefined, tableIdent) } /** @@ -175,9 +180,9 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { * SHOW DATAMAP ON TABLE tableName */ protected lazy val showDataMap: Parser[LogicalPlan] = - SHOW ~> DATAMAP ~> ON ~> TABLE ~> (ident <~ ".").? ~ ident <~ opt(";") ^^ { - case databaseName ~ tableName => - CarbonDataMapShowCommand(convertDbNameToLowerCase(databaseName), tableName.toLowerCase()) + SHOW ~> DATAMAP ~> opt(ontable) <~ opt(";") ^^ { + case tableIdent => + CarbonDataMapShowCommand(tableIdent) } protected lazy val deleteRecords: Parser[LogicalPlan] =