This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new eff4aa3  [CARBONDATA-3668] Fix compile issue of CarbonSessionCatalog for spark 2.4
eff4aa3 is described below

commit eff4aa39c1b0c5e929fa55471afd48d6cab9555b
Author: QiangCai <qiang...@qq.com>
AuthorDate: Thu Feb 6 09:18:02 2020 +0800

    [CARBONDATA-3668] Fix compile issue of CarbonSessionCatalog for spark 2.4
    
    Why is this PR needed?
    There is a compile issue in CarbonSessionCatalog when building against Spark 2.4.
    
    What changes were proposed in this PR?
    CarbonHiveSessionCatalog and CarbonSessionStateBuilder are adapted to both Spark 2.3 and
    Spark 2.4, with version-specific copies under the spark2.3 and spark2.4 source directories.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #3605
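
For orientation, the core API difference the two per-version copies have to absorb is in how
HiveSessionCatalog and HiveSessionResourceLoader are constructed. A rough, non-runnable sketch
(argument names follow the diff below; the surrounding builder code is elided):

    // Spark 2.3: the catalog objects are passed directly, and
    // sharedState.externalCatalog is already a HiveExternalCatalog.
    new HiveSessionCatalog(
      externalCatalog, globalTempViewManager,
      new HiveMetastoreCatalog(sparkSession),
      functionRegistry, conf, hadoopConf, parser, functionResourceLoader)
    new HiveSessionResourceLoader(session, client)

    // Spark 2.4: the same arguments become providers, the shared external catalog
    // is wrapped in ExternalCatalogWithListener and must be unwrapped first, and
    // the resource loader takes a HiveClient provider as well.
    new HiveSessionCatalog(
      () => externalCatalog, () => globalTempViewManager,
      new HiveMetastoreCatalog(sparkSession),
      functionRegistry, conf, hadoopConf, parser, functionResourceLoader)
    val hiveCatalog = session.sharedState.externalCatalog
      .asInstanceOf[ExternalCatalogWithListener]
      .unwrapped
      .asInstanceOf[HiveExternalCatalog]
    new HiveSessionResourceLoader(session, () => client)
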
---
 .../spark/sql/hive/CarbonSessionCatalogUtil.scala  | 190 +--------------------
 .../sql/hive/CarbonSessionStateBuilder.scala}      | 177 ++-----------------
 .../sql/hive/CarbonSessionStateBuilder.scala}      | 171 ++-----------------
 3 files changed, 29 insertions(+), 509 deletions(-)

diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala
index dd2d751..3b4d580 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala
@@ -17,20 +17,16 @@
 
 package org.apache.spark.sql.hive
 
-import java.util.concurrent.Callable
-
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.{CarbonEnv, CarbonToSparkAdapter, SparkSession}
-import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.{CarbonToSparkAdapter, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.Analyzer
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
 import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.{SessionState, SQLConf}
+import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule}
 import org.apache.spark.sql.parser.{CarbonSparkSqlParser, CarbonSparkSqlParserUtil}
 
@@ -183,181 +179,3 @@ object CarbonSessionCatalogUtil {
     storage.copy(locationUri = Some(path.toUri))
   }
 }
-
-/**
- * This class will have carbon catalog and refresh the relation from cache if the carbontable in
- * carbon catalog is not same as cached carbon relation's carbon table
- *
- * @param externalCatalog
- * @param globalTempViewManager
- * @param sparkSession
- * @param functionResourceLoader
- * @param functionRegistry
- * @param conf
- * @param hadoopConf
- */
-class CarbonHiveSessionCatalog(
-    externalCatalog: HiveExternalCatalog,
-    globalTempViewManager: GlobalTempViewManager,
-    functionRegistry: FunctionRegistry,
-    sparkSession: SparkSession,
-    conf: SQLConf,
-    hadoopConf: Configuration,
-    parser: ParserInterface,
-    functionResourceLoader: FunctionResourceLoader)
-  extends HiveSessionCatalog (
-    externalCatalog,
-    globalTempViewManager,
-    new HiveMetastoreCatalog(sparkSession),
-    functionRegistry,
-    conf,
-    hadoopConf,
-    parser,
-    functionResourceLoader
-  ) with CarbonSessionCatalog {
-
-  private lazy val carbonEnv = {
-    val env = new CarbonEnv
-    env.init(sparkSession)
-    env
-  }
-
-  /**
-   * return's the carbonEnv instance
-   * @return
-   */
-  override def getCarbonEnv() : CarbonEnv = {
-    carbonEnv
-  }
-
-  // Initialize all listeners to the Operation bus.
-  CarbonEnv.init
-
-  override def lookupRelation(name: TableIdentifier): LogicalPlan = {
-    var rtnRelation = super.lookupRelation(name)
-    val isRelationRefreshed =
-      CarbonSessionUtil.refreshRelationAndSetStats(rtnRelation, name)(sparkSession)
-    if (isRelationRefreshed) {
-      rtnRelation = super.lookupRelation(name)
-      // Reset the stats after lookup.
-      CarbonSessionUtil.refreshRelationAndSetStats(rtnRelation, name)(sparkSession)
-    }
-    rtnRelation
-  }
-
-  override def getCachedPlan(t: QualifiedTableName,
-      c: Callable[LogicalPlan]): LogicalPlan = {
-    val plan = super.getCachedPlan(t, c)
-    CarbonSessionUtil.updateCachedPlan(plan)
-  }
-
-  /**
-   * returns hive client from HiveExternalCatalog
-   *
-   * @return
-   */
-  override def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
-    CarbonSessionCatalogUtil.getClient(sparkSession)
-  }
-
-  override def alterAddColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]]): Unit = {
-    CarbonSessionCatalogUtil.alterAddColumns(tableIdentifier, schemaParts, cols, sparkSession)
-  }
-
-  override def alterDropColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]]): Unit = {
-    CarbonSessionCatalogUtil.alterDropColumns(tableIdentifier, schemaParts, cols, sparkSession)
-  }
-
-  override def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]]): Unit = {
-    CarbonSessionCatalogUtil.alterColumnChangeDataTypeOrRename(
-      tableIdentifier, schemaParts, cols, sparkSession)
-  }
-
-  /**
-   * This is alternate way of getting partition information. It first fetches all partitions from
-   * hive and then apply filter instead of querying hive along with filters.
-   * @param partitionFilters
-   * @param sparkSession
-   * @param identifier
-   * @return
-   */
-  override def getPartitionsAlternate(partitionFilters: Seq[Expression],
-      sparkSession: SparkSession, identifier: TableIdentifier): Seq[CatalogTablePartition] = {
-    CarbonSessionCatalogUtil.getPartitionsAlternate(partitionFilters, sparkSession, identifier)
-  }
-
-  /**
-   * Update the storageformat with new location information
-   */
-  override def updateStorageLocation(
-      path: Path,
-      storage: CatalogStorageFormat,
-      newTableName: String,
-      dbName: String): CatalogStorageFormat = {
-    CarbonSessionCatalogUtil.updateStorageLocation(path, storage, newTableName, dbName)
-  }
-}
-
-/**
- * Session state implementation to override sql parser and adding strategies
- *
- * @param sparkSession
- */
-class CarbonSessionStateBuilder(sparkSession: SparkSession,
-    parentState: Option[SessionState] = None)
-  extends HiveSessionStateBuilder(sparkSession, parentState) {
-
-  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
-
-  experimentalMethods.extraStrategies =
-    Seq(new StreamingTableStrategy(sparkSession),
-      new CarbonLateDecodeStrategy,
-      new DDLStrategy(sparkSession)
-    )
-  experimentalMethods.extraOptimizations = Seq(new CarbonIUDRule,
-    new CarbonUDFTransformRule)
-
-  /**
-   * Internal catalog for managing table and database states.
-   */
-  /**
-   * Create a [[CarbonSessionStateBuilder]].
-   */
-  override protected lazy val catalog: CarbonHiveSessionCatalog = {
-    val catalog = new CarbonHiveSessionCatalog(
-      externalCatalog,
-      session.sharedState.globalTempViewManager,
-      functionRegistry,
-      sparkSession,
-      conf,
-      SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
-      sqlParser,
-      resourceLoader)
-    parentState.foreach(_.catalog.copyStateTo(catalog))
-    catalog
-  }
-
-  private def externalCatalog: HiveExternalCatalog =
-    session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
-
-  /**
-   * Create a Hive aware resource loader.
-   */
-  override protected lazy val resourceLoader: HiveSessionResourceLoader = {
-    val client: HiveClient = externalCatalog.client.newSession()
-    new HiveSessionResourceLoader(session, client)
-  }
-
-  override protected def analyzer: Analyzer = {
-    new CarbonAnalyzer(catalog,
-      conf,
-      sparkSession,
-      super.analyzer)
-  }
-}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala
similarity index 52%
copy from integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala
copy to integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala
index dd2d751..9ff36bc 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala
@@ -21,169 +21,21 @@ import java.util.concurrent.Callable
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.{CarbonEnv, CarbonToSparkAdapter, SparkSession}
-import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
-import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition, FunctionResourceLoader, GlobalTempViewManager}
 import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.{SessionState, SQLConf}
 import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule}
-import org.apache.spark.sql.parser.{CarbonSparkSqlParser, CarbonSparkSqlParserUtil}
+import org.apache.spark.sql.parser.CarbonSparkSqlParser
 
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 
-object CarbonSessionCatalogUtil {
-
-  /**
-   * Method used to update the table name
-   * @param oldTableIdentifier old table identifier
-   * @param newTableIdentifier new table identifier
-   * @param newTablePath new table path
-   */
-  def alterTableRename(
-      oldTableIdentifier: TableIdentifier,
-      newTableIdentifier: TableIdentifier,
-      newTablePath: String,
-      sparkSession: SparkSession,
-      isExternal: Boolean
-  ): Unit = {
-    if (!isExternal) {
-      getClient(sparkSession).runSqlHive(
-        s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ 
oldTableIdentifier.table } " +
-        s"SET TBLPROPERTIES('EXTERNAL'='TRUE')")
-    }
-    getClient(sparkSession).runSqlHive(
-      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ 
oldTableIdentifier.table } " +
-      s"RENAME TO ${ newTableIdentifier.database.get }.${ 
newTableIdentifier.table }")
-    if (!isExternal) {
-      getClient(sparkSession).runSqlHive(
-        s"ALTER TABLE ${ newTableIdentifier.database.get }.${ 
newTableIdentifier.table } " +
-        s"SET TBLPROPERTIES('EXTERNAL'='FALSE')")
-    }
-    getClient(sparkSession).runSqlHive(
-      s"ALTER TABLE ${ newTableIdentifier.database.get }.${ 
newTableIdentifier.table } " +
-      s"SET SERDEPROPERTIES" +
-      s"('tableName'='${ newTableIdentifier.table }', " +
-      s"'dbName'='${ newTableIdentifier.database.get }', 'tablePath'='${ 
newTablePath }')")
-  }
-
-  /**
-   * Below method will be used to update serd properties
-   * @param tableIdentifier table identifier
-   * @param schemaParts schema parts
-   * @param cols cols
-   */
-  def alterTable(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]],
-      sparkSession: SparkSession): Unit = {
-    getClient(sparkSession)
-      .runSqlHive(s"ALTER TABLE ${ tableIdentifier.database.get }.${ 
tableIdentifier.table } " +
-                  s"SET TBLPROPERTIES(${ schemaParts })")
-  }
-
-  def alterTableProperties(
-      sparkSession: SparkSession,
-      tableIdentifier: TableIdentifier,
-      properties: Map[String, String],
-      propKeys: Seq[String]
-  ): Unit = {
-    val catalog = sparkSession.sessionState.catalog
-    val table = catalog.getTableMetadata(tableIdentifier)
-    var newProperties = table.storage.properties
-    if (!propKeys.isEmpty) {
-      val updatedPropKeys = propKeys.map(_.toLowerCase)
-      newProperties = newProperties.filter { case (k, _) => 
!updatedPropKeys.contains(k) }
-    }
-    if (!properties.isEmpty) {
-      newProperties = newProperties ++ 
CarbonSparkSqlParserUtil.normalizeProperties(properties)
-    }
-    val newTable = table.copy(
-      storage = table.storage.copy(properties = newProperties)
-    )
-    catalog.alterTable(newTable)
-  }
-
-  /**
-   * returns hive client from HiveExternalCatalog
-   *
-   * @return
-   */
-  def getClient(sparkSession: SparkSession): org.apache.spark.sql.hive.client.HiveClient = {
-    //    For Spark2.2 we need to use unified Spark thrift server instead of carbon thrift
-    //    server. CarbonSession is not available anymore so HiveClient is created directly
-    //    using sparkSession.sharedState which internally contains all required carbon rules,
-    //    optimizers pluged-in through SessionStateBuilder in spark-defaults.conf.
-    //    spark.sql.session.state.builder=org.apache.spark.sql.hive.CarbonSessionStateBuilder
-    CarbonToSparkAdapter.getHiveExternalCatalog(sparkSession).client
-  }
-
-  def alterAddColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]], sparkSession: SparkSession): Unit = {
-    updateCatalogTableForAlter(tableIdentifier, schemaParts, cols, 
sparkSession)
-  }
-
-  def alterDropColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]], sparkSession: SparkSession): Unit = {
-    updateCatalogTableForAlter(tableIdentifier, schemaParts, cols, 
sparkSession)
-  }
-
-  def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]], sparkSession: SparkSession): Unit = {
-    updateCatalogTableForAlter(tableIdentifier, schemaParts, cols, 
sparkSession)
-  }
-
-  /**
-   * This method alters table to set serde properties and updates the catalog 
table with new updated
-   * schema for all the alter operations like add column, drop column, change 
datatype or rename
-   * column
-   * @param tableIdentifier
-   * @param schemaParts
-   * @param cols
-   */
-  private def updateCatalogTableForAlter(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]],
-      sparkSession: SparkSession): Unit = {
-    alterTable(tableIdentifier, schemaParts, cols, sparkSession)
-    CarbonSessionUtil.alterExternalCatalogForTableWithUpdatedSchema(
-      tableIdentifier, cols, schemaParts, sparkSession)
-  }
-
-  /**
-   * This is alternate way of getting partition information. It first fetches 
all partitions from
-   * hive and then apply filter instead of querying hive along with filters.
-   * @param partitionFilters
-   * @param sparkSession
-   * @param identifier
-   * @return
-   */
-  def getPartitionsAlternate(
-      partitionFilters: Seq[Expression],
-      sparkSession: SparkSession,
-      identifier: TableIdentifier): Seq[CatalogTablePartition] = {
-    CarbonSessionUtil.prunePartitionsByFilter(partitionFilters, sparkSession, 
identifier)
-  }
-
-  /**
-   * Update the storageformat with new location information
-   */
-  def updateStorageLocation(
-      path: Path,
-      storage: CatalogStorageFormat,
-      newTableName: String,
-      dbName: String): CatalogStorageFormat = {
-    storage.copy(locationUri = Some(path.toUri))
-  }
-}
-
 /**
  * This class will have carbon catalog and refresh the relation from cache if 
the carbontable in
  * carbon catalog is not same as cached carbon relation's carbon table
@@ -315,13 +167,9 @@ class CarbonSessionStateBuilder(sparkSession: SparkSession,
 
   override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
 
-  experimentalMethods.extraStrategies =
-    Seq(new StreamingTableStrategy(sparkSession),
-      new CarbonLateDecodeStrategy,
-      new DDLStrategy(sparkSession)
-    )
-  experimentalMethods.extraOptimizations = Seq(new CarbonIUDRule,
-    new CarbonUDFTransformRule)
+  experimentalMethods.extraStrategies =Seq(new StreamingTableStrategy(sparkSession),
+      new CarbonLateDecodeStrategy, new DDLStrategy(sparkSession))
+  experimentalMethods.extraOptimizations = Seq(new CarbonIUDRule, new CarbonUDFTransformRule)
 
   /**
    * Internal catalog for managing table and database states.
@@ -354,10 +202,7 @@ class CarbonSessionStateBuilder(sparkSession: SparkSession,
     new HiveSessionResourceLoader(session, client)
   }
 
-  override protected def analyzer: Analyzer = {
-    new CarbonAnalyzer(catalog,
-      conf,
-      sparkSession,
-      super.analyzer)
-  }
+  override protected def analyzer: Analyzer =
+    new CarbonAnalyzer(catalog, conf, sparkSession, super.analyzer)
 }
+
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala
similarity index 53%
copy from integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala
copy to integration/spark2/src/main/spark2.4/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala
index dd2d751..33438e4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala
+++ b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala
@@ -21,169 +21,21 @@ import java.util.concurrent.Callable
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.{CarbonEnv, CarbonToSparkAdapter, SparkSession}
-import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
-import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition, ExternalCatalogWithListener, FunctionResourceLoader, GlobalTempViewManager}
 import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.{SessionState, SQLConf}
 import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule}
-import org.apache.spark.sql.parser.{CarbonSparkSqlParser, CarbonSparkSqlParserUtil}
+import org.apache.spark.sql.parser.CarbonSparkSqlParser
 
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 
-object CarbonSessionCatalogUtil {
-
-  /**
-   * Method used to update the table name
-   * @param oldTableIdentifier old table identifier
-   * @param newTableIdentifier new table identifier
-   * @param newTablePath new table path
-   */
-  def alterTableRename(
-      oldTableIdentifier: TableIdentifier,
-      newTableIdentifier: TableIdentifier,
-      newTablePath: String,
-      sparkSession: SparkSession,
-      isExternal: Boolean
-  ): Unit = {
-    if (!isExternal) {
-      getClient(sparkSession).runSqlHive(
-        s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ 
oldTableIdentifier.table } " +
-        s"SET TBLPROPERTIES('EXTERNAL'='TRUE')")
-    }
-    getClient(sparkSession).runSqlHive(
-      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ 
oldTableIdentifier.table } " +
-      s"RENAME TO ${ newTableIdentifier.database.get }.${ 
newTableIdentifier.table }")
-    if (!isExternal) {
-      getClient(sparkSession).runSqlHive(
-        s"ALTER TABLE ${ newTableIdentifier.database.get }.${ 
newTableIdentifier.table } " +
-        s"SET TBLPROPERTIES('EXTERNAL'='FALSE')")
-    }
-    getClient(sparkSession).runSqlHive(
-      s"ALTER TABLE ${ newTableIdentifier.database.get }.${ 
newTableIdentifier.table } " +
-      s"SET SERDEPROPERTIES" +
-      s"('tableName'='${ newTableIdentifier.table }', " +
-      s"'dbName'='${ newTableIdentifier.database.get }', 'tablePath'='${ 
newTablePath }')")
-  }
-
-  /**
-   * Below method will be used to update serd properties
-   * @param tableIdentifier table identifier
-   * @param schemaParts schema parts
-   * @param cols cols
-   */
-  def alterTable(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]],
-      sparkSession: SparkSession): Unit = {
-    getClient(sparkSession)
-      .runSqlHive(s"ALTER TABLE ${ tableIdentifier.database.get }.${ 
tableIdentifier.table } " +
-                  s"SET TBLPROPERTIES(${ schemaParts })")
-  }
-
-  def alterTableProperties(
-      sparkSession: SparkSession,
-      tableIdentifier: TableIdentifier,
-      properties: Map[String, String],
-      propKeys: Seq[String]
-  ): Unit = {
-    val catalog = sparkSession.sessionState.catalog
-    val table = catalog.getTableMetadata(tableIdentifier)
-    var newProperties = table.storage.properties
-    if (!propKeys.isEmpty) {
-      val updatedPropKeys = propKeys.map(_.toLowerCase)
-      newProperties = newProperties.filter { case (k, _) => 
!updatedPropKeys.contains(k) }
-    }
-    if (!properties.isEmpty) {
-      newProperties = newProperties ++ 
CarbonSparkSqlParserUtil.normalizeProperties(properties)
-    }
-    val newTable = table.copy(
-      storage = table.storage.copy(properties = newProperties)
-    )
-    catalog.alterTable(newTable)
-  }
-
-  /**
-   * returns hive client from HiveExternalCatalog
-   *
-   * @return
-   */
-  def getClient(sparkSession: SparkSession): org.apache.spark.sql.hive.client.HiveClient = {
-    //    For Spark2.2 we need to use unified Spark thrift server instead of carbon thrift
-    //    server. CarbonSession is not available anymore so HiveClient is created directly
-    //    using sparkSession.sharedState which internally contains all required carbon rules,
-    //    optimizers pluged-in through SessionStateBuilder in spark-defaults.conf.
-    //    spark.sql.session.state.builder=org.apache.spark.sql.hive.CarbonSessionStateBuilder
-    CarbonToSparkAdapter.getHiveExternalCatalog(sparkSession).client
-  }
-
-  def alterAddColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]], sparkSession: SparkSession): Unit = {
-    updateCatalogTableForAlter(tableIdentifier, schemaParts, cols, 
sparkSession)
-  }
-
-  def alterDropColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]], sparkSession: SparkSession): Unit = {
-    updateCatalogTableForAlter(tableIdentifier, schemaParts, cols, 
sparkSession)
-  }
-
-  def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]], sparkSession: SparkSession): Unit = {
-    updateCatalogTableForAlter(tableIdentifier, schemaParts, cols, 
sparkSession)
-  }
-
-  /**
-   * This method alters table to set serde properties and updates the catalog 
table with new updated
-   * schema for all the alter operations like add column, drop column, change 
datatype or rename
-   * column
-   * @param tableIdentifier
-   * @param schemaParts
-   * @param cols
-   */
-  private def updateCatalogTableForAlter(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]],
-      sparkSession: SparkSession): Unit = {
-    alterTable(tableIdentifier, schemaParts, cols, sparkSession)
-    CarbonSessionUtil.alterExternalCatalogForTableWithUpdatedSchema(
-      tableIdentifier, cols, schemaParts, sparkSession)
-  }
-
-  /**
-   * This is alternate way of getting partition information. It first fetches 
all partitions from
-   * hive and then apply filter instead of querying hive along with filters.
-   * @param partitionFilters
-   * @param sparkSession
-   * @param identifier
-   * @return
-   */
-  def getPartitionsAlternate(
-      partitionFilters: Seq[Expression],
-      sparkSession: SparkSession,
-      identifier: TableIdentifier): Seq[CatalogTablePartition] = {
-    CarbonSessionUtil.prunePartitionsByFilter(partitionFilters, sparkSession, 
identifier)
-  }
-
-  /**
-   * Update the storageformat with new location information
-   */
-  def updateStorageLocation(
-      path: Path,
-      storage: CatalogStorageFormat,
-      newTableName: String,
-      dbName: String): CatalogStorageFormat = {
-    storage.copy(locationUri = Some(path.toUri))
-  }
-}
-
 /**
  * This class will have carbon catalog and refresh the relation from cache if 
the carbontable in
  * carbon catalog is not same as cached carbon relation's carbon table
@@ -206,8 +58,8 @@ class CarbonHiveSessionCatalog(
     parser: ParserInterface,
     functionResourceLoader: FunctionResourceLoader)
   extends HiveSessionCatalog (
-    externalCatalog,
-    globalTempViewManager,
+    () => externalCatalog,
+    () => globalTempViewManager,
     new HiveMetastoreCatalog(sparkSession),
     functionRegistry,
     conf,
@@ -344,14 +196,19 @@ class CarbonSessionStateBuilder(sparkSession: SparkSession,
   }
 
   private def externalCatalog: HiveExternalCatalog =
-    session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
+    session
+      .sharedState
+      .externalCatalog
+      .asInstanceOf[ExternalCatalogWithListener]
+      .unwrapped
+      .asInstanceOf[HiveExternalCatalog]
 
   /**
    * Create a Hive aware resource loader.
    */
   override protected lazy val resourceLoader: HiveSessionResourceLoader = {
     val client: HiveClient = externalCatalog.client.newSession()
-    new HiveSessionResourceLoader(session, client)
+    new HiveSessionResourceLoader(session, () => client)
   }
 
   override protected def analyzer: Analyzer = {
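
As a usage note: the helpers whose doc comments appear in the removed and copied blocks above,
such as getPartitionsAlternate and updateStorageLocation, remain on the shared
CarbonSessionCatalogUtil object. A minimal, hypothetical call-site sketch (the database and table
names are invented; the signatures are the ones shown in the diff):

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
    import org.apache.spark.sql.catalyst.expressions.Expression
    import org.apache.spark.sql.hive.CarbonSessionCatalogUtil

    // Fetch all partitions of db1.t1 and filter them on the Spark side,
    // instead of pushing the filters down to the Hive metastore.
    def prunedPartitions(spark: SparkSession, filters: Seq[Expression]) =
      CarbonSessionCatalogUtil.getPartitionsAlternate(
        filters, spark, TableIdentifier("t1", Some("db1")))

    // Rewrite only the location of a storage descriptor, as updateStorageLocation does.
    val relocated: CatalogStorageFormat = CarbonSessionCatalogUtil.updateStorageLocation(
      new Path("/tmp/warehouse/db1/t1_renamed"), CatalogStorageFormat.empty, "t1_renamed", "db1")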
