[GitHub] carbondata pull request #3027: [CARBONDATA-3202]update the schema to session...
Github user asfgit closed the pull request at: https://github.com/apache/carbondata/pull/3027 ---
Github user manishgupta88 commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/3027#discussion_r244306298

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala ---
@@ -262,13 +263,28 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
       carbonTable: CarbonTable,
       tableInfo: TableInfo,
       addColumnSchema: ColumnSchema,
-      schemaEvolutionEntry: SchemaEvolutionEntry): Unit = {
+      schemaEvolutionEntry: SchemaEvolutionEntry,
+      oldCarbonColumn: CarbonColumn): Unit = {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
-    val a = List(schemaConverter.fromExternalToWrapperColumnSchema(addColumnSchema))
-    val (tableIdentifier, schemaParts, cols) = AlterTableUtil.updateSchemaInfo(
-      carbonTable, schemaEvolutionEntry, tableInfo, Some(a))(sparkSession)
+    // get the carbon column in schema order
+    val carbonColumns = carbonTable.getCreateOrderColumn(carbonTable.getTableName).asScala
+      .collect { case carbonColumn if !carbonColumn.isInvisible => carbonColumn.getColumnSchema }
+    // get the schema ordinal of the column for which the datatype changed or column is renamed
+    var schemaOrdinal: Int = 0
+    carbonColumns.foreach { carbonColumn =>
+      if (carbonColumn.getColumnName.equalsIgnoreCase(oldCarbonColumn.getColName)) {
+        schemaOrdinal = carbonColumns.indexOf(carbonColumn)
--- End diff --

Use the filter function to achieve the required output.

---
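The filter-style lookup the reviewer asks for can avoid the mutable `var` plus `foreach` pattern entirely. The following is a minimal self-contained sketch, not CarbonData code: `Col` is a hypothetical stand-in for `ColumnSchema`, and `schemaOrdinalOf` is an illustrative helper.

```scala
// Stand-in for CarbonData's ColumnSchema type (illustration only).
case class Col(columnName: String, isInvisible: Boolean)

object SchemaOrdinalSketch {
  // indexWhere finds the matching column and returns its position in one
  // pass, replacing the `var schemaOrdinal` + foreach pattern in the diff.
  def schemaOrdinalOf(carbonColumns: Seq[Col], oldColName: String): Int =
    carbonColumns.indexWhere(_.columnName.equalsIgnoreCase(oldColName))

  def main(args: Array[String]): Unit = {
    val cols = Seq(Col("id", false), Col("Name", false), Col("age", false))
    // The match is case-insensitive, mirroring equalsIgnoreCase in the diff.
    println(schemaOrdinalOf(cols, "name"))
  }
}
```

Note that `indexWhere` returns -1 when no column matches, so a caller would still need to decide how to handle a missing column.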
Github user manishgupta88 commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/3027#discussion_r244271907

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala ---
@@ -262,13 +263,26 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
       carbonTable: CarbonTable,
       tableInfo: TableInfo,
       addColumnSchema: ColumnSchema,
-      schemaEvolutionEntry: SchemaEvolutionEntry): Unit = {
+      schemaEvolutionEntry: SchemaEvolutionEntry,
+      oldCarbonColumn: CarbonColumn): Unit = {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
-    val a = List(schemaConverter.fromExternalToWrapperColumnSchema(addColumnSchema))
+    // get the carbon column in schema order
+    val carbonColumns = carbonTable.getCreateOrderColumn(carbonTable.getTableName).asScala
+      .filter(!_.isInvisible).collect { case carbonColumn => carbonColumn.getColumnSchema }
--- End diff --

Move the filter operation to collect.

---
Github user manishgupta88 commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/3027#discussion_r244271865

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala ---
@@ -93,11 +93,17 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
         schemaEvolutionEntry.setAdded(newCols.toList.asJava)
         val thriftTable = schemaConverter
           .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
+        // carbon columns based on schema order
+        val carbonColumns = carbonTable.getCreateOrderColumn(carbonTable.getTableName).asScala
+          .collect { case carbonColumn => carbonColumn.getColumnSchema }
+          .filter(!_.isInvisible)
--- End diff --

Move the filter operation into collect by adding an if clause to the case statement.

---
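The refactor the reviewer suggests, folding the `.filter(!_.isInvisible)` step into `collect` via a pattern guard, can be illustrated with a self-contained sketch. `ColumnInfo` here is a hypothetical stand-in, not the real CarbonData column type.

```scala
// Stand-in column type (illustration only, not the real CarbonColumn).
case class ColumnInfo(name: String, isInvisible: Boolean)

object CollectGuardSketch {
  // Two traversals: map every column, then filter the result.
  def mapThenFilter(cols: Seq[ColumnInfo]): Seq[ColumnInfo] =
    cols.map(identity).filter(!_.isInvisible)

  // One traversal: the if guard inside the case pattern fuses the filter
  // and the transformation, which is what the review comment asks for.
  def collectWithGuard(cols: Seq[ColumnInfo]): Seq[String] =
    cols.collect { case c if !c.isInvisible => c.name }
}
```

Both forms produce the visible columns only; the guarded `collect` simply does it in a single pass and keeps the filter condition next to the extraction.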
Github user manishgupta88 commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/3027#discussion_r244272092

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala ---
@@ -262,13 +263,26 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
       carbonTable: CarbonTable,
       tableInfo: TableInfo,
       addColumnSchema: ColumnSchema,
-      schemaEvolutionEntry: SchemaEvolutionEntry): Unit = {
+      schemaEvolutionEntry: SchemaEvolutionEntry,
+      oldCarbonColumn: CarbonColumn): Unit = {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
-    val a = List(schemaConverter.fromExternalToWrapperColumnSchema(addColumnSchema))
+    // get the carbon column in schema order
+    val carbonColumns = carbonTable.getCreateOrderColumn(carbonTable.getTableName).asScala
+      .filter(!_.isInvisible).collect { case carbonColumn => carbonColumn.getColumnSchema }
+    // get the schema ordinal of the column for which the datatype changed or column is renamed
+    val schemaOrdinal = carbonColumns.collect {
--- End diff --

Instead of collect, try to use foreach.

---
Github user manishgupta88 commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/3027#discussion_r244271732

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala ---
@@ -93,11 +93,17 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
         schemaEvolutionEntry.setAdded(newCols.toList.asJava)
         val thriftTable = schemaConverter
           .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
+        // carbon columns based on schema order
+        val carbonColumns = carbonTable.getCreateOrderColumn(carbonTable.getTableName).asScala
+          .collect { case carbonColumn => carbonColumn.getColumnSchema }
+          .filter(!_.isInvisible)
+        // sort the new columns based on schema order
+        val sortedColsBasedActualSchemaOrder = newCols.sortBy(a => a.getSchemaOrdinal)
         val (tableIdentifier, schemaParts, cols) = AlterTableUtil.updateSchemaInfo(
           carbonTable,
           schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
           thriftTable,
-          Some(newCols))(sparkSession)
+          Some(carbonColumns ++ sortedColsBasedActualSchemaOrder))(sparkSession)
--- End diff --

`AlterTableUtil.updateSchemaInfo` does not make use of the columns passed to it, so remove that method argument and use the columns for changing the Hive schema.

---
Github user manishgupta88 commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/3027#discussion_r244270219

--- Diff: integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala ---
@@ -105,47 +106,37 @@ class CarbonHiveSessionCatalog(
       .asInstanceOf[HiveExternalCatalog].client
   }

-  def alterTableRename(oldTableIdentifier: TableIdentifier,
-      newTableIdentifier: TableIdentifier,
-      newTablePath: String): Unit = {
-    getClient().runSqlHive(
-      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ oldTableIdentifier.table } " +
-      s"RENAME TO ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table }")
-    getClient().runSqlHive(
-      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table } " +
-      s"SET SERDEPROPERTIES" +
-      s"('tableName'='${ newTableIdentifier.table }', " +
-      s"'dbName'='${ oldTableIdentifier.database.get }', 'tablePath'='${ newTablePath }')")
-  }
-
-  override def alterTable(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
-    : Unit = {
-    getClient()
-      .runSqlHive(s"ALTER TABLE ${ tableIdentifier.database.get }.${ tableIdentifier.table } " +
-        s"SET TBLPROPERTIES(${ schemaParts })")
-  }
-
   override def alterAddColumns(tableIdentifier: TableIdentifier,
       schemaParts: String,
-      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
-    : Unit = {
+      cols: Option[Seq[ColumnSchema]]): Unit = {
     alterTable(tableIdentifier, schemaParts, cols)
+    CarbonSessionUtil
+      .alterExternalCatalogForTableWithUpdatedSchema(tableIdentifier,
+        cols,
+        schemaParts,
+        sparkSession)
   }

   override def alterDropColumns(tableIdentifier: TableIdentifier,
--- End diff --

Unify `alterDropColumns` and `alterAddColumns` into one method: keep the interface methods the same, but move the common code into a single method and call it from both interface methods.

---
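The unification the reviewer asks for, keeping both interface methods but delegating their identical bodies to one private helper, can be sketched abstractly. Everything below is a simplified stand-in for `CarbonHiveSessionCatalog`: the `TableId` alias, the `calls` log, and `alterColumnOperation` are hypothetical and exist only to make the sketch runnable without Spark or Hive.

```scala
object UnifySketch {
  // Simplified stand-in; the real code takes TableIdentifier and ColumnSchema.
  type TableId = String

  trait CatalogSketch {
    // Records the sequence of operations so the delegation is observable.
    val calls = scala.collection.mutable.ListBuffer[String]()

    def alterTable(id: TableId, schemaParts: String): Unit =
      calls += s"alterTable($id)"

    // Interface methods keep their signatures unchanged...
    def alterAddColumns(id: TableId, schemaParts: String): Unit =
      alterColumnOperation(id, schemaParts)

    def alterDropColumns(id: TableId, schemaParts: String): Unit =
      alterColumnOperation(id, schemaParts)

    // ...while the shared body lives in one place: alter the Hive table,
    // then refresh the external catalog (the CarbonSessionUtil call in the PR).
    private def alterColumnOperation(id: TableId, schemaParts: String): Unit = {
      alterTable(id, schemaParts)
      calls += s"updateExternalCatalog($id)"
    }
  }
}
```

With this shape, a future fix to the add/drop path only has to be made in `alterColumnOperation`, which is the point of the review comment.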
Github user manishgupta88 commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/3027#discussion_r244270598

--- Diff: integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionUtil.scala ---
@@ -93,4 +98,34 @@ object CarbonSessionUtil {
     )
   }

+  /**
+   * This method alters the table for a datatype change or column rename operation,
+   * and updates the external catalog directly.
+   *
+   * @param tableIdentifier tableIdentifier of the table
+   * @param cols            all the columns of the table, updated with the datatype
+   *                        change or the new column name
+   * @param schemaParts     schemaParts
+   * @param sparkSession    sparkSession
+   */
+  def alterExternalCatalogForTableWithUpdatedSchema(tableIdentifier: TableIdentifier,
+      cols: Option[Seq[ColumnSchema]],
+      schemaParts: String,
+      sparkSession: SparkSession): Unit = {
+    val carbonTable = CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession)
+    val colArray: scala.collection.mutable.ArrayBuffer[StructField] = ArrayBuffer()
+    cols.get.foreach(column =>
+      if (!column.isInvisible) {
+        colArray += StructField(column.getColumnName,
+          SparkTypeConverter.convertCarbonToSparkDataType(column, carbonTable))
+      }
+    )
+    sparkSession.sessionState.catalog.externalCatalog
+      .alterTableDataSchema(tableIdentifier.database.get,
--- End diff --

Add a comment on the usage of the `alterTableDataSchema` API to explain its purpose.

---
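The conversion this diff performs, dropping invisible columns and turning the rest into `StructField`s for `alterTableDataSchema`, can be shown as a pure function. The types below are hypothetical stand-ins: the real code uses CarbonData's `ColumnSchema`, Spark's `StructField`, and `SparkTypeConverter` for the datatype mapping.

```scala
// Simplified stand-ins for ColumnSchema and Spark's StructField (illustration only).
case class ColumnMeta(columnName: String, sparkType: String, isInvisible: Boolean)
case class FieldSketch(name: String, dataType: String)

object DataSchemaSketch {
  // alterTableDataSchema should receive only user-visible data columns,
  // so internal (invisible) columns are dropped before the conversion.
  def toDataSchema(cols: Seq[ColumnMeta]): Seq[FieldSketch] =
    cols.filterNot(_.isInvisible)
      .map(c => FieldSketch(c.columnName, c.sparkType))
}
```

Writing the conversion as `filterNot` + `map` also avoids the mutable `ArrayBuffer` in the diff, in the same spirit as the other review comments on this PR.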
GitHub user akashrn5 opened a pull request:

https://github.com/apache/carbondata/pull/3027

[CARBONDATA-3202]update the schema to session catalog after add column, drop column and column rename

### Why this PR?

**Problem:** For alter table rename, once we change the table name in Carbon, we fire an alter table rename DDL using the Hive client. But for add column, drop column and column rename, Spark does not support these features while Hive does. So after a rename, add column or drop column, the updated schema is not reflected in the catalog.

**Solution:** We can directly call the Spark API **alterTableDataSchema** with the updated schema, which in turn updates the schema in the session catalog.

Be sure to do all of the following checklist to help us incorporate your contribution quickly and easily:

 - [x] Any interfaces changed? YES
 - [x] Any backward compatibility impacted? NA
 - [x] Document update required? NA
 - [x] Testing done: tested in a three node cluster for various Spark versions.
       Please provide details on
       - Whether new unit test cases have been added or why no new tests are required?
       - How it is tested? Please attach test report.
       - Is it a performance related change? Please attach the performance test report.
       - Any additional information to help reviewers in testing this change.
 - [x] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. NA

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/akashrn5/incubator-carbondata addcolumn

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/3027.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3027

commit ee78136e55b309a4521914fdee57adf59cda8531
Author: akashrn5
Date: 2018-12-27T06:01:44Z

    update the schema to session catalog after add column, drop column and column rename

---
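The overall effect the PR describes, pushing the updated column list into the catalog so lookups see it immediately, can be mocked end to end without Spark. Everything here is hypothetical: `MockCatalog` only imitates the role of `sessionState.catalog.externalCatalog` and its `alterTableDataSchema` call, and `addColumn` is an illustrative helper.

```scala
object CatalogFlowSketch {
  case class Field(name: String, dataType: String)

  // Imitates the external catalog: the schema a query would observe is
  // whatever was last pushed via alterTableDataSchema.
  class MockCatalog {
    private var schemas = Map.empty[String, Seq[Field]]
    def dataSchema(table: String): Seq[Field] = schemas.getOrElse(table, Nil)
    def alterTableDataSchema(table: String, newSchema: Seq[Field]): Unit =
      schemas += table -> newSchema
  }

  // After an ADD COLUMN, the full updated column list is pushed to the
  // catalog, so subsequent lookups immediately reflect the new schema.
  def addColumn(catalog: MockCatalog, table: String, col: Field): Unit =
    catalog.alterTableDataSchema(table, catalog.dataSchema(table) :+ col)
}
```

This mirrors the shape of the fix: instead of waiting for DDL support in Spark for add, drop and rename column, the updated schema is written to the catalog directly.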