[GitHub] [carbondata] akashrn5 commented on a change in pull request #3873: [CARBONDATA-3956] Reindex command on SI table
akashrn5 commented on a change in pull request #3873:
URL: https://github.com/apache/carbondata/pull/3873#discussion_r478884183

## File path: docs/index/secondary-index-guide.md

````diff
@@ -188,4 +188,25 @@ where we have old stores.
 Syntax
   ```
   REGISTER INDEX TABLE index_name ON [TABLE] [db_name.]table_name
-  ```
\ No newline at end of file
+  ```
+
+### Reindex Command
+This command is used to reload segments in the SI table in case when there is some mismatch in the number
+of segments with main table.
+
+Syntax
+
+Reindex on all the secondary Indexes on the main table
````

Review comment:

```suggestion
Reindex on all the secondary Indexes of the main table
```
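For context on the doc change, a minimal sketch of how the new command is invoked, written in the style of the PR's tests. Only the first statement is taken verbatim from TestIndexRepair in this PR; the table-level and database-level variants are assumptions inferred from IndexRepairCommand's parameters (indexname, tableNameOp, dbName, segments), not confirmed syntax from this excerpt.

```scala
// Sketch of REINDEX usage as Spark SQL calls (test style used in this PR).
sql("REINDEX INDEX TABLE indextable1 ON MAINTABLE")   // from TestIndexRepair: repair one secondary index
sql("REINDEX ON TABLE maintable")                     // assumed variant: repair all SIs of a table
sql("REINDEX DATABASE somedb")                        // assumed variant: repair all SIs in a database
```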
## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala

```diff
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Show indexes on the table
+ */
+case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier,
+    dbName: String,
+    segments: Option[List[String]]) extends DataCommand {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (dbName == null) {
+      // table level and index level
+      val databaseName = if (tableNameOp.database.isEmpty) {
+        SparkSession.getActiveSession.get.catalog.currentDatabase
+      } else {
+        tableNameOp.database.get.toString
+      }
+      triggerRepair(tableNameOp.table, databaseName, indexname.isEmpty, indexname, segments)
+    } else {
+      // for all tables in the db
+      sparkSession.sessionState.catalog.listTables(dbName).foreach {
+        tableIdent =>
+          triggerRepair(tableIdent.table, dbName, indexname.isEmpty, indexname, segments)
+      }
+    }
+    Seq.empty
+  }
+
+  def triggerRepair(tableNameOp: String, databaseName: String, allIndex: Boolean,
+      indexName: Option[String], segments: Option[List[String]]): Unit = {
+    val sparkSession = SparkSession.getActiveSession.get
+    // when SI creation and load to main table are parallel, get the carbonTable from the
+    // metastore which will have the latest index info
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+    val carbonTable = metaStore
+      .lookupRelation(Some(databaseName), tableNameOp)(sparkSession)
+      .asInstanceOf[CarbonRelation].carbonTable
+
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(databaseName)
+    carbonLoadModel.setTableName(tableNameOp)
+    carbonLoadModel.setTablePath(carbonTable.getTablePath)
+    val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
+    carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager
```

Review comment:

you haven't taken the lock here, please check
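On the locking concern above, a minimal sketch of what guarding the table-status read could look like. It reuses the carbonTable, carbonLoadModel and tableNameOp values from the surrounding triggerRepair method, and assumes CarbonData's CarbonLockFactory, LockUsage and ICarbonLock APIs along with SegmentStatusManager.readLoadMetadata behave as in other commands; it is not the fix actually adopted in the PR.

```scala
// Hypothetical sketch: take the table status lock before reading load metadata,
// so a parallel load or compaction cannot change the status file mid-read.
import scala.collection.JavaConverters._

import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.path.CarbonTablePath

val statusLock: ICarbonLock = CarbonLockFactory.getCarbonLockObj(
  carbonTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
try {
  if (statusLock.lockWithRetries()) {
    // read the table status under the lock and hand it to the load model
    carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager
      .readLoadMetadata(CarbonTablePath.getMetadataPath(carbonTable.getTablePath))
      .toList.asJava)
  } else {
    throw new RuntimeException(s"Unable to acquire table status lock for $tableNameOp")
  }
} finally {
  statusLock.unlock()
}
```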
[GitHub] [carbondata] akashrn5 commented on a change in pull request #3873: [CARBONDATA-3956] Reindex command on SI table
akashrn5 commented on a change in pull request #3873:
URL: https://github.com/apache/carbondata/pull/3873#discussion_r474030501

## File path: index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala

```diff
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.secondaryindex
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.test.util.QueryTest
+
+/**
+ * test cases for testing create index table
+ */
+class TestIndexRepair extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    sql("drop table if exists maintable")
+    sql("drop table if exists indextable1")
+    sql("drop table if exists indextable2")
+  }
+
+  test("reindex command after deleting segments from SI table") {
+    sql("drop table if exists maintable")
+    sql("CREATE TABLE maintable(a INT, b STRING, c STRING) stored as carbondata")
+    sql("CREATE INDEX indextable1 on table maintable(c) as 'carbondata'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+    val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    sql("DELETE FROM TABLE INDEXTABLE1 WHERE SEGMENT.ID IN(0,1)")
+    sql("CLEAN FILES FOR TABLE INDEXTABLE1")
+    val postDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    assert(preDeleteSegments != postDeleteSegments)
+    sql("REINDEX INDEX TABLE indextable1 ON MAINTABLE")
+    val postRepairSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    assert(preDeleteSegments == postRepairSegments)
```

Review comment:

you should also consider adding a query and checking whether it's hitting SI after reindex. Please check for all test cases.
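One way to act on the review comment above is to assert on the EXPLAIN output after the reindex. This is only a sketch: the plan text to match is an assumption, and CarbonData's own test utilities may already provide a dedicated helper for checking SI pruning.

```scala
// Hypothetical check, appended after the REINDEX in the test above: run EXPLAIN on a
// filter query over the indexed column and assert the plan references the index table.
sql("REINDEX INDEX TABLE indextable1 ON MAINTABLE")
val plan = sql("EXPLAIN SELECT * FROM maintable WHERE c = 'string2'")
  .collect()(0).getString(0)
assert(plan.contains("indextable1"))
```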