[GitHub] [carbondata] akashrn5 commented on a change in pull request #3873: [CARBONDATA-3956] Reindex command on SI table

2020-08-28 Thread GitBox


akashrn5 commented on a change in pull request #3873:
URL: https://github.com/apache/carbondata/pull/3873#discussion_r478884183



##
File path: docs/index/secondary-index-guide.md
##
@@ -188,4 +188,25 @@ where we have old stores.
 Syntax
   ```
   REGISTER INDEX TABLE index_name ON [TABLE] [db_name.]table_name
-  ```
\ No newline at end of file
+  ```
+
+### Reindex Command
+This command is used to reload segments in the SI table in case when there is some mismatch in the number
+of segments with main table.
+
+Syntax
+
+Reindex on all the secondary Indexes on the main table

Review comment:
   ```suggestion
   Reindex on all the secondary Indexes of the main table
   ```

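For context, the command this doc section documents, as exercised by the PR's test cases quoted later in this thread (shown as Scala test calls; the table names are the test's own, and the segment-count check is an illustration, not part of the docs):

```scala
// reindex one secondary index table against its main table
sql("REINDEX INDEX TABLE indextable1 ON MAINTABLE")
// after repair, the SI segment count should match the main table again
assert(sql("SHOW SEGMENTS FOR TABLE indextable1").count() ==
  sql("SHOW SEGMENTS FOR TABLE maintable").count())
```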
##
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Repair (reload) the secondary indexes on the table
+ */
+case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier,
+    dbName: String,
+    segments: Option[List[String]]) extends DataCommand {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (dbName == null) {
+      // table level and index level
+      val databaseName = if (tableNameOp.database.isEmpty) {
+        SparkSession.getActiveSession.get.catalog.currentDatabase
+      } else {
+        tableNameOp.database.get.toString
+      }
+      triggerRepair(tableNameOp.table, databaseName, indexname.isEmpty, indexname, segments)
+    } else {
+      // for all tables in the db
+      sparkSession.sessionState.catalog.listTables(dbName).foreach {
+        tableIdent =>
+          triggerRepair(tableIdent.table, dbName, indexname.isEmpty, indexname, segments)
+      }
+    }
+    Seq.empty
+  }
+
+  def triggerRepair(tableNameOp: String, databaseName: String, allIndex: Boolean,
+      indexName: Option[String], segments: Option[List[String]]): Unit = {
+    val sparkSession = SparkSession.getActiveSession.get
+    // when SI creation and load to main table are parallel, get the carbonTable from the
+    // metastore which will have the latest index info
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+    val carbonTable = metaStore
+      .lookupRelation(Some(databaseName), tableNameOp)(sparkSession)
+      .asInstanceOf[CarbonRelation].carbonTable
+
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(databaseName)
+    carbonLoadModel.setTableName(tableNameOp)
+    carbonLoadModel.setTablePath(carbonTable.getTablePath)
+    val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
+    carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager

Review comment:
   you haven't taken the lock here, please check

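Not the PR's code — a minimal sketch of the locking pattern the reviewer is asking for, assuming CarbonData's `CarbonLockFactory`/`LockUsage.TABLE_STATUS_LOCK` API and reusing `carbonTable`, `carbonLoadModel`, and `tableNameOp` from the diff above; the exact placement and error handling are assumptions:

```scala
import scala.collection.JavaConverters._

import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.path.CarbonTablePath

// take the table status lock before reading the load metadata, so a
// concurrent load/compaction cannot change the segment list mid-read
val statusLock = CarbonLockFactory.getCarbonLockObj(
  carbonTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
try {
  if (statusLock.lockWithRetries()) {
    carbonLoadModel.setLoadMetadataDetails(
      SegmentStatusManager.readLoadMetadata(
        CarbonTablePath.getMetadataPath(carbonTable.getTablePath)).toList.asJava)
  } else {
    // hypothetical handling; the PR may prefer to retry or log instead
    throw new RuntimeException(s"Unable to get table status lock for $tableNameOp")
  }
} finally {
  statusLock.unlock()
}
```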

[GitHub] [carbondata] akashrn5 commented on a change in pull request #3873: [CARBONDATA-3956] Reindex command on SI table

2020-08-20 Thread GitBox


akashrn5 commented on a change in pull request #3873:
URL: https://github.com/apache/carbondata/pull/3873#discussion_r474030501



##
File path: index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala
##
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.secondaryindex
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.test.util.QueryTest
+
+/**
+ * Test cases for the REINDEX command on SI tables
+ */
+class TestIndexRepair extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    sql("drop table if exists maintable")
+    sql("drop table if exists indextable1")
+    sql("drop table if exists indextable2")
+  }
+
+  test("reindex command after deleting segments from SI table") {
+    sql("drop table if exists maintable")
+    sql("CREATE TABLE maintable(a INT, b STRING, c STRING) stored as carbondata")
+    sql("CREATE INDEX indextable1 on table maintable(c) as 'carbondata'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+    val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    sql("DELETE FROM TABLE INDEXTABLE1 WHERE SEGMENT.ID IN(0,1)")
+    sql("CLEAN FILES FOR TABLE INDEXTABLE1")
+    val postDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    assert(preDeleteSegments != postDeleteSegments)
+    sql("REINDEX INDEX TABLE indextable1 ON MAINTABLE")
+    val postRepairSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    assert(preDeleteSegments == postRepairSegments)

Review comment:
   you should also consider adding a query and checking whether it hits the SI after reindex. Please check this for all test cases.

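For context, a hedged sketch of the check the reviewer is suggesting, reusing the tables from the test above; asserting on the physical-plan string is an assumption about how SI pruning can be verified here, not necessarily the project's canonical test helper:

```scala
// after REINDEX, a filter on the indexed column c should be pruned via
// the SI table; assert the executed physical plan references indextable1
val df = sql("SELECT count(*) FROM maintable WHERE c = 'string2'")
val plan = df.queryExecution.sparkPlan.toString()
assert(plan.contains("indextable1"))
```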