http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala index a1c68a3..c64f50b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala @@ -28,7 +28,6 @@ import org.apache.spark.util.AlterTableUtil import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.common.logging.impl.Audit import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.features.TableOperation @@ -48,6 +47,8 @@ private[sql] case class CarbonAlterTableRenameCommand( val newTableIdentifier = alterTableRenameModel.newTableIdentifier val oldDatabaseName = oldTableIdentifier.database .getOrElse(sparkSession.catalog.currentDatabase) + setAuditTable(oldDatabaseName, oldTableIdentifier.table) + setAuditInfo(Map("newName" -> alterTableRenameModel.newTableIdentifier.table)) val newDatabaseName = newTableIdentifier.database .getOrElse(sparkSession.catalog.currentDatabase) if (!oldDatabaseName.equalsIgnoreCase(newDatabaseName)) { @@ -60,15 +61,12 @@ private[sql] case class CarbonAlterTableRenameCommand( } val oldTableName = oldTableIdentifier.table.toLowerCase val newTableName = newTableIdentifier.table.toLowerCase - Audit.log(LOGGER, s"Rename table request has been received for $oldDatabaseName.$oldTableName") LOGGER.info(s"Rename table request has been received for $oldDatabaseName.$oldTableName") val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore val relation: CarbonRelation = metastore.lookupRelation(oldTableIdentifier.database, oldTableName)(sparkSession) .asInstanceOf[CarbonRelation] if (relation == null) { - Audit.log(LOGGER, s"Rename table request has failed. " + - s"Table $oldDatabaseName.$oldTableName does not exist") throwMetadataException(oldDatabaseName, oldTableName, "Table does not exist") } @@ -162,13 +160,11 @@ private[sql] case class CarbonAlterTableRenameCommand( OperationListenerBus.getInstance().fireEvent(alterTableRenamePostEvent, operationContext) sparkSession.catalog.refreshTable(newIdentifier.quotedString) - Audit.log(LOGGER, s"Table $oldTableName has been successfully renamed to $newTableName") LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName") } catch { case e: ConcurrentOperationException => throw e case e: Exception => - LOGGER.error("Rename table failed: " + e.getMessage, e) if (carbonTable != null) { AlterTableUtil.revertRenameTableChanges( newTableName, @@ -182,4 +178,5 @@ private[sql] case class CarbonAlterTableRenameCommand( Seq.empty } + override protected def opName: String = "ALTER TABLE RENAME TABLE" }
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala index 51c0e6e..b1e7e33 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala @@ -29,17 +29,18 @@ private[sql] case class CarbonAlterTableSetCommand( isView: Boolean) extends MetadataCommand { - override def run(sparkSession: SparkSession): Seq[Row] = { - processMetadata(sparkSession) - } - override def processMetadata(sparkSession: SparkSession): Seq[Row] = { + setAuditTable(tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase), + tableIdentifier.table) AlterTableUtil.modifyTableProperties( tableIdentifier, properties, Nil, set = true)(sparkSession, sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]) + setAuditInfo(properties) Seq.empty } + + override protected def opName: String = "ALTER TABLE SET" } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala index 2490f9e..361ba1d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala @@ -29,16 +29,17 @@ private[sql] case class CarbonAlterTableUnsetCommand( propKeys: Seq[String], ifExists: Boolean, isView: Boolean) - extends RunnableCommand with MetadataProcessOpeation { - - override def run(sparkSession: SparkSession): Seq[Row] = { - processMetadata(sparkSession) - } + extends MetadataCommand { override def processMetadata(sparkSession: SparkSession): Seq[Row] = { + setAuditTable(tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase), + tableIdentifier.table) AlterTableUtil.modifyTableProperties(tableIdentifier, Map.empty[String, String], propKeys, false)(sparkSession, sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]) + setAuditInfo(Map("unset" -> propKeys.mkString(", "))) Seq.empty } + + override protected def opName: String = "ALTER TABLE UNSET" } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonGetTableDetailCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonGetTableDetailCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonGetTableDetailCommand.scala index 8d6a4cc..90da68a 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonGetTableDetailCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonGetTableDetailCommand.scala @@ -56,4 +56,6 @@ case class CarbonGetTableDetailCommand( AttributeReference("table size", LongType, nullable = false)(), AttributeReference("last modified time", LongType, nullable = false)()) } + + override protected def opName: String = "GET TABLE DETAIL" } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala index 1f8bde2..a95d6a4 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala @@ -55,6 +55,8 @@ case class CarbonCreateStreamCommand( AttributeReference("Status", StringType, nullable = false)()) override def processData(sparkSession: SparkSession): Seq[Row] = { + setAuditTable(CarbonEnv.getDatabaseName(sinkDbName)(sparkSession), sinkTableName) + setAuditInfo(Map("streamName" -> streamName, "query" -> query) ++ optionMap) val inputQuery = sparkSession.sql(query) val sourceTableSeq = inputQuery.logicalPlan collect { case r: LogicalRelation @@ -286,4 +288,5 @@ case class CarbonCreateStreamCommand( Util.convertToSparkSchema(sourceTable, sortedCols) } + override protected def opName: String = "CREATE STREAM" } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonDropStreamCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonDropStreamCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonDropStreamCommand.scala index 82b84ef..49f1d11 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonDropStreamCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonDropStreamCommand.scala @@ -30,7 +30,10 @@ case class CarbonDropStreamCommand( ifExists: Boolean ) extends MetadataCommand { override def processMetadata(sparkSession: SparkSession): Seq[Row] = { + setAuditInfo(Map("streamName" -> streamName)) StreamJobManager.stopStream(streamName, ifExists) Seq.empty } + + override protected def opName: String = "DROP STREAM" } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonShowStreamsCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonShowStreamsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonShowStreamsCommand.scala index 49c2ffb..ee749b3 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonShowStreamsCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonShowStreamsCommand.scala @@ -49,6 +49,7 @@ case class CarbonShowStreamsCommand( case None => StreamJobManager.getAllJobs.toSeq case Some(table) => val carbonTable = CarbonEnv.getCarbonTable(table.database, table.table)(sparkSession) + setAuditTable(carbonTable) StreamJobManager.getAllJobs.filter { job => job.sinkTable.equalsIgnoreCase(carbonTable.getTableName) && job.sinkDb.equalsIgnoreCase(carbonTable.getDatabaseName) @@ -73,4 +74,6 @@ case class CarbonShowStreamsCommand( ) } } + + override protected def opName: String = "SHOW STREAMS" } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala index 3252f1d..54be619 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala @@ -24,9 +24,6 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.command.AtomicRunnableCommand import org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand -import org.apache.carbondata.api.CarbonStore.LOGGER -import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.common.logging.impl.Audit import org.apache.carbondata.core.metadata.schema.table.TableInfo /** @@ -47,7 +44,6 @@ case class CarbonCreateTableAsSelectCommand( var loadCommand: CarbonInsertIntoCommand = _ override def processMetadata(sparkSession: SparkSession): Seq[Row] = { - val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) val tableName = tableInfo.getFactTable.getTableName var isTableCreated = false var databaseOpt: Option[String] = None @@ -55,14 +51,12 @@ case class CarbonCreateTableAsSelectCommand( databaseOpt = Some(tableInfo.getDatabaseName) } val dbName = CarbonEnv.getDatabaseName(databaseOpt)(sparkSession) - Audit.log(LOGGER, s"Request received for CTAS for $dbName.$tableName") + setAuditTable(dbName, tableName) + setAuditInfo(Map("query" -> query.simpleString)) // check if table already exists if (sparkSession.sessionState.catalog.listTables(dbName) .exists(_.table.equalsIgnoreCase(tableName))) { if (!ifNotExistsSet) { - Audit.log(LOGGER, - s"Table creation with Database name [$dbName] and Table name [$tableName] failed. " + - s"Table [$tableName] already exists under database [$dbName]") throw new TableAlreadyExistsException(dbName, tableName) } } else { @@ -78,7 +72,6 @@ case class CarbonCreateTableAsSelectCommand( databaseOpt = Some(tableInfo.getDatabaseName) } val dbName = CarbonEnv.getDatabaseName(databaseOpt)(sparkSession) - val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) val carbonDataSourceHadoopRelation = CarbonEnv.getInstance(sparkSession).carbonMetastore .createCarbonDataSourceHadoopRelation(sparkSession, TableIdentifier(tableName, Option(dbName))) @@ -95,11 +88,7 @@ case class CarbonCreateTableAsSelectCommand( override def processData(sparkSession: SparkSession): Seq[Row] = { if (null != loadCommand) { - val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) loadCommand.processData(sparkSession) - val carbonTable = loadCommand.relation.carbonTable - Audit.log(LOGGER, s"CTAS operation completed successfully for " + - s"${carbonTable.getDatabaseName}.${carbonTable.getTableName}") } Seq.empty } @@ -117,4 +106,6 @@ case class CarbonCreateTableAsSelectCommand( Option(dbName), tableName).run(sparkSession) Seq.empty } + + override protected def opName: String = "CREATE TABLE AS SELECT" } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala index 5d039bf..ca39931 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala @@ -19,15 +19,12 @@ package org.apache.spark.sql.execution.command.table import scala.collection.JavaConverters._ -import org.apache.commons.lang3.StringUtils import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, _} import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY import org.apache.spark.sql.execution.command.MetadataCommand -import org.apache.carbondata.api.CarbonStore.LOGGER import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.common.logging.impl.Audit import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.compression.CompressorFactory import org.apache.carbondata.core.datastore.impl.FileFactory @@ -36,7 +33,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.partition.PartitionType import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} -import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, ThreadLocalSessionInfo} +import org.apache.carbondata.core.util.{CarbonUtil, ThreadLocalSessionInfo} import org.apache.carbondata.events.{CreateTablePostExecutionEvent, CreateTablePreExecutionEvent, OperationContext, OperationListenerBus} import org.apache.carbondata.spark.util.CarbonSparkUtil @@ -59,17 +56,16 @@ case class CarbonCreateTableCommand( databaseOpt = Some(tableInfo.getDatabaseName) } val dbName = CarbonEnv.getDatabaseName(databaseOpt)(sparkSession) + setAuditTable(dbName, tableName) + setAuditInfo(tableInfo.getFactTable.getTableProperties.asScala.toMap + ++ Map("external" -> isExternal.toString)) // set dbName and tableUnique Name in the table info tableInfo.setDatabaseName(dbName) tableInfo.setTableUniqueName(CarbonTable.buildUniqueName(dbName, tableName)) - Audit.log(LOGGER, s"Creating Table with Database name [$dbName] and Table name [$tableName]") val isTransactionalTable = tableInfo.isTransactionalTable if (sparkSession.sessionState.catalog.listTables(dbName) .exists(_.table.equalsIgnoreCase(tableName))) { if (!ifNotExistsSet) { - Audit.log(LOGGER, - s"Table creation with Database name [$dbName] and Table name [$tableName] failed. " + - s"Table [$tableName] already exists under database [$dbName]") throw new TableAlreadyExistsException(dbName, tableName) } } else { @@ -180,16 +176,15 @@ case class CarbonCreateTableCommand( case _: Exception => // No operation } val msg = s"Create table'$tableName' in database '$dbName' failed" - Audit.log(LOGGER, msg.concat(", ").concat(e.getMessage)) - LOGGER.error(msg, e) throwMetadataException(dbName, tableName, msg.concat(", ").concat(e.getMessage)) } } val createTablePostExecutionEvent: CreateTablePostExecutionEvent = CreateTablePostExecutionEvent(sparkSession, tableIdentifier) OperationListenerBus.getInstance.fireEvent(createTablePostExecutionEvent, operationContext) - Audit.log(LOGGER, s"Table created with Database name [$dbName] and Table name [$tableName]") } Seq.empty } + + override protected def opName: String = "CREATE TABLE" } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala index b513c1f..8edc854 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala @@ -45,6 +45,7 @@ private[sql] case class CarbonDescribeFormattedCommand( override def processMetadata(sparkSession: SparkSession): Seq[Row] = { val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore .lookupRelation(tblIdentifier)(sparkSession).asInstanceOf[CarbonRelation] + setAuditTable(relation.databaseName, relation.tableName) val mapper = new ObjectMapper() val colProps = StringBuilder.newBuilder val dims = relation.metaData.dims.map(x => x.toLowerCase) @@ -267,4 +268,6 @@ private[sql] case class CarbonDescribeFormattedCommand( Row(f"$name%-36s", f"$dataType%-80s", f"$comment%-72s") } } + + override protected def opName: String = "DESC FORMATTED" } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala index 37ab04f..34d9b75 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala @@ -27,7 +27,6 @@ import org.apache.spark.sql.execution.command.AtomicRunnableCommand import org.apache.spark.sql.execution.command.datamap.CarbonDropDataMapCommand import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.common.logging.impl.Audit import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.DataMapStoreManager @@ -54,6 +53,7 @@ case class CarbonDropTableCommand( val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase) + setAuditTable(dbName, tableName) val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer() try { carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) @@ -71,7 +71,6 @@ case class CarbonDropTableCommand( if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) { throw new ConcurrentOperationException(carbonTable, "loading", "drop table") } - Audit.log(LOGGER, s"Deleting table [$tableName] under database [$dbName]") if (carbonTable.isStreamingSink) { // streaming table should acquire streaming.lock carbonLocks += CarbonLockUtil.getLockObject(identifier, LockUsage.STREAMING_LOCK) @@ -142,8 +141,6 @@ case class CarbonDropTableCommand( ifExistsSet, sparkSession) OperationListenerBus.getInstance.fireEvent(dropTablePostEvent, operationContext) - Audit.log(LOGGER, s"Deleted table [$tableName] under database [$dbName]") - } catch { case ex: NoSuchTableException => LOGGER.error(ex.getLocalizedMessage, ex) @@ -200,4 +197,5 @@ case class CarbonDropTableCommand( Seq.empty } + override protected def opName: String = "DROP TABLE" } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonExplainCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonExplainCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonExplainCommand.scala index cb402c7..8939c6a 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonExplainCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonExplainCommand.scala @@ -32,7 +32,7 @@ case class CarbonExplainCommand( ) extends MetadataCommand { override def processMetadata(sparkSession: SparkSession): Seq[Row] = { val explainCommand = child.asInstanceOf[ExplainCommand] - + setAuditInfo(Map("query" -> explainCommand.logicalPlan.simpleString)) val isCommand = explainCommand.logicalPlan match { case _: Command => true case Union(childern) if childern.forall(_.isInstanceOf[Command]) => true @@ -61,5 +61,7 @@ case class CarbonExplainCommand( ExplainCollector.remove() } } + + override protected def opName: String = "EXPLAIN" } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonShowTablesCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonShowTablesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonShowTablesCommand.scala index 534703d..d50b766 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonShowTablesCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonShowTablesCommand.scala @@ -63,4 +63,5 @@ private[sql] case class CarbonShowTablesCommand ( databaseName: Option[String], } + override protected def opName: String = "SHOW TABLES" } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala index 16d974e..b4dd1b1 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala @@ -256,7 +256,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { AttributeSet(handledPredicates.flatMap(_.references)) -- (projectSet ++ unhandledSet).map(relation.attributeMap) } catch { - case e => throw new CarbonPhysicalPlanException + case e: Throwable => throw new CarbonPhysicalPlanException } } // Combines all Catalyst filter `Expression`s that are either not convertible to data source http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala index c681b62..e26163f 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala @@ -63,11 +63,11 @@ case class CarbonDropDatabaseCommand(command: DropDatabaseCommand) } case class CarbonSetCommand(command: SetCommand) - extends RunnableCommand { + extends MetadataCommand { override val output: Seq[Attribute] = command.output - override def run(sparkSession: SparkSession): Seq[Row] = { + override def processMetadata(sparkSession: SparkSession): Seq[Row] = { val sessionParams = CarbonEnv.getInstance(sparkSession).carbonSessionInfo.getSessionParams command.kv match { case Some((key, Some(value))) => @@ -86,6 +86,9 @@ case class CarbonSetCommand(command: SetCommand) } command.run(sparkSession) } + + override protected def opName: String = "SET" + } object CarbonSetCommand { http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala index 39e2f30..4ce4459 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala @@ -39,7 +39,6 @@ import org.apache.spark.util.CarbonReflectionUtils import org.apache.carbondata.api.CarbonStore.LOGGER import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException -import org.apache.carbondata.common.logging.impl.Audit import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.spark.CarbonOption import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil} @@ -548,11 +547,6 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { val colName = name.substring(14) if (name.startsWith("default.value.") && fields.count(p => p.column.equalsIgnoreCase(colName)) == 1) { - LOGGER.error(s"Duplicate default value exist for new column: ${ colName }") - Audit.log(LOGGER, - s"Validation failed for Create/Alter Table Operation " + - s"for ${ table }. " + - s"Duplicate default value exist for new column: ${ colName }") sys.error(s"Duplicate default value exist for new column: ${ colName }") } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala index 27443a8..3faa111 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala @@ -29,10 +29,8 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog} import org.apache.spark.sql.hive.HiveExternalCatalog._ -import org.apache.carbondata.api.CarbonStore.LOGGER import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.common.logging.impl.Audit import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder @@ -43,9 +41,8 @@ import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverte import org.apache.carbondata.core.metadata.datatype.DataTypes import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema -import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.core.util.CarbonUtil -import org.apache.carbondata.format.{Encoding, SchemaEvolutionEntry, TableInfo} +import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo} import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil} @@ -70,8 +67,6 @@ object AlterTableUtil { .lookupRelation(Option(dbName), tableName)(sparkSession) .asInstanceOf[CarbonRelation] if (relation == null) { - Audit.log(LOGGER, s"Alter table request has failed. " + - s"Table $dbName.$tableName does not exist") sys.error(s"Table $dbName.$tableName does not exist") } // acquire the lock first @@ -294,7 +289,6 @@ object AlterTableUtil { (sparkSession: SparkSession, catalog: CarbonSessionCatalog): Unit = { val tableName = tableIdentifier.table val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase) - Audit.log(LOGGER, s"Alter table newProperties request has been received for $dbName.$tableName") val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK) var locks = List.empty[ICarbonLock] try { @@ -383,10 +377,8 @@ object AlterTableUtil { propKeys, set) LOGGER.info(s"Alter table newProperties is successful for table $dbName.$tableName") - Audit.log(LOGGER, s"Alter table newProperties is successful for table $dbName.$tableName") } catch { case e: Exception => - LOGGER.error("Alter table newProperties failed", e) sys.error(s"Alter table newProperties operation failed: ${e.getMessage}") } finally { // release lock after command execution completion