[HotFix] Getting carbon table identifier to datamap events Passing the table identifier to keep track of table in event in case preload and postload of datamap event.
This closes #2448 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/aec47e06 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/aec47e06 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/aec47e06 Branch: refs/heads/carbonstore Commit: aec47e06ff57dbfe6180f7ba2574700ac07ae8f1 Parents: 1c4358e Author: Jatin <jatin.de...@knoldus.in> Authored: Wed Jul 4 19:53:48 2018 +0530 Committer: kunal642 <kunalkapoor...@gmail.com> Committed: Tue Jul 17 14:49:16 2018 +0530 ---------------------------------------------------------------------- .../org/apache/carbondata/events/DataMapEvents.scala | 13 +++++++++---- .../command/datamap/CarbonCreateDataMapCommand.scala | 12 ++++++++---- .../command/datamap/CarbonDataMapRebuildCommand.scala | 8 ++++++-- .../command/datamap/CarbonDropDataMapCommand.scala | 4 ++-- 4 files changed, 25 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/aec47e06/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala index 8fb374f..72c980c 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala @@ -18,6 +18,7 @@ package org.apache.carbondata.events import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier @@ -26,14 +27,16 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier * example: bloom datamap, Lucene datamap */ case class CreateDataMapPostExecutionEvent(sparkSession: SparkSession, - storePath: String) extends Event with CreateDataMapEventsInfo + storePath: String, tableIdentifier: TableIdentifier) + extends Event with CreateDataMapEventsInfo /** * For handling operation's before start of update index datmap status over table with index datamap * example: bloom datamap, Lucene datamap */ case class UpdateDataMapPreExecutionEvent(sparkSession: SparkSession, - storePath: String) extends Event with CreateDataMapEventsInfo + storePath: String, tableIdentifier: TableIdentifier) + extends Event with CreateDataMapEventsInfo /** * For handling operation's after finish of update index datmap status over table with index @@ -41,7 +44,8 @@ case class UpdateDataMapPreExecutionEvent(sparkSession: SparkSession, * example: bloom datamap, Lucene datamap */ case class UpdateDataMapPostExecutionEvent(sparkSession: SparkSession, - storePath: String) extends Event with CreateDataMapEventsInfo + storePath: String, tableIdentifier: TableIdentifier) + extends Event with CreateDataMapEventsInfo /** * For handling operation's before start of index build over table with index datamap @@ -64,5 +68,6 @@ case class BuildDataMapPostExecutionEvent(sparkSession: SparkSession, * example: bloom datamap, Lucene datamap */ case class CreateDataMapPreExecutionEvent(sparkSession: SparkSession, - storePath: String) extends Event with CreateDataMapEventsInfo + storePath: String, tableIdentifier: TableIdentifier) + extends Event with CreateDataMapEventsInfo http://git-wip-us.apache.org/repos/asf/carbondata/blob/aec47e06/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala index 33dba28..7600160 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala @@ -125,13 +125,15 @@ case class CarbonCreateDataMapCommand( val operationContext: OperationContext = new OperationContext() val systemFolderLocation: String = CarbonProperties.getInstance().getSystemFolderLocation val createDataMapPreExecutionEvent: CreateDataMapPreExecutionEvent = - new CreateDataMapPreExecutionEvent(sparkSession, systemFolderLocation) + new CreateDataMapPreExecutionEvent(sparkSession, + systemFolderLocation, tableIdentifier.get) OperationListenerBus.getInstance().fireEvent(createDataMapPreExecutionEvent, operationContext) dataMapProvider.initMeta(queryString.orNull) DataMapStatusManager.disableDataMap(dataMapName) val createDataMapPostExecutionEvent: CreateDataMapPostExecutionEvent = - new CreateDataMapPostExecutionEvent(sparkSession, systemFolderLocation) + new CreateDataMapPostExecutionEvent(sparkSession, + systemFolderLocation, tableIdentifier.get) OperationListenerBus.getInstance().fireEvent(createDataMapPostExecutionEvent, operationContext) case _ => @@ -155,12 +157,14 @@ case class CarbonCreateDataMapCommand( val operationContext: OperationContext = new OperationContext() val systemFolderLocation: String = CarbonProperties.getInstance().getSystemFolderLocation val updateDataMapPreExecutionEvent: UpdateDataMapPreExecutionEvent = - new UpdateDataMapPreExecutionEvent(sparkSession, systemFolderLocation) + new UpdateDataMapPreExecutionEvent(sparkSession, + systemFolderLocation, tableIdentifier.get) OperationListenerBus.getInstance().fireEvent(updateDataMapPreExecutionEvent, operationContext) DataMapStatusManager.enableDataMap(dataMapName) val updateDataMapPostExecutionEvent: UpdateDataMapPostExecutionEvent = - new UpdateDataMapPostExecutionEvent(sparkSession, systemFolderLocation) + new UpdateDataMapPostExecutionEvent(sparkSession, + systemFolderLocation, tableIdentifier.get) OperationListenerBus.getInstance().fireEvent(updateDataMapPostExecutionEvent, operationContext) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/aec47e06/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala index beadc7e..f3db6ca 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala @@ -54,12 +54,16 @@ case class CarbonDataMapRebuildCommand( val operationContext: OperationContext = new OperationContext() val systemFolderLocation: String = CarbonProperties.getInstance().getSystemFolderLocation val updateDataMapPreExecutionEvent: UpdateDataMapPreExecutionEvent = - new UpdateDataMapPreExecutionEvent(sparkSession, systemFolderLocation) + new UpdateDataMapPreExecutionEvent(sparkSession, + systemFolderLocation, + new TableIdentifier(table.getTableName, Some(table.getDatabaseName))) OperationListenerBus.getInstance().fireEvent(updateDataMapPreExecutionEvent, operationContext) DataMapStatusManager.enableDataMap(dataMapName) val updateDataMapPostExecutionEvent: UpdateDataMapPostExecutionEvent = - new UpdateDataMapPostExecutionEvent(sparkSession, systemFolderLocation) + new UpdateDataMapPostExecutionEvent(sparkSession, + systemFolderLocation, + new TableIdentifier(table.getTableName, Some(table.getDatabaseName))) OperationListenerBus.getInstance().fireEvent(updateDataMapPostExecutionEvent, operationContext) Seq.empty http://git-wip-us.apache.org/repos/asf/carbondata/blob/aec47e06/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala index bae00ee..8bc269b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala @@ -205,12 +205,12 @@ case class CarbonDropDataMapCommand( val operationContext: OperationContext = new OperationContext() val systemFolderLocation: String = CarbonProperties.getInstance().getSystemFolderLocation val updateDataMapPreExecutionEvent: UpdateDataMapPreExecutionEvent = - UpdateDataMapPreExecutionEvent(sparkSession, systemFolderLocation) + UpdateDataMapPreExecutionEvent(sparkSession, systemFolderLocation, null) OperationListenerBus.getInstance().fireEvent(updateDataMapPreExecutionEvent, operationContext) DataMapStatusManager.dropDataMap(dataMapSchema.getDataMapName) val updateDataMapPostExecutionEvent: UpdateDataMapPostExecutionEvent = - UpdateDataMapPostExecutionEvent(sparkSession, systemFolderLocation) + UpdateDataMapPostExecutionEvent(sparkSession, systemFolderLocation, null) OperationListenerBus.getInstance().fireEvent(updateDataMapPostExecutionEvent, operationContext) // if it is indexDataMap provider like lucene, then call cleanData, which will launch a job