[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user zzcclp commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152730693 --- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala --- @@ -107,15 +110,14 @@ class CarbonSessionCatalog( carbonEnv.carbonMetastore. checkSchemasModifiedTimeAndReloadTables(storePath) -val tableMeta = carbonEnv.carbonMetastore - .getTableFromMetadataCache(carbonDatasourceHadoopRelation.carbonTable.getDatabaseName, -carbonDatasourceHadoopRelation.carbonTable.getFactTableName) -if (tableMeta.isEmpty || (tableMeta.isDefined && -tableMeta.get.carbonTable.getTableLastUpdatedTime != - carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) { +val table = carbonEnv.carbonMetastore.getTableFromMetadataCache( + carbonDatasourceHadoopRelation.carbonTable.getDatabaseName, + carbonDatasourceHadoopRelation.carbonTable.getFactTableName) +if (table.isEmpty || (table.isDefined && + table.get.carbonTable.getTableLastUpdatedTime != --- End diff -- wrong indent ---
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user zzcclp commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152730610 --- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala --- @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.hive + --- End diff -- remove empty line ---
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user zzcclp commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152730468 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala --- @@ -38,7 +40,26 @@ import org.apache.carbondata.spark.util.CommonUtil */ class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends AbstractSqlParser { - val astBuilder = new CarbonSqlAstBuilder(conf) + val parser = new CarbonSpark2SqlParser + val astBuilder = getAstBuilder() + + def getAstBuilder(): AstBuilder = { +if (sparkSession.version.contains("2.1")) { + val clazz = Utils.classForName("org.apache.spark.sql.hive.CarbonSqlAstBuilder") + val ctor = clazz.getConstructors.head + ctor.setAccessible(true) + val astBuilder = ctor.newInstance(conf, parser).asInstanceOf[AstBuilder] + astBuilder +} else if (sparkSession.version.contains("2.2")) { + val clazz = Utils.classForName("org.apache.spark.sql.hive.CarbonSqlAstBuilder") + val ctor = clazz.getConstructors.head + ctor.setAccessible(true) + val astBuilder = ctor.newInstance(conf, parser).asInstanceOf[AstBuilder] + astBuilder --- End diff -- what's the difference between the code for 2.1 and 2.2 ---
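If the two branches really are identical (as the quoted diff suggests), they could presumably collapse into a single reflective construction; a sketch only, with the unsupported-version fallback being a hypothetical addition, not taken from the PR:

    def getAstBuilder(): AstBuilder = {
      // Both 2.1 and 2.2 instantiate the same CarbonSqlAstBuilder, so one branch suffices
      if (sparkSession.version.startsWith("2.1") || sparkSession.version.startsWith("2.2")) {
        val clazz = Utils.classForName("org.apache.spark.sql.hive.CarbonSqlAstBuilder")
        val ctor = clazz.getConstructors.head
        ctor.setAccessible(true)
        ctor.newInstance(conf, parser).asInstanceOf[AstBuilder]
      } else {
        // hypothetical fallback for other Spark versions
        throw new UnsupportedOperationException(s"Unsupported Spark version: ${sparkSession.version}")
      }
    }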
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user zzcclp commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152729141 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala --- @@ -38,7 +40,26 @@ import org.apache.carbondata.spark.util.CommonUtil */ class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends AbstractSqlParser { - val astBuilder = new CarbonSqlAstBuilder(conf) + val parser = new CarbonSpark2SqlParser + val astBuilder = getAstBuilder() + + def getAstBuilder(): AstBuilder = { +if (sparkSession.version.contains("2.1")) { + val clazz = Utils.classForName("org.apache.spark.sql.hive.CarbonSqlAstBuilder") + val ctor = clazz.getConstructors.head + ctor.setAccessible(true) + val astBuilder = ctor.newInstance(conf, parser).asInstanceOf[AstBuilder] + astBuilder +} else if (sparkSession.version.contains("2.2")) { --- End diff -- use startsWith instead of contains ---
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user zzcclp commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152729132 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala --- @@ -38,7 +40,26 @@ import org.apache.carbondata.spark.util.CommonUtil */ class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends AbstractSqlParser { - val astBuilder = new CarbonSqlAstBuilder(conf) + val parser = new CarbonSpark2SqlParser + val astBuilder = getAstBuilder() + + def getAstBuilder(): AstBuilder = { +if (sparkSession.version.contains("2.1")) { --- End diff -- use startsWith instead of contains ---
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user zzcclp commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152729085 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala --- @@ -17,19 +17,23 @@ package org.apache.spark.sql.parser +import java.lang.reflect.InvocationTargetException + import scala.collection.mutable import scala.language.implicitConversions import org.apache.spark.sql.{DeleteRecords, ShowLoadsCommand, UpdateTable} -import org.apache.spark.sql.catalyst.CarbonDDLSqlParser +import org.apache.spark.sql.catalyst.{CarbonDDLSqlParser, TableIdentifier} import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._ -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.command.management.{AlterTableCompactionCommand, CleanFilesCommand, DeleteLoadByIdCommand, DeleteLoadByLoadDateCommand, LoadTableCommand} import org.apache.spark.sql.execution.command.partition.{AlterTableDropCarbonPartitionCommand, AlterTableSplitCarbonPartitionCommand} import org.apache.spark.sql.execution.command.schema.{AlterTableAddColumnCommand, AlterTableDataTypeChangeCommand, AlterTableDropColumnCommand} import org.apache.spark.sql.types.StructField +import org.apache.spark.sql.CarbonExpressions.CarbonUnresolvedRelation +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedRelation, UnresolvedStar} --- End diff -- the order of imports is wrong. ---
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user zzcclp commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152728991 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/internal/CarbonSqlConf.scala --- @@ -32,76 +32,6 @@ class CarbonSQLConf(sparkSession: SparkSession) { /** * To initialize dynamic param defaults along with usage docs */ - def addDefaultCarbonParams(): Unit = { -val ENABLE_UNSAFE_SORT = - SQLConfigBuilder(CarbonCommonConstants.ENABLE_UNSAFE_SORT) -.doc("To enable/ disable unsafe sort.") -.booleanConf - .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, - CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean) -val CARBON_CUSTOM_BLOCK_DISTRIBUTION = - SQLConfigBuilder(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION) -.doc("To enable/ disable carbon custom block distribution.") -.booleanConf -.createWithDefault(carbonProperties - .getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION, - CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT).toBoolean) -val BAD_RECORDS_LOGGER_ENABLE = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE) -.doc("To enable/ disable carbon bad record logger.") -.booleanConf -.createWithDefault(CarbonLoadOptionConstants - .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean) -val BAD_RECORDS_ACTION = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION) -.doc("To configure the bad records action.") -.stringConf -.createWithDefault(carbonProperties - .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, -CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)) -val IS_EMPTY_DATA_BAD_RECORD = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD) -.doc("Property to decide weather empty data to be considered bad/ good record.") -.booleanConf - .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT - .toBoolean) -val SORT_SCOPE = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE) -.doc("Property to specify sort scope.") -.stringConf - .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, - CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)) -val BATCH_SORT_SIZE_INMB = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB) -.doc("Property to specify batch sort size in MB.") -.stringConf -.createWithDefault(carbonProperties - .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, -CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT)) -val SINGLE_PASS = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS) -.doc("Property to enable/disable single_pass.") -.booleanConf - .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean) -val BAD_RECORD_PATH = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH) -.doc("Property to configure the bad record location.") -.stringConf - .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, - CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)) -val GLOBAL_SORT_PARTITIONS = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS) -.doc("Property to configure the global sort partitions.") -.stringConf -.createWithDefault(carbonProperties - .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS, 
-CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT)) -val DATEFORMAT = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT) -.doc("Property to configure data format for date type columns.") -.stringConf - .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT) - } --- End diff -- why do the above lines need to be removed? ---
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user zzcclp commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152728865 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSqlConfFactory.scala --- @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import org.apache.spark.internal.config.ConfigBuilder +import org.apache.spark.sql.hive.CarbonSqlConfCompileCode.AbstractCarbonSqlConfFactory +import org.apache.spark.util.ScalaCompilerUtil + + +private[sql] class CarbonSqlConfCodeGenerateFactory(version: String) { + + val carbonSqlConfFactory = if (version.equals("2.1")) { --- End diff -- use startsWith instead of equals ---
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user zzcclp commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152728764 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonOptimizer.scala --- @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import org.apache.spark.sql.ExperimentalMethods +import org.apache.spark.sql.catalyst.catalog.SessionCatalog +import org.apache.spark.sql.catalyst.optimizer.Optimizer +import org.apache.spark.sql.hive.CarbonOptimizerCompileCode.AbstractCarbonOptimizerFactory +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.ScalaCompilerUtil + + +private[sql] class CarbonOptimizerCodeGenerateFactory(version: String) { + + val carbonoptimizerFactory = if (version.equals("2.1")) { --- End diff -- use startsWith instead of equals. ---
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user zzcclp commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152728579 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala --- @@ -153,8 +153,11 @@ class CarbonHiveMetaStore extends CarbonFileMetastore { val dbName = oldTableIdentifier.getDatabaseName val tableName = oldTableIdentifier.getTableName val schemaParts = CarbonUtil.convertToMultiGsonStrings(wrapperTableInfo, "=", "'", "") - sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive.runSqlHive( - s"ALTER TABLE $dbName.$tableName SET SERDEPROPERTIES($schemaParts)") +val hiveClient = sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog + .asInstanceOf[HiveExternalCatalog].client +hiveClient.runSqlHive(s"ALTER TABLE $dbName.$tableName SET SERDEPROPERTIES($schemaParts)") + +sparkSession.sessionState --- End diff -- unused code, remove it. ---
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user zzcclp commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152728527 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala --- @@ -115,18 +121,50 @@ class CarbonFileMetastore extends CarbonMetaStore { lookupRelation(TableIdentifier(tableName, dbName))(sparkSession) } + val rm = universe.runtimeMirror(getClass.getClassLoader) + + def getField[T: TypeTag : reflect.ClassTag](name: String, obj: T): CatalogTable = { +val im = rm.reflect(obj) +val sym = im.symbol.typeSignature.member(TermName(name)) +val tableMeta = im.reflectMethod(sym.asMethod).apply() +tableMeta.asInstanceOf[CatalogTable] + } + override def lookupRelation(tableIdentifier: TableIdentifier) (sparkSession: SparkSession): LogicalPlan = { val database = tableIdentifier.database.getOrElse( sparkSession.catalog.currentDatabase) val relation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match { case SubqueryAlias(_, - LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _), - _) => + LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) => carbonDatasourceHadoopRelation.carbonRelation case LogicalRelation( carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) => carbonDatasourceHadoopRelation.carbonRelation + +// case SubqueryAlias(_, c: CatalogRelation) if sparkSession.version.contains("2.2") && +// getField("tableMeta", c) +// .asInstanceOf[CatalogTable].provider +// .isDefined && +// getField("tableMeta", c) +// .asInstanceOf[String] +// .equals("org.apache.spark.sql.CarbonSource") => +//new CarbonSource() +// .createRelation(sparkSession.sqlContext, +//c.tableMeta.storage.properties) +// .asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation + + case SubqueryAlias(_, c: CatalogRelation) if sparkSession.version.contains("2.2") && --- End diff -- use startsWith instead of contains ---
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user zzcclp commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152728433 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala --- @@ -143,52 +246,250 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica selectPlan } val finalPlan = if (filter.length > 0) { - val alias = table.alias.getOrElse("") var transformed: Boolean = false // Create a dummy projection to include filter conditions var newPlan: LogicalPlan = null if (table.tableIdentifier.database.isDefined) { newPlan = parser.parsePlan("select * from " + - table.tableIdentifier.database.getOrElse("") + "." + - table.tableIdentifier.table + " " + alias + " " + filter) + table.tableIdentifier.database.getOrElse("") + "." + + table.tableIdentifier.table + " " + alias.getOrElse("") + " " + + filter) } else { newPlan = parser.parsePlan("select * from " + - table.tableIdentifier.table + " " + alias + " " + filter) + table.tableIdentifier.table + " " + alias.getOrElse("") + " " + + filter) } newPlan transform { -case UnresolvedRelation(t, Some(a)) - if !transformed && t == table.tableIdentifier && a == alias => +case CarbonUnresolvedRelation(t) + if !transformed && t == table.tableIdentifier => transformed = true // Add the filter condition of update statement on destination table - SubqueryAlias(alias, updatedSelectPlan, Option(table.tableIdentifier)) + // SubqueryAlias(alias.getOrElse(""), updatedSelectPlan, Option(table.tableIdentifier)) + if (sparkSession.version.contains("2.1")) { +// SubqueryAlias(alias1, updatedSelectPlan, Option(table.tableIdentifier)) +val clazz = Utils + .classForName("org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias") +val ctor = clazz.getConstructors.head +ctor.setAccessible(true) +val subqueryAlias = ctor + .newInstance(alias.getOrElse(""), updatedSelectPlan, Option(table.tableIdentifier)) + .asInstanceOf[SubqueryAlias] +subqueryAlias + } else if (sparkSession.version.contains("2.2")) { +// SubqueryAlias(table.output.map(_.withQualifier(Some(table.tableName))).toString(), +// Project(projList, relation)) +val clazz = Utils + .classForName("org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias") +val ctor = clazz.getConstructors.head +ctor.setAccessible(true) +val subqueryAlias = ctor.newInstance(alias.getOrElse(""), updatedSelectPlan) + .asInstanceOf[SubqueryAlias] +subqueryAlias + } else { +throw new UnsupportedOperationException("Unsupported Spark version") + } } } else { updatedSelectPlan } val tid = CarbonTableIdentifierImplicit.toTableIdentifier(Seq(table.tableIdentifier.toString())) val tidSeq = Seq(GetDB.getDatabaseName(tid.database, sparkSession)) -val destinationTable = UnresolvedRelation(table.tableIdentifier, table.alias) +// TODO use reflection +// val destinationTable = UnresolvedRelation(table.tableIdentifier, Some(alias.getOrElse(""))) +val destinationTable = + if (sparkSession.version.contains("2.1")) { --- End diff -- use startsWith instead of contains ---
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user zzcclp commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152728420 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala --- @@ -143,52 +246,250 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica selectPlan } val finalPlan = if (filter.length > 0) { - val alias = table.alias.getOrElse("") var transformed: Boolean = false // Create a dummy projection to include filter conditions var newPlan: LogicalPlan = null if (table.tableIdentifier.database.isDefined) { newPlan = parser.parsePlan("select * from " + - table.tableIdentifier.database.getOrElse("") + "." + - table.tableIdentifier.table + " " + alias + " " + filter) + table.tableIdentifier.database.getOrElse("") + "." + + table.tableIdentifier.table + " " + alias.getOrElse("") + " " + + filter) } else { newPlan = parser.parsePlan("select * from " + - table.tableIdentifier.table + " " + alias + " " + filter) + table.tableIdentifier.table + " " + alias.getOrElse("") + " " + + filter) } newPlan transform { -case UnresolvedRelation(t, Some(a)) - if !transformed && t == table.tableIdentifier && a == alias => +case CarbonUnresolvedRelation(t) + if !transformed && t == table.tableIdentifier => transformed = true // Add the filter condition of update statement on destination table - SubqueryAlias(alias, updatedSelectPlan, Option(table.tableIdentifier)) + // SubqueryAlias(alias.getOrElse(""), updatedSelectPlan, Option(table.tableIdentifier)) + if (sparkSession.version.contains("2.1")) { +// SubqueryAlias(alias1, updatedSelectPlan, Option(table.tableIdentifier)) +val clazz = Utils + .classForName("org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias") +val ctor = clazz.getConstructors.head +ctor.setAccessible(true) +val subqueryAlias = ctor + .newInstance(alias.getOrElse(""), updatedSelectPlan, Option(table.tableIdentifier)) + .asInstanceOf[SubqueryAlias] +subqueryAlias + } else if (sparkSession.version.contains("2.2")) { --- End diff -- use startsWith instead of contains ---
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user zzcclp commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152728413 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala --- @@ -143,52 +246,250 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica selectPlan } val finalPlan = if (filter.length > 0) { - val alias = table.alias.getOrElse("") var transformed: Boolean = false // Create a dummy projection to include filter conditions var newPlan: LogicalPlan = null if (table.tableIdentifier.database.isDefined) { newPlan = parser.parsePlan("select * from " + - table.tableIdentifier.database.getOrElse("") + "." + - table.tableIdentifier.table + " " + alias + " " + filter) + table.tableIdentifier.database.getOrElse("") + "." + + table.tableIdentifier.table + " " + alias.getOrElse("") + " " + + filter) } else { newPlan = parser.parsePlan("select * from " + - table.tableIdentifier.table + " " + alias + " " + filter) + table.tableIdentifier.table + " " + alias.getOrElse("") + " " + + filter) } newPlan transform { -case UnresolvedRelation(t, Some(a)) - if !transformed && t == table.tableIdentifier && a == alias => +case CarbonUnresolvedRelation(t) + if !transformed && t == table.tableIdentifier => transformed = true // Add the filter condition of update statement on destination table - SubqueryAlias(alias, updatedSelectPlan, Option(table.tableIdentifier)) + // SubqueryAlias(alias.getOrElse(""), updatedSelectPlan, Option(table.tableIdentifier)) + if (sparkSession.version.contains("2.1")) { --- End diff -- use startsWith instead of contains ---
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user zzcclp commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152728438 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala --- @@ -143,52 +246,250 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica selectPlan } val finalPlan = if (filter.length > 0) { - val alias = table.alias.getOrElse("") var transformed: Boolean = false // Create a dummy projection to include filter conditions var newPlan: LogicalPlan = null if (table.tableIdentifier.database.isDefined) { newPlan = parser.parsePlan("select * from " + - table.tableIdentifier.database.getOrElse("") + "." + - table.tableIdentifier.table + " " + alias + " " + filter) + table.tableIdentifier.database.getOrElse("") + "." + + table.tableIdentifier.table + " " + alias.getOrElse("") + " " + + filter) } else { newPlan = parser.parsePlan("select * from " + - table.tableIdentifier.table + " " + alias + " " + filter) + table.tableIdentifier.table + " " + alias.getOrElse("") + " " + + filter) } newPlan transform { -case UnresolvedRelation(t, Some(a)) - if !transformed && t == table.tableIdentifier && a == alias => +case CarbonUnresolvedRelation(t) + if !transformed && t == table.tableIdentifier => transformed = true // Add the filter condition of update statement on destination table - SubqueryAlias(alias, updatedSelectPlan, Option(table.tableIdentifier)) + // SubqueryAlias(alias.getOrElse(""), updatedSelectPlan, Option(table.tableIdentifier)) + if (sparkSession.version.contains("2.1")) { +// SubqueryAlias(alias1, updatedSelectPlan, Option(table.tableIdentifier)) +val clazz = Utils + .classForName("org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias") +val ctor = clazz.getConstructors.head +ctor.setAccessible(true) +val subqueryAlias = ctor + .newInstance(alias.getOrElse(""), updatedSelectPlan, Option(table.tableIdentifier)) + .asInstanceOf[SubqueryAlias] +subqueryAlias + } else if (sparkSession.version.contains("2.2")) { +// SubqueryAlias(table.output.map(_.withQualifier(Some(table.tableName))).toString(), +// Project(projList, relation)) +val clazz = Utils + .classForName("org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias") +val ctor = clazz.getConstructors.head +ctor.setAccessible(true) +val subqueryAlias = ctor.newInstance(alias.getOrElse(""), updatedSelectPlan) + .asInstanceOf[SubqueryAlias] +subqueryAlias + } else { +throw new UnsupportedOperationException("Unsupported Spark version") + } } } else { updatedSelectPlan } val tid = CarbonTableIdentifierImplicit.toTableIdentifier(Seq(table.tableIdentifier.toString())) val tidSeq = Seq(GetDB.getDatabaseName(tid.database, sparkSession)) -val destinationTable = UnresolvedRelation(table.tableIdentifier, table.alias) +// TODO use reflection +// val destinationTable = UnresolvedRelation(table.tableIdentifier, Some(alias.getOrElse(""))) +val destinationTable = + if (sparkSession.version.contains("2.1")) { + val clazz = Utils.classForName("org.apache.spark.sql.catalyst.analysis.UnresolvedRelation") + val ctor = clazz.getConstructors.head + ctor.setAccessible(true) + val unresolvedrelation = ctor +.newInstance(table.tableIdentifier, + Some(alias.getOrElse(""))).asInstanceOf[UnresolvedRelation] +unresolvedrelation +} else if (sparkSession.version.contains("2.2")) { --- End diff -- use startsWith instead of contains ---
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user zzcclp commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152728384 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala --- @@ -143,52 +246,250 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica selectPlan } val finalPlan = if (filter.length > 0) { - val alias = table.alias.getOrElse("") var transformed: Boolean = false // Create a dummy projection to include filter conditions var newPlan: LogicalPlan = null if (table.tableIdentifier.database.isDefined) { newPlan = parser.parsePlan("select * from " + - table.tableIdentifier.database.getOrElse("") + "." + - table.tableIdentifier.table + " " + alias + " " + filter) + table.tableIdentifier.database.getOrElse("") + "." + + table.tableIdentifier.table + " " + alias.getOrElse("") + " " + + filter) --- End diff -- the indent of above 3 lines is wrong ---
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user zzcclp commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152728263 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala --- @@ -95,11 +165,40 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica def prepareTargetReleation(relation: UnresolvedRelation): SubqueryAlias = { val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId", Seq.empty, isDistinct = false), "tupleId")()) + + val localalias = alias match { +case Some(a) => Some(alias.toSeq) +case _ => None + } val projList = Seq( -UnresolvedAlias(UnresolvedStar(Option(table.alias.toSeq))), tupleId) +UnresolvedAlias(UnresolvedStar(localalias)), tupleId) // include tuple id and rest of the required columns in subqury - SubqueryAlias(table.alias.getOrElse(""), -Project(projList, relation), Option(table.tableIdentifier)) +// SubqueryAlias(alias.getOrElse(""), +//Project(projList, relation), Option(table.tableIdentifier)) +// + if (sparkSession.version.contains("2.1")) { +// SubqueryAlias(table.output.map(_.withQualifier(Some(table.tableName))).toString(), +// Project(projList, relation), Option(table.tableIdentifier)) +val clazz = Utils.classForName("org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias") +val ctor = clazz.getConstructors.head +ctor.setAccessible(true) +val subqueryAlias = ctor + .newInstance(alias.getOrElse(""), +Project(projList, relation), Option(table.tableIdentifier)).asInstanceOf[SubqueryAlias] +subqueryAlias + } else if (sparkSession.version.contains("2.2")) { --- End diff -- use startsWith instead of contains ---
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user zzcclp commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152728246 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala --- @@ -95,11 +165,40 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica def prepareTargetReleation(relation: UnresolvedRelation): SubqueryAlias = { val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId", Seq.empty, isDistinct = false), "tupleId")()) + + val localalias = alias match { +case Some(a) => Some(alias.toSeq) +case _ => None + } val projList = Seq( -UnresolvedAlias(UnresolvedStar(Option(table.alias.toSeq))), tupleId) +UnresolvedAlias(UnresolvedStar(localalias)), tupleId) // include tuple id and rest of the required columns in subqury - SubqueryAlias(table.alias.getOrElse(""), -Project(projList, relation), Option(table.tableIdentifier)) +// SubqueryAlias(alias.getOrElse(""), +//Project(projList, relation), Option(table.tableIdentifier)) +// + if (sparkSession.version.contains("2.1")) { --- End diff -- use startsWith instead of contains ---
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user zzcclp commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152726082 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala --- @@ -26,13 +26,20 @@ import org.apache.spark.sql.execution.command.management.{AlterTableCompactionCo import org.apache.spark.sql.execution.command.partition.ShowCarbonPartitionsCommand import org.apache.spark.sql.execution.command.schema.{AlterTableAddColumnCommand, AlterTableDataTypeChangeCommand, AlterTableDropColumnCommand, AlterTableRenameTableCommand} import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand} +import org.apache.spark.sql.CarbonExpressions.{CarbonDescribeTable => DescribeTableCommand} +import org.apache.spark.sql.catalyst.catalog.CatalogTypes import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.spark.exception.MalformedCarbonCommandException + --- End diff -- remove empty line ---
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user zzcclp commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152725990 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala --- @@ -42,6 +43,7 @@ import org.apache.carbondata.spark.CarbonAliasDecoderRelation import org.apache.carbondata.spark.rdd.CarbonScanRDD import org.apache.carbondata.spark.util.CarbonScalaUtil + --- End diff -- remove empty line ---
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user zzcclp commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152725691 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala --- @@ -42,15 +42,46 @@ class CarbonSession(@transient val sc: SparkContext, this(sc, None) } + + // SessionStateCodeGenerateFactory.init(sc.version) + // CarbonOptimizerCodeGenerateFactory.init(sc.version) + // val carbonDefaultOptimizer = CarbonOptimizerCodeGenerateFactory.getInstance() + // .carbonoptimizerFactory.createCarbonOptimizer() + // @transient + // override lazy val sessionState: SessionState = new CarbonSessionState(this) + + + + def getSessionState(sparkContext: SparkContext): SessionState = { +if (sparkContext.version.contains("2.1")) { + val clazz = Utils.classForName("org.apache.spark.sql.hive.CarbonSessionState") + val ctor = clazz.getConstructors.head + ctor.setAccessible(true) + val sessionState1 = ctor.newInstance(this).asInstanceOf[SessionState] + sessionState1 +} else if (sparkContext.version.contains("2.2")) { --- End diff -- use sparkContext.version.startsWith("2.2") ---
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user zzcclp commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152721088 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala --- @@ -42,15 +42,46 @@ class CarbonSession(@transient val sc: SparkContext, this(sc, None) } + + // SessionStateCodeGenerateFactory.init(sc.version) + // CarbonOptimizerCodeGenerateFactory.init(sc.version) + // val carbonDefaultOptimizer = CarbonOptimizerCodeGenerateFactory.getInstance() + // .carbonoptimizerFactory.createCarbonOptimizer() + // @transient + // override lazy val sessionState: SessionState = new CarbonSessionState(this) + + + + def getSessionState(sparkContext: SparkContext): SessionState = { +if (sparkContext.version.contains("2.1")) { --- End diff -- It'd be better to use sparkContext.version.**startsWith**("2.1"); if the version is 2.2.1, contains("2.1") will return true. ---
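The difference is easy to see on concrete version strings; a minimal sketch (the helper names are illustrative, not from the PR):

    // contains("2.1") also matches "2.2.1", so startsWith is the safer check here
    def isSpark21(version: String): Boolean = version.startsWith("2.1")
    def isSpark22(version: String): Boolean = version.startsWith("2.2")

    // "2.2.1".contains("2.1")   => true   (a 2.2.x build would take the 2.1 branch)
    // "2.2.1".startsWith("2.1") => false
    // "2.1.0".startsWith("2.1") => true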
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152584958 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala --- @@ -80,7 +80,13 @@ case class CarbonCreateTableCommand( val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size) cm.dimCols.foreach(f => fields(f.schemaOrdinal) = f) cm.msrCols.foreach(f => fields(f.schemaOrdinal) = f) - +// +// sparkSession.sql( +//s"""CREATE TABLE $dbName.$tbName +// |(${ fields.map(f => f.rawSchema.replace("`", "")).mkString(",") }) +// |USING org.apache.spark.sql.CarbonSource""".stripMargin + +//s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin + +//s$tablePath"$carbonSchemaString) """) --- End diff -- remove commented code ---
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152583029 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala --- @@ -21,16 +21,19 @@ import java.text.{ParseException, SimpleDateFormat} import java.util import java.util.{Locale, TimeZone} +import scala.Option import scala.collection.JavaConverters._ -import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, EmptyRow, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, In, LessThan, LessThanOrEqual, Literal, Not} +import org.apache.spark.sql.catalyst.expressions.{Attribute, EmptyRow, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, In, LessThan, LessThanOrEqual, Literal, Not} import org.apache.spark.sql.CastExpr import org.apache.spark.sql.sources -import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, TimestampType} +import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, StringType, TimestampType} +import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties + --- End diff -- remove the empty line ---
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152582968 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala --- @@ -96,6 +99,8 @@ object CastExpressionOptimization { } } + + --- End diff -- remove the empty lines ---
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152582582 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala --- @@ -42,15 +42,46 @@ class CarbonSession(@transient val sc: SparkContext, this(sc, None) } + + // SessionStateCodeGenerateFactory.init(sc.version) + // CarbonOptimizerCodeGenerateFactory.init(sc.version) + // val carbonDefaultOptimizer = CarbonOptimizerCodeGenerateFactory.getInstance() + // .carbonoptimizerFactory.createCarbonOptimizer() + // @transient + // override lazy val sessionState: SessionState = new CarbonSessionState(this) + + + + def getSessionState(sparkContext: SparkContext): SessionState = { --- End diff -- move this reflection code to one common util ---
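A rough sketch of such a common utility (the object and method names are hypothetical, not taken from the PR):

    import org.apache.spark.util.Utils

    object CarbonReflectionUtils {
      // Instantiates a version-specific class (e.g. CarbonSessionState for 2.1 or 2.2)
      // by name through its first constructor, so callers only supply the class name and args.
      def createObject(className: String, args: AnyRef*): Any = {
        val clazz = Utils.classForName(className)
        val ctor = clazz.getConstructors.head
        ctor.setAccessible(true)
        ctor.newInstance(args: _*)
      }
    }

    // usage inside CarbonSession.getSessionState, for example:
    // CarbonReflectionUtils.createObject("org.apache.spark.sql.hive.CarbonSessionState", this)
    //   .asInstanceOf[SessionState]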
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152582199 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala --- @@ -42,15 +42,46 @@ class CarbonSession(@transient val sc: SparkContext, this(sc, None) } + + // SessionStateCodeGenerateFactory.init(sc.version) + // CarbonOptimizerCodeGenerateFactory.init(sc.version) + // val carbonDefaultOptimizer = CarbonOptimizerCodeGenerateFactory.getInstance() + // .carbonoptimizerFactory.createCarbonOptimizer() + // @transient + // override lazy val sessionState: SessionState = new CarbonSessionState(this) + --- End diff -- remove the commented code ---
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152579581 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.execution.command.DescribeTableCommand +import org.apache.spark.sql.types.DataType + + +object CarbonExpressions { + + object MatchCast { +def unapply(expr: Expression): Option[(Attribute, DataType)] = { + if (expr.isInstanceOf[Cast]) { +val castExpr = expr.asInstanceOf[Cast] +if (castExpr.child.isInstanceOf[Attribute]) { + Some((castExpr.child.asInstanceOf[Attribute], castExpr.dataType)) +} else { + None +} + } else { +None + } + +} + } + + object CarbonDescribeTable { +def unapply(plan: LogicalPlan): Option[(TableIdentifier, TablePartitionSpec, Boolean)] = { + if (plan.isInstanceOf[DescribeTableCommand]) { +val describeTableCommand = plan.asInstanceOf[DescribeTableCommand] +if (describeTableCommand.table.isInstanceOf[TableIdentifier]) { + if (describeTableCommand.partitionSpec.isInstanceOf[TablePartitionSpec]) { +if (describeTableCommand.isExtended.isInstanceOf[Boolean]) { + Some(describeTableCommand.table, +describeTableCommand.partitionSpec, +describeTableCommand.isExtended) +} else { + None +} + } else { +None + } +} else { + None +} + } else { +None + } +} + } + + object CarbonSubqueryAlias { +def unapply(plan: LogicalPlan): Option[(String, LogicalPlan)] = { + if (plan.isInstanceOf[SubqueryAlias]) { +val subqueryAlias = plan.asInstanceOf[SubqueryAlias] +if (subqueryAlias.alias.isInstanceOf[String]) { + if (subqueryAlias.child.isInstanceOf[LogicalPlan]) { +Some(subqueryAlias.alias, + subqueryAlias.child) --- End diff -- Move to above line ---
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152578680 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.execution.command.DescribeTableCommand +import org.apache.spark.sql.types.DataType + + +object CarbonExpressions { + + object MatchCast { +def unapply(expr: Expression): Option[(Attribute, DataType)] = { + if (expr.isInstanceOf[Cast]) { +val castExpr = expr.asInstanceOf[Cast] +if (castExpr.child.isInstanceOf[Attribute]) { + Some((castExpr.child.asInstanceOf[Attribute], castExpr.dataType)) +} else { + None +} + } else { +None + } + +} + } + + object CarbonDescribeTable { +def unapply(plan: LogicalPlan): Option[(TableIdentifier, TablePartitionSpec, Boolean)] = { + if (plan.isInstanceOf[DescribeTableCommand]) { +val describeTableCommand = plan.asInstanceOf[DescribeTableCommand] +if (describeTableCommand.table.isInstanceOf[TableIdentifier]) { + if (describeTableCommand.partitionSpec.isInstanceOf[TablePartitionSpec]) { +if (describeTableCommand.isExtended.isInstanceOf[Boolean]) { + Some(describeTableCommand.table, +describeTableCommand.partitionSpec, +describeTableCommand.isExtended) +} else { + None +} + } else { +None + } +} else { + None +} + } else { +None + } +} + } + + object CarbonSubqueryAlias { +def unapply(plan: LogicalPlan): Option[(String, LogicalPlan)] = { + if (plan.isInstanceOf[SubqueryAlias]) { +val subqueryAlias = plan.asInstanceOf[SubqueryAlias] +if (subqueryAlias.alias.isInstanceOf[String]) { + if (subqueryAlias.child.isInstanceOf[LogicalPlan]) { --- End diff -- combine both `if` conditions ---
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152578348 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.execution.command.DescribeTableCommand +import org.apache.spark.sql.types.DataType + + +object CarbonExpressions { + + object MatchCast { +def unapply(expr: Expression): Option[(Attribute, DataType)] = { + if (expr.isInstanceOf[Cast]) { +val castExpr = expr.asInstanceOf[Cast] +if (castExpr.child.isInstanceOf[Attribute]) { + Some((castExpr.child.asInstanceOf[Attribute], castExpr.dataType)) +} else { + None +} + } else { +None + } + +} + } + + object CarbonDescribeTable { +def unapply(plan: LogicalPlan): Option[(TableIdentifier, TablePartitionSpec, Boolean)] = { + if (plan.isInstanceOf[DescribeTableCommand]) { +val describeTableCommand = plan.asInstanceOf[DescribeTableCommand] +if (describeTableCommand.table.isInstanceOf[TableIdentifier]) { + if (describeTableCommand.partitionSpec.isInstanceOf[TablePartitionSpec]) { +if (describeTableCommand.isExtended.isInstanceOf[Boolean]) { --- End diff -- Combine all 3 `if` conditions to one `if` ---
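For reference, the three checks can collapse into a single pattern match, roughly like this (a sketch, assuming DescribeTableCommand's fields already carry the expected types):

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.execution.command.DescribeTableCommand

    object CarbonDescribeTable {
      // Matches a DESCRIBE TABLE command and exposes (table, partition spec, isExtended)
      def unapply(plan: LogicalPlan): Option[(TableIdentifier, TablePartitionSpec, Boolean)] =
        plan match {
          case desc: DescribeTableCommand => Some((desc.table, desc.partitionSpec, desc.isExtended))
          case _ => None
        }
    }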
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152577825 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.execution.command.DescribeTableCommand +import org.apache.spark.sql.types.DataType + + +object CarbonExpressions { + + object MatchCast { --- End diff -- Add comments ---
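For illustration, the same extractor with a short comment and the type checks folded into one match (a sketch; matching on the Cast type rather than destructuring its constructor keeps it independent of constructor-arity differences between Spark versions):

    import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression}
    import org.apache.spark.sql.types.DataType

    object MatchCast {
      // Matches a Cast whose child is a plain Attribute and returns (attribute, target data type)
      def unapply(expr: Expression): Option[(Attribute, DataType)] = expr match {
        case cast: Cast if cast.child.isInstanceOf[Attribute] =>
          Some((cast.child.asInstanceOf[Attribute], cast.dataType))
        case _ => None
      }
    }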
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152575948 --- Diff: assembly/pom.xml --- @@ -126,7 +126,7 @@ - spark-2.1 --- End diff -- It should not be removed; another profile needs to be added for 2.2. ---
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152571649 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala --- @@ -85,7 +86,7 @@ object CarbonEnv { def getInstance(sparkSession: SparkSession): CarbonEnv = { if (sparkSession.isInstanceOf[CarbonSession]) { - sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].carbonEnv + sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].carbonEnv --- End diff -- don't change the format ---
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152571409 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala --- @@ -58,7 +59,7 @@ class CarbonEnv { ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo) val config = new CarbonSQLConf(sparkSession) if(sparkSession.conf.getOption(CarbonCommonConstants.ENABLE_UNSAFE_SORT) == None) { -config.addDefaultCarbonParams() +//config.addDefaultCarbonParams() --- End diff -- remove if not required ---
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152571235 --- Diff: integration/spark2/pom.xml --- @@ -36,7 +36,7 @@ org.apache.carbondata - carbondata-spark-common + carbondata-streaming --- End diff -- why is this change required? ---
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152570799 --- Diff: integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala --- @@ -59,6 +61,26 @@ case class CarbonDictionaryTempDecoder( class CarbonDecoderProcessor { + val rm = universe.runtimeMirror(getClass.getClassLoader) + +// def getFields[T: TypeTag: reflect.ClassTag]( name: String, obj: T): Any = { +//val im = rm.reflect(obj) +//typeOf[T].members.collect { +// case m : MethodSymbol if m.isCaseAccessor && m.name.toString.equalsIgnoreCase(name) => +//val value = im.reflectMethod(m).apply() +//value +//} (collection.breakOut) +// } + + def getField[T: TypeTag: reflect.ClassTag]( name: String, obj: T): Any = { --- End diff -- Move all reflection methods to one common utility class ---
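A sketch of how these could sit in one shared utility (the object name is hypothetical; getField mirrors the helper already shown in the quoted diff):

    import scala.reflect.ClassTag
    import scala.reflect.runtime.universe._

    object CarbonReflectionUtils {
      private val rm = runtimeMirror(getClass.getClassLoader)

      // Reads the accessor of the given name from obj via Scala reflection, so fields whose
      // names differ across Spark versions (e.g. InsertIntoTable.query vs child) can be
      // reached from code that compiles against both.
      def getField[T: TypeTag: ClassTag](name: String, obj: T): Any = {
        val im = rm.reflect(obj)
        val sym = im.symbol.typeSignature.member(TermName(name))
        im.reflectMethod(sym.asMethod).apply()
      }
    }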
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152570507 --- Diff: integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala --- @@ -84,7 +106,19 @@ class CarbonDecoderProcessor { } nodeList.add(ArrayCarbonNode(nodeListSeq)) case e: UnaryNode => process(e.child, nodeList) - case i: InsertIntoTable => process(i.child, nodeList) + case i: InsertIntoTable => +var sparkVersion21: Boolean = false +if (typeOf[InsertIntoTable].members.filter(!_.isMethod).toList.contains("query")) { + sparkVersion21 = false --- End diff -- you can get the version from the SparkContext; there is no need to check it like this ---
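A sketch of the suggested check, assuming an active SparkSession is reachable when the processor runs:

    import org.apache.spark.sql.SparkSession

    // Branch on the running Spark version instead of probing InsertIntoTable's members
    // through runtime reflection.
    val sparkVersion21: Boolean = SparkSession.getActiveSession
      .map(_.version)
      .exists(_.startsWith("2.1"))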
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152570094 --- Diff: integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala --- @@ -59,6 +61,26 @@ case class CarbonDictionaryTempDecoder( class CarbonDecoderProcessor { + val rm = universe.runtimeMirror(getClass.getClassLoader) + +// def getFields[T: TypeTag: reflect.ClassTag]( name: String, obj: T): Any = { +//val im = rm.reflect(obj) +//typeOf[T].members.collect { +// case m : MethodSymbol if m.isCaseAccessor && m.name.toString.equalsIgnoreCase(name) => +//val value = im.reflectMethod(m).apply() +//value +//} (collection.breakOut) +// } --- End diff -- remove the commented code ---
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152569958 --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/describeTable/TestDescribeTable.scala --- @@ -34,9 +34,9 @@ class TestDescribeTable extends QueryTest with BeforeAndAfterAll { sql("CREATE TABLE Desc2(Dec2Col1 BigInt, Dec2Col2 String, Dec2Col3 Bigint, Dec2Col4 Decimal) stored by 'carbondata'") } - test("test describe table") { -checkAnswer(sql("DESC Desc1"), sql("DESC Desc2")) - } +// test("test describe table") { --- End diff -- Keep the annotation ignore and fix later ---
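In ScalaTest the usual way to park a failing test is to rename test to ignore rather than commenting it out, e.g.:

    // disabled until the DESC output difference on Spark 2.2 is resolved;
    // rename back to test(...) to re-enable
    ignore("test describe table") {
      checkAnswer(sql("DESC Desc1"), sql("DESC Desc2"))
    }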
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r152569806 --- Diff: integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/CarbonV1toV3CompatabilityTestCase.scala --- @@ -49,8 +49,10 @@ class CarbonV1toV3CompatabilityTestCase extends QueryTest with BeforeAndAfterAll .getOrCreateCarbonSession(storeLocation, metaLocation).asInstanceOf[CarbonSession] println("store path from env : " + CarbonEnv.getInstance(localspark).storePath) localspark.sparkContext.setLogLevel("WARN") -localspark.sessionState.asInstanceOf[CarbonSessionState].metadataHive - .runSqlHive( + localspark.asInstanceOf[CarbonSession].asInstanceOf[CarbonSession].sharedState.externalCatalog + .asInstanceOf[HiveExternalCatalog].client.runSqlHive( --- End diff -- It is not required to cast twice ---
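With the duplicate cast dropped the call would read roughly as follows (a sketch, keeping the original Hive SQL argument as-is):

    localspark.asInstanceOf[CarbonSession].sharedState.externalCatalog
      .asInstanceOf[HiveExternalCatalog].client
      .runSqlHive(/* same Hive SQL as in the original call */)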
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
Github user zzcclp commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1469#discussion_r149264766 --- Diff: integration/spark2.2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala --- @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive + +import org.apache.hadoop.conf.Configuration +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} +import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResourceLoader, GlobalTempViewManager, SessionCatalog} +import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ScalarSubquery} +import org.apache.spark.sql.catalyst.optimizer.Optimizer +import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.SparkOptimizer +import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation, PreWriteCheck, ResolveSQLOnFile, _} +import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy} +import org.apache.spark.sql.internal.{SQLConf, SessionState} +import org.apache.spark.sql.optimizer.CarbonLateDecodeRule +import org.apache.spark.sql.parser.CarbonSparkSqlParser + +import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier + +/** + * This class will have carbon catalog and refresh the relation from cache if the carbontable in + * carbon catalog is not same as cached carbon relation's carbon table + * + * @param externalCatalog + * @param globalTempViewManager + * @param sparkSession + * @param functionResourceLoader + * @param functionRegistry + * @param conf + * @param hadoopConf + */ +class CarbonSessionCatalog( +externalCatalog: HiveExternalCatalog, +globalTempViewManager: GlobalTempViewManager, +functionRegistry: FunctionRegistry, +sparkSession: SparkSession, +conf: SQLConf, +hadoopConf: Configuration, +parser: ParserInterface, +functionResourceLoader: FunctionResourceLoader) + extends HiveSessionCatalog( +externalCatalog, +globalTempViewManager, +new HiveMetastoreCatalog(sparkSession), +functionRegistry, +conf, +hadoopConf, +parser, +functionResourceLoader + ) { + + lazy val carbonEnv = { +val env = new CarbonEnv +env.init(sparkSession) +env + } + + + private def refreshRelationFromCache(identifier: TableIdentifier, + alias: Option[String], + carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): Boolean = { +var isRefreshed = false +val storePath = 
CarbonEnv.getInstance(sparkSession).storePath +carbonEnv.carbonMetastore. + checkSchemasModifiedTimeAndReloadTables(storePath) + +val tableMeta = carbonEnv.carbonMetastore + .getTableFromMetadataCache(carbonDatasourceHadoopRelation.carbonTable.getDatabaseName, +carbonDatasourceHadoopRelation.carbonTable.getFactTableName) +if (tableMeta.isEmpty || (tableMeta.isDefined && + tableMeta.get.carbonTable.getTableLastUpdatedTime != + carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) { + refreshTable(identifier) + DataMapStoreManager.getInstance(). +clearDataMap(AbsoluteTableIdentifier.from(storePath, + identifier.database.getOrElse("default"), identifier.table)) + isRefreshed = true + logInfo(s"Schema changes have been detected for table: $identifier") +} +isRefreshed + }
[GitHub] carbondata pull request #1469: [WIP] Spark-2.2 Carbon Integration - Phase 1
GitHub user sounakr opened a pull request: https://github.com/apache/carbondata/pull/1469 [WIP] Spark-2.2 Carbon Integration - Phase 1 Spark-2.2 Carbon Integration. Phase 1 - Compilation ready for Spark-2.2. Phase 2 - Merge the changes of Spark-2.2 and Spark-2.1 to Spark-2 folder. - [ ] Any interfaces changed? - [ ] Any backward compatibility impacted? - [ ] Document update required? - [ ] Testing done Please provide details on - Whether new unit test cases have been added or why no new tests are required? - How it is tested? Please attach test report. - Is it a performance related change? Please attach the performance test report. - Any additional information to help reviewers in testing this change. - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. You can merge this pull request into a Git repository by running: $ git pull https://github.com/sounakr/incubator-carbondata Carbon-Spark-2.2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/carbondata/pull/1469.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1469 commit d98e9a10bf2bf7e5164ba154230af40de2c1e796 Author: sounakr Date: 2017-11-06T07:21:17Z Spark-2.2 Carbon Integration ---